[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

Vitaly Davidovich vitalyd at gmail.com
Fri Jun 27 18:05:10 EDT 2014


Why not do this in continuation-passing style? That is, submit to a TPE the
tasks that must execute first (i.e. the ones that produce more work), and
when they're done producing work, have them queue up the subsequent tasks
(which in turn can produce further tasks).
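
A minimal sketch of that idea, assuming a plain fixed-size pool and made-up
stage names (ContinuationSketch, "stage1" and "stage2-N" are illustrative
only, not from the original code). The first-stage task queues its follow-up
tasks itself once its own work is done, so no priorities are needed to order
the two stages:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ContinuationSketch {
    // Runs one first-stage task that, when finished, queues three
    // follow-up tasks on the same pool. Returns the execution log.
    static List<String> run() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> log = new CopyOnWriteArrayList<>();
        final int followUps = 3;
        CountDownLatch done = new CountDownLatch(followUps);
        try {
            pool.execute(() -> {
                log.add("stage1"); // first-stage work happens here
                // Continuation: only now are the dependent tasks queued.
                for (int i = 0; i < followUps; i++) {
                    final int id = i;
                    pool.execute(() -> {
                        log.add("stage2-" + id);
                        done.countDown();
                    });
                }
            });
            // Wait until every follow-up task has run.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("follow-up tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The "stage1" entry is always logged first because the second-stage tasks do
not exist until the first stage submits them. The order among the
second-stage tasks themselves is not guaranteed.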

On Jun 27, 2014 5:21 PM, "DT" <dt at flyingtroika.com> wrote:

> If we dispatch tasks based on queuing, we get no guarantee that the tasks
> first in line will finish executing first and that their futures will be
> done and ready to return results. (In general, none of the dispatching
> mechanisms will give me that behavior, though I could be wrong. Sorry if I
> conflated the callable priorities, which I wanted to use to control the
> order in which tasks execute, with thread priorities in general.)
> I think I need some sort of scheduling constraint on which task should be
> done first, so that I can use the futures' results to dispatch the next
> set of tasks. I am not sure whether, for example, CompletableFuture in
> Java 8 is meant to be used for such problems. Namely, tasks have to be
> executed and deliver results in a certain order, and they can have
> dependencies/relationships between each other.
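
For what it's worth, here is a rough sketch of how a dependency between
tasks could be expressed with CompletableFuture in Java 8. The names
(DependencySketch, the "sql" and "http" suppliers and their values) are
placeholders invented for illustration; the real tasks would do I/O. The
dependent step is only scheduled after both of its inputs have completed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DependencySketch {
    static String run() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Two independent first-stage tasks (stand-ins for real lookups).
            CompletableFuture<Integer> sql =
                    CompletableFuture.supplyAsync(() -> 40, pool);
            CompletableFuture<Integer> http =
                    CompletableFuture.supplyAsync(() -> 2, pool);
            // Dependent task: runs on the pool only after both inputs are done.
            CompletableFuture<String> report =
                    sql.thenCombineAsync(http, (a, b) -> "total=" + (a + b), pool);
            // join() blocks until the whole chain has completed.
            return report.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Here the ordering comes from the dependency itself rather than from any
priority assigned to the tasks.
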
>
> By the way, in what circumstances would you recommend using the TPE
> constructor with your own ThreadFactory implementation rather than relying
> on the default one?
>
> DT
>
> On 6/27/2014 1:09 PM, David Holmes wrote:
>
> Priority can only really be used for the dispatching/queueing mechanism.
> Once tasks are executing priority is essentially meaningless. Thread
> priorities have no significance in general.
>
> David
>
> -----Original Message-----
> *From:* concurrency-interest-bounces at cs.oswego.edu
> [mailto:concurrency-interest-bounces at cs.oswego.edu] *On Behalf Of* DT
> *Sent:* Saturday, 28 June 2014 1:42 AM
> *To:* viktor Ҡlang; dholmes at ieee.org
> *Cc:* concurrency-interest
> *Subject:* Re: [concurrency-interest] Custom ThreadPoolExecutor
> implementation to handle priority based Callable tasks
>
> We see several ways to approach this:
>
> The 1st approach is to use a PriorityBlockingQueue and provide a
> Comparator for the Callable tasks that orders them by Callable priority.
> Then, when we call invokeAll(Collection of Callable tasks) on the TPE, it
> executes tasks from the queue by priority (1st, 2nd, ..., nth), which
> means that tasks are executed in abstract groups (for instance, the 1st
> group, all tasks with high priority, goes first according to TPE
> scheduling). tpe.execute(thread) will be called and the tasks will be
> assigned to available TPE worker threads.
>
> Following this logic, the TPE does not guarantee that all high-priority
> tasks will be executed first. Or the guarantee is not enforced strongly
> enough (maybe there is some policy to enforce it).
>
> The 2nd approach is to modify invokeAll() to handle priority explicitly,
> which does not look like a clean solution.
>
> The 3rd approach is to have multiple TPEs, each handling only one
> priority level without mixing; for example, tpe1 will invokeAll(tasks of
> priority 1 only) and tpe2 will invokeAll(tasks of priority 2 only). Which
> TPE should be instantiated first should follow the business logic.
>
> Here is an example of what we are trying to implement. Assume we have
> different types of tasks that are bound to fetching resources over
> UDP/TCP/HTTP/SQL. Something like:
>
> A sqlWorker needs to execute SQL statements and return results, an
> httpWorker has to handle HTTP calls and return some results, etc. The
> issue is that even if we assign priorities to these workers, following
> some business logic about which data we need to get first within our
> workflow, we would have to change priorities dynamically, simply because
> workers of the same type might finish execution in different time slots.
> So we have to either wait until all workers have executed (in practice,
> tune the time slot to the longest execution time) or drop some tasks
> because of the timing limits. If we follow the 3rd approach, it would be
> hard to manage the TPEs. If we follow the 2nd approach, it would be hard
> to maintain the tasks and guarantee execution group by group. And the 1st
> approach does not seem to completely satisfy the problem's concurrent
> execution logic.
>
> On 6/27/2014 3:12 AM, √iktor Ҡlang wrote:
>
> "  public abstract void setPriority(int priority);" ? Changing priority
> on an already submitted task seems strange.
>
> I agree with David, just use a PriorityBlockingQueue [
> http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
> for the submission queue of the TPE and provide a Comparator that checks
> for your PriorityCallable and puts non-PriorityCallable-Callables at an
> appropriate default priority.
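
A rough sketch of that suggestion, under a few assumptions of mine: the
PriorityTask wrapper, DEFAULT_PRIORITY and the "lower number runs first"
convention are hypothetical, and tasks are handed over with execute().
Note that submit() and invokeAll() wrap each task in a FutureTask before
it reaches the queue, so a comparator like this one would then see only
the wrapper and fall back to the default priority unless newTaskFor() is
overridden to keep the priority visible.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrioritySketch {
    // Hypothetical task type: a Runnable carrying a priority (lower runs first).
    static final class PriorityTask implements Runnable {
        final int priority;
        final Runnable body;
        PriorityTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }
        @Override public void run() { body.run(); }
    }

    // Tasks that are not a PriorityTask are queued at this default priority.
    static final int DEFAULT_PRIORITY = 5;

    static int priorityOf(Runnable r) {
        return (r instanceof PriorityTask) ? ((PriorityTask) r).priority : DEFAULT_PRIORITY;
    }

    static List<Integer> run() {
        Comparator<Runnable> byPriority = Comparator.comparingInt(PrioritySketch::priorityOf);
        // Single worker, so the dequeue order is directly observable.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority));
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker so that the following tasks pile up in the queue.
            pool.execute(() -> {
                try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            pool.execute(new PriorityTask(3, () -> order.add(3)));
            pool.execute(new PriorityTask(1, () -> order.add(1)));
            pool.execute(() -> order.add(DEFAULT_PRIORITY)); // plain Runnable, default priority
            pool.execute(new PriorityTask(2, () -> order.add(2)));
            gate.countDown(); // release the worker; the queue now drains by priority
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdownNow();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a single worker the queued tasks are started in the order 1, 2, 3, 5.
With more workers the queue still hands tasks out by priority, but, as
David says, nothing orders them once they are running.
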
>
>
> On Fri, Jun 27, 2014 at 6:16 AM, David Holmes <davidcholmes at aapt.net.au>
> wrote:
>
>> Can't you just use your PriorityCallable with a custom PriorityQueue and
>> standard ThreadPoolExecutor?
>>
>> David
>>
>> > -----Original Message-----
>> > From: concurrency-interest-bounces at cs.oswego.edu
>> > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
>> > Sent: Friday, 27 June 2014 2:08 PM
>> > To: concurrency-interest at cs.oswego.edu
>> > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
>> > to handle priority based Callable tasks
>> >
>> >
>> > If we were to control Callable tasks with priorities and manage the
>> > execution of those tasks through a ThreadPoolExecutor, we would have
>> > to implement a custom ThreadPoolExecutor and a PriorityCallable (see
>> > the suggestion below). What other ways to accomplish this do you see?
>> >
>> > public interface PriorityCallable extends Callable<Object> {
>> >      public abstract void setPriority(int priority);
>> >      ...
>> > }
>> >
>> > public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
>> > ...
>> > ...
>> > public <T> List<Future<T>> customInvokeAll(
>> >                  Collection<? extends Callable<T>> tasks, // pass PriorityCallable
>> >                  long timeout, TimeUnit unit)
>> >                  throws InterruptedException {
>> >              if (tasks == null || unit == null)
>> >                  throw new NullPointerException();
>> >              long nanos = unit.toNanos(timeout);
>> >              List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
>> >              boolean done = false;
>> >              try {
>> >                  // handle Priority based Callable tasks here, though
>> >                  // can expect timing issues
>> >                  // custom logic to group Callable tasks, if (priority
>> >                  // == 1, 2, 3 create group of callables, etc )
>> >                  for (Callable<T> t : tasks)
>> >                      futures.add(newTaskFor(t));
>> >
>> >                  long lastTime = System.nanoTime();
>> >
>> >                  // Interleave time checks and calls to execute in case
>> >                  // executor doesn't have any/much parallelism.
>> >                  Iterator<Future<T>> it = futures.iterator();
>> >                  while (it.hasNext()) {
>> >                      // Should we expect futures to be executed within
>> >                      // the same timeframe for Callables with different
>> >                      // priorities?
>> >                      execute((Runnable)(it.next()));
>> >                      long now = System.nanoTime();
>> >                      nanos -= now - lastTime;
>> >                      lastTime = now;
>> >                      if (nanos <= 0)
>> >                          return futures;
>> >                  }
>> >
>> >                  for (Future<T> f : futures) {
>> >                      if (!f.isDone()) {
>> >                          if (nanos <= 0)
>> >                              return futures;
>> >                          try {
>> >                              f.get(nanos, TimeUnit.NANOSECONDS);
>> >                          } catch (CancellationException ignore) {
>> >                          } catch (ExecutionException ignore) {
>> >                              // Should we cancel tasks based on the
>> >                              // priority as well?
>> >                          } catch (TimeoutException toe) {
>> >                              return futures;
>> >                          }
>> >                          long now = System.nanoTime();
>> >                          nanos -= now - lastTime;
>> >                          lastTime = now;
>> >                      }
>> >                  }
>> >                  done = true;
>> >                  return futures;
>> >              } finally {
>> >                  if (!done)
>> >                      for (Future<T> f : futures)
>> >                          f.cancel(true);
>> >              }
>> >          }
>> > ...
>> > }
>> >
>> > Thank you,
>> > dt
>> > http://www.flyingtroika.com/
>> >
>> > _______________________________________________
>> > Concurrency-interest mailing list
>> > Concurrency-interest at cs.oswego.edu
>> > http://cs.oswego.edu/mailman/listinfo/concurrency-interest