[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

Peter Levart peter.levart at gmail.com
Sun Jun 29 19:06:52 EDT 2014


On 06/28/2014 05:49 AM, DT wrote:
> I believe that
> 1 -  should be ok to accomplish
> 2 -  ok to satisfy, or at least to get very close to, as we know all
> dependencies at the beginning
> 3 -  I like the idea with counters: simple, straightforward and easy to follow
>
> However, the issue is that all the tasks (with dependencies, etc.) have
> to finish execution within the same time slot as a whole. We have to
> enforce some sort of timing constraint. The same task can take a different
> amount of time on each run (we can apply some statistical methods to get
> the average for all the tasks based on historical execution times).
>
> Would it be better to apply some sort of cyclic barriers or counters?
>
> Thanks,
> DT

Do you know the time slot in advance, at the point when you submit the
first task in a group of dependent tasks? If so, you could combine
Oleksandr's suggestion (which is how JDK 8 CompletableFuture works) with
the notion of a "deadline" that you assign to the group. The deadline can
serve two purposes:
- it expires the tree of unfinished dependent tasks in the group when the
deadline is reached, skipping their execution
- it can be used to prioritize de-queueing by the TPE of tasks that will
expire earlier (using PriorityBlockingQueue)
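
As a minimal sketch of those two purposes, a task can carry its group's
deadline, order itself by it, and refuse to run once it has expired. This
is not the code of the ExpirableFutureTask class linked in this message;
DeadlineTask and DeadlinePools are hypothetical names made up for
illustration, and propagating expiry through a tree of dependents is left
out.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a FutureTask that carries an absolute deadline
// (on the System.nanoTime() clock) and orders itself by that deadline.
class DeadlineTask<V> extends FutureTask<V> implements Comparable<DeadlineTask<?>> {
    final long deadlineNanos;

    DeadlineTask(Callable<V> body, long deadlineNanos) {
        super(body);
        this.deadlineNanos = deadlineNanos;
    }

    @Override
    public void run() {
        // Purpose 1: skip execution once the deadline has passed.
        if (System.nanoTime() - deadlineNanos >= 0) {
            cancel(false);
        } else {
            super.run();
        }
    }

    @Override
    public int compareTo(DeadlineTask<?> other) {
        // Purpose 2: the task that expires earlier is de-queued first.
        // Comparing by difference is the overflow-safe way to compare nanoTime values.
        return Long.signum(deadlineNanos - other.deadlineNanos);
    }
}

// Hypothetical factory for a pool whose work queue is ordered by deadline.
class DeadlinePools {
    static ThreadPoolExecutor newPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>());
    }
}
```

Tasks have to be handed to such a pool with execute(), not submit():
submit() wraps its argument in a plain FutureTask, which is not Comparable,
so the PriorityBlockingQueue would throw ClassCastException when the task
is queued.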

See here:

https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/ExpirableFutureTask.java

And an example here:

https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/test/EFTTest.java
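
Oleksandr's counter scheme, quoted further down, could be sketched roughly
as follows, assuming the whole dependency graph is wired up before any task
is started. DependentTask and its methods are hypothetical names made up
for illustration; they are not part of the JDK or of the code linked above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical node in a dependency graph that is fully known up front.
class DependentTask implements Runnable {
    private final Runnable body;
    private final Executor executor;
    // (1) how many prerequisites have not finished yet
    private final AtomicInteger pending = new AtomicInteger();
    // (2) which tasks depend on this one
    private final List<DependentTask> dependents = new CopyOnWriteArrayList<DependentTask>();

    DependentTask(Runnable body, Executor executor) {
        this.body = body;
        this.executor = executor;
    }

    // Wire the graph before starting anything.
    void dependsOn(DependentTask prerequisite) {
        pending.incrementAndGet();
        prerequisite.dependents.add(this);
    }

    // Call once per task after wiring; only tasks without prerequisites are scheduled here.
    void startIfReady() {
        if (pending.get() == 0) {
            executor.execute(this);
        }
    }

    @Override
    public void run() {
        // If body throws, the dependents are never released in this sketch.
        body.run();
        // (3) on finishing, decrement each dependent's counter and
        // schedule those that reach zero.
        for (DependentTask d : dependents) {
            if (d.pending.decrementAndGet() == 0) {
                executor.execute(d);
            }
        }
    }
}
```

The executor passed in can be the TPE itself, so a task only enters the
pool's queue once everything it depends on has completed.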


Regards, Peter

> On 6/27/2014 3:30 PM, Oleksandr Otenko wrote:
>> 1. Each task must know how many dependencies it has.
>>
>> 2. Each task knows which other tasks depend on it.
>>
>> 3. Upon finishing a task, decrement the counters of all dependent 
>> tasks. Those that reach zero, get scheduled with the TPE.
>>
>>
>> If you don't know which tasks depend on which before executing them 
>> (ie you don't have answer for (2)), there is no efficient way of 
>> planning their execution.
>>
>> Alex
>>
>>
>> On 27/06/2014 22:04, DT wrote:
>>> If we dispatch tasks based on queuing, we would not get guarantees
>>> that the tasks first in line will finish execution first and that
>>> their futures are done and ready to return results. (In general, none
>>> of the dispatching mechanisms will give me such behavior. I could be
>>> wrong, of course; sorry if I have mangled callable priorities, which I
>>> wanted to use to control the order of task execution, with overall
>>> thread priorities.) I think that I need some sort of scheduling
>>> constraints on which task should be done first, so I can use the
>>> futures' results for dispatching the next set of tasks. I am not sure
>>> whether, for example, CompletableFuture in Java 8 is meant to be used
>>> for such problems. Namely, tasks have to be executed in a certain
>>> order and deliver results, and they can have
>>> dependencies/relationships between each other.
>>>
>>> By the way, in what circumstances would you recommend using the TPE
>>> constructor with your own ThreadFactory implementation rather than
>>> relying on the default one?
>>>
>>> DT
>>>
>>> On 6/27/2014 1:09 PM, David Holmes wrote:
>>>> Priority can only really be used for the dispatching/queueing 
>>>> mechanism. Once tasks are executing priority is essentially 
>>>> meaningless. Thread priorities have no significance in general.
>>>> David
>>>>
>>>>     -----Original Message-----
>>>>     *From:* concurrency-interest-bounces at cs.oswego.edu
>>>>     [mailto:concurrency-interest-bounces at cs.oswego.edu]*On Behalf
>>>>     Of *DT
>>>>     *Sent:* Saturday, 28 June 2014 1:42 AM
>>>>     *To:* Viktor Klang; dholmes at ieee.org
>>>>     *Cc:* concurrency-interest
>>>>     *Subject:* Re: [concurrency-interest] Custom ThreadPoolExecutor
>>>>     implementation to handle priority based Callable tasks
>>>>
>>>>     We see several ways to approach this:
>>>>
>>>>     The 1st approach is to use a PriorityBlockingQueue and provide a
>>>>     Comparator for Callable tasks that orders them by Callable
>>>>     priority. Then, when we call invokeAll(collection of Callable
>>>>     tasks) on the TPE, tasks are taken from the queue by priority
>>>>     (1st, 2nd, ..., n), which means that tasks will be executed in
>>>>     abstract groups (for instance, the 1st group, all tasks with high
>>>>     priority, goes first according to TPE scheduling).
>>>>     tpe.execute(thread) will be called and tasks will be assigned
>>>>     to available TPE worker threads.
>>>>
>>>>     Following this logic, the TPE does not guarantee that all
>>>>     high-priority tasks will be executed first. Or the guarantee is
>>>>     not enforced strongly enough (maybe there is some policy to
>>>>     enforce this).
>>>>
>>>>     The 2nd approach is to modify invokeAll() to handle priority
>>>>     explicitly, which does not look like a clean solution.
>>>>
>>>>     The 3rd approach is to have multiple TPEs, each handling only one
>>>>     priority level without mixing; for example, tpe1 will
>>>>     invokeAll(tasks of priority 1 only) and tpe2 will
>>>>     invokeAll(tasks of priority 2 only). Which TPE should be
>>>>     instantiated first should follow business logic.
>>>>
>>>>     Here is an example of what we are trying to implement. Assume we
>>>>     have different types of tasks which are bound to get some
>>>>     resources using udp/tcp/http/sql. Something like:
>>>>
>>>>     sqlWorker needs to execute SQLs and return results, httpWorker
>>>>     has to handle http calls and return some results, etc. The
>>>>     issue is that even if we assign priorities to these workers,
>>>>     following business logic about what data needs to arrive first
>>>>     within our workflow, we would have to change priorities
>>>>     dynamically, because workers of the same type might finish
>>>>     execution in different time slots. So we have to wait until all
>>>>     workers have executed (in practice, tune the time slot to the
>>>>     longest execution time) or drop some tasks due to the timing
>>>>     limits. If we follow the 3rd approach, it would be hard to
>>>>     manage the TPEs. If we follow the 2nd approach, it would be hard
>>>>     to maintain tasks and guarantee execution group by group. And
>>>>     the 1st approach does not seem to completely satisfy the
>>>>     concurrent execution logic of the problem.
>>>>
>>>>     On 6/27/2014 3:12 AM, Viktor Klang wrote:
>>>>>     "  public abstract void setPriority(int priority);" ? Changing
>>>>>     priority on an already submitted task seems strange.
>>>>>
>>>>>     I agree with David, just use a PriorityBlockingQueue
>>>>>     [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
>>>>>     for the submission queue of the TPE and provide a Comparator
>>>>>     that checks for your PriorityCallable and puts
>>>>>     non-PriorityCallable-Callables at an appropriate default priority.
>>>>>
>>>>>
>>>>>     On Fri, Jun 27, 2014 at 6:16 AM, David Holmes
>>>>>     <davidcholmes at aapt.net.au> wrote:
>>>>>
>>>>>         Can't you just use your PriorityCallable with a custom
>>>>>         PriorityQueue and standard ThreadPoolExecutor?
>>>>>
>>>>>         David
>>>>>
>>>>>         > -----Original Message-----
>>>>>         > From: concurrency-interest-bounces at cs.oswego.edu
>>>>>         > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
>>>>>         > Sent: Friday, 27 June 2014 2:08 PM
>>>>>         > To: concurrency-interest at cs.oswego.edu
>>>>>         > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
>>>>>         > to handle priority based Callable tasks
>>>>>         >
>>>>>         >
>>>>>         > If we were to control Callable Tasks with Priorities and to manage
>>>>>         > execution of those callable tasks through ThreadPoolExecutor we would
>>>>>         > have to implement custom ThreadPoolExecutor and PriorityCallable (see a
>>>>>         > suggestion below). What other ways to accomplish this do you see ?
>>>>>         >
>>>>>         > public interface PriorityCallable extends Callable<Object> {
>>>>>         >      public abstract void setPriority(int priority);
>>>>>         >      ...
>>>>>         > }
>>>>>         >
>>>>>         > public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
>>>>>         > ...
>>>>>         > ...
>>>>>         >     // pass PriorityCallable
>>>>>         >     public <T> List<Future<T>> customInvokeAll(
>>>>>         >             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
>>>>>         >             throws InterruptedException {
>>>>>         >         if (tasks == null || unit == null)
>>>>>         >             throw new NullPointerException();
>>>>>         >         long nanos = unit.toNanos(timeout);
>>>>>         >         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
>>>>>         >         boolean done = false;
>>>>>         >         try {
>>>>>         >             // handle Priority based Callable tasks here, though can expect timing issues
>>>>>         >             // custom logic to group Callable tasks, if (priority == 1, 2, 3 create group of callables, etc )
>>>>>         >             for (Callable<T> t : tasks)
>>>>>         >                 futures.add(newTaskFor(t));
>>>>>         >
>>>>>         >             long lastTime = System.nanoTime();
>>>>>         >
>>>>>         >             // Interleave time checks and calls to execute in case
>>>>>         >             // executor doesn't have any/much parallelism.
>>>>>         >             Iterator<Future<T>> it = futures.iterator();
>>>>>         >             while (it.hasNext()) {
>>>>>         >                 // should we expect futures get executed within the same timeframe
>>>>>         >                 // for Callables with different priorities
>>>>>         >                 execute((Runnable)(it.next()));
>>>>>         >                 long now = System.nanoTime();
>>>>>         >                 nanos -= now - lastTime;
>>>>>         >                 lastTime = now;
>>>>>         >                 if (nanos <= 0)
>>>>>         >                     return futures;
>>>>>         >             }
>>>>>         >
>>>>>         >             for (Future<T> f : futures) {
>>>>>         >                 if (!f.isDone()) {
>>>>>         >                     if (nanos <= 0)
>>>>>         >                         return futures;
>>>>>         >                     try {
>>>>>         >                         f.get(nanos, TimeUnit.NANOSECONDS);
>>>>>         >                     } catch (CancellationException ignore) {
>>>>>         >                     } catch (ExecutionException ignore) {
>>>>>         >                         // Should we cancel tasks based on the priority as well?
>>>>>         >                     } catch (TimeoutException toe) {
>>>>>         >                         return futures;
>>>>>         >                     }
>>>>>         >                     long now = System.nanoTime();
>>>>>         >                     nanos -= now - lastTime;
>>>>>         >                     lastTime = now;
>>>>>         >                 }
>>>>>         >             }
>>>>>         >             done = true;
>>>>>         >             return futures;
>>>>>         >         } finally {
>>>>>         >             if (!done)
>>>>>         >                 for (Future<T> f : futures)
>>>>>         >                     f.cancel(true);
>>>>>         >         }
>>>>>         >     }
>>>>>         > ...
>>>>>         > }
>>>>>         >
>>>>>         > Thank you,
>>>>>         > dt
>>>>>         > http://www.flyingtroika.com/
>>>>>         >
>>>>>         > _______________________________________________
>>>>>         > Concurrency-interest mailing list
>>>>>         > Concurrency-interest at cs.oswego.edu
>>>>>         > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>     -- 
>>>>>     Cheers,
>>>>>     √
>>>>
>>>
>>>
>>>
>>
>
>
>
