[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

DT dt at flyingtroika.com
Fri Jun 27 23:49:39 EDT 2014


I believe that
1 -  should be ok to accomplish
2 -  ok to satisfy or at least get very close to it as we know all 
dependencies at the beginning
3 -  I like idea with counters: simple, straight and easy to follow

However, the issue is that all the tasks (with dependency, etc) have to 
finish execution within the same time slot as a whole.  We have to 
enforce some sort of timing constraint. Same task can take different 
time to execute (we can apply some statistical methods to get the 
average for all the tasks based on the historical execution time).

Would it be better to apply some sort of cyclic barriers or counters?

Thanks,
DT

On 6/27/2014 3:30 PM, Oleksandr Otenko wrote:
> 1. Each task must know how many dependencies it has.
>
> 2. Each task knows which other tasks depend on it.
>
> 3. Upon finishing a task, decrement the counters of all dependent 
> tasks. Those that reach zero, get scheduled with the TPE.
>
>
> If you don't know which tasks depend on which before executing them 
> (ie you don't have answer for (2)), there is no efficient way of 
> planning their execution.
>
> Alex
>
>
> On 27/06/2014 22:04, DT wrote:
>> if we dispatch tasks based on queuing we would not get guarantees 
>> that the tasks first in line will finish execution first and futures 
>> are done to return results (in general none of the dispatching 
>> mechanism will give me such behavior, can be wrong of course, sorry 
>> if I am mangled callable priorities which I wanted to use to control 
>> execution of the tasks in some order with overall thread priorities)
>> I think that I need some sort of scheduling constraints what task 
>> should be done first so I can use futures results for dispatching 
>> next set of tasks. I am not sure if for example CompletableFuture in 
>> java 8 meant to be used for such problems. Namely tasks have to be 
>> executed in certain order/deliver results and they can have 
>> dependency/relationships between each other.
>>
>> By the way in what circumstances would you recommend to use TPE 
>> constructor with own ThreadFactory implementation and not to relly on 
>> the default one?
>>
>> DT
>>
>> On 6/27/2014 1:09 PM, David Holmes wrote:
>>> Priority can only really be used for the dispatching/queueing 
>>> mechanism. Once tasks are executing priority is essentially 
>>> meaningless. Thread priorities have no significance in general.
>>> David
>>>
>>>     -----Original Message-----
>>>     *From:* concurrency-interest-bounces at cs.oswego.edu
>>>     [mailto:concurrency-interest-bounces at cs.oswego.edu]*On Behalf Of *DT
>>>     *Sent:* Saturday, 28 June 2014 1:42 AM
>>>     *To:* viktor ?lang; dholmes at ieee.org
>>>     *Cc:* concurrency-interest
>>>     *Subject:* Re: [concurrency-interest] Custom ThreadPoolExecutor
>>>     implementation to handle priority based Callable tasks
>>>
>>>                     We see several scenarios how to approach:
>>>
>>>     1-st approach is to use a PriorityBlockingQueue, provide
>>>     Comparator for Callable tasks to change the order based on the
>>>     Callable priority. Then when we call invokeAll( Collections of
>>>     Callable tasks) on TPE to execute tasks from queue based on the
>>>     priority, 1-st, 2-d, n which means that tasks will be executed
>>>     based on abstract groups (for instance1-st group -- all tasks
>>>     with high priority go first according to TPE scheduling.
>>>     tpe.execute(thread) will be called and tasks will be assigned to
>>>     available tpe worker threads.
>>>
>>>     Following this logic tpe does not guarantee for sure that all
>>>     high priority tasks will be executed first. Or this guarantee is
>>>     not enough enforced (maybe there are some policy to enforce this)
>>>
>>>     2-d approach is to modify invokeAll() to handle priority
>>>     explicitly which does not look like a clean solution.
>>>
>>>     3-d approach is to have multiple TPEs and each of these tpe will
>>>     handle only one type of priority without mix, for example tpe1
>>>     will invokeAll(tasks priority 1 only) , tpe2 will
>>>     invokeAll(tasks priority 2 only). Which tpe should be
>>>     instantiated -1^st should follow business logic.
>>>
>>>     Here is an example what we trying to implement. Assuming we have
>>>     different types of tasks which are bounded to get some resources
>>>     using udp/tcp/http/sql. Something like
>>>
>>>     sqlWorker needs to execute SQLs and return results, httpWorker
>>>     type has to handle http calls and return some results, etc. the
>>>     issue is that even if we assign some priorities to these workers
>>>     to perform similar tasks following some business logic what data
>>>     needs to get first within our workflow we would have to change
>>>     priorities dynamically just because same type of workers
>>>     mightfinish execution within different timeslot. So we have to
>>>     wait until all workers are executed (practically tune timeslot
>>>     based on the longest execution time) or drop some tasks due to
>>>     the timing limits. If we follow 3-d approach it would be hard to
>>>     manage TPEs. If we follow 2-d approach it would be hard to
>>>     maintain tasks and guarantee execution group by group.And 1-st
>>>     approach seems does not completely satisfy the problem
>>>     concurrency execution logic.
>>>
>>>     On 6/27/2014 3:12 AM, ?iktor ?lang wrote:
>>>>     "  public abstract void setPriority(int priority);" ? Changing
>>>>     priority on an already submitted task seems strange.
>>>>
>>>>     I agree with David, just use a PriorityBlockingQueue
>>>>     [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
>>>>     for the submission queue of the TPE and provide a Comparator
>>>>     that checks for your PriorityCallable and puts
>>>>     non-PriorityCallable-Callables at an appropriate default priority.
>>>>
>>>>
>>>>     On Fri, Jun 27, 2014 at 6:16 AM, David Holmes
>>>>     <davidcholmes at aapt.net.au <mailto:davidcholmes at aapt.net.au>> wrote:
>>>>
>>>>         Can't you just use your PriorityCallable with a custom
>>>>         PriorityQueue and
>>>>         standard ThreadPoolExecutor?
>>>>
>>>>         David
>>>>
>>>>         > -----Original Message-----
>>>>         > From: concurrency-interest-bounces at cs.oswego.edu
>>>>         <mailto:concurrency-interest-bounces at cs.oswego.edu>
>>>>         > [mailto:concurrency-interest-bounces at cs.oswego.edu
>>>>         <mailto:concurrency-interest-bounces at cs.oswego.edu>]On
>>>>         Behalf Of DT
>>>>         > Sent: Friday, 27 June 2014 2:08 PM
>>>>         > To: concurrency-interest at cs.oswego.edu
>>>>         <mailto:concurrency-interest at cs.oswego.edu>
>>>>         > Subject: [concurrency-interest] Custom ThreadPoolExecutor
>>>>         implementation
>>>>         > to handle priority based Callable tasks
>>>>         >
>>>>         >
>>>>         > If we were to control Callable Tasks with Priorities and
>>>>         to manage
>>>>         > execution of those callable tasks through
>>>>         ThreadPoolExecutor we would
>>>>         > have to implement custom ThreadPoolExecutor and
>>>>         PriorityCallable (see a
>>>>         > suggestion below). What other ways to accomplish this do
>>>>         you see ?
>>>>         >
>>>>         > public interface PriorityCallable extends Callable<Object> {
>>>>         >      public abstract void setPriority(int priority);
>>>>         >      ...
>>>>         > }
>>>>         >
>>>>         > public class CustomThreadPoolExecutor  extends
>>>>         ThreadPoolExecutor {
>>>>         > ...
>>>>         > ...
>>>>         > public <T> List<Future<T>> customInvokeAll(
>>>>         >                  Collection<? extends Callable<T>> tasks,
>>>>         long timeout,
>>>>         > TimeUnit unit) // pass PriorityCallable
>>>>         >                  throws InterruptedException {
>>>>         >              if (tasks == null || unit == null)
>>>>         >                  throw new NullPointerException();
>>>>         >              long nanos = unit.toNanos(timeout);
>>>>         >              List<Future<T>> futures = new
>>>>         > ArrayList<Future<T>>(tasks.size());
>>>>         >              boolean done = false;
>>>>         >              try {
>>>>         >                  // handle Priority based Callable tasks
>>>>         here, though
>>>>         > can expect timing issues
>>>>         >                  // custom logic to group Callable tasks
>>>>         , if (priority
>>>>         > == 1, 2, 3 create group of callables, etc )
>>>>         >                  for (Callable<T> t : tasks)
>>>>         >  futures.add(newTaskFor(t));
>>>>         >
>>>>         >                  long lastTime = System.nanoTime();
>>>>         >
>>>>         >                  // Interleave time checks and calls to
>>>>         execute in case
>>>>         >                  // executor doesn't have any/much
>>>>         parallelism.
>>>>         >  Iterator<Future<T>> it = futures.iterator();
>>>>         >                  while (it.hasNext()) {
>>>>         >  execute((Runnable)(it.next())); // should we expect
>>>>         > futures get executed within the same timeframe for
>>>>         Callables with
>>>>         > different priorities
>>>>         >                      long now = System.nanoTime();
>>>>         >                      nanos -= now - lastTime;
>>>>         >                      lastTime = now;
>>>>         >                      if (nanos <= 0)
>>>>         >                          return futures;
>>>>         >                  }
>>>>         >
>>>>         >                  for (Future<T> f : futures) {
>>>>         >                      if (!f.isDone()) {
>>>>         >                          if (nanos <= 0)
>>>>         >                              return futures;
>>>>         >                          try {
>>>>         >                              f.get(nanos,
>>>>         TimeUnit.NANOSECONDS);
>>>>         >                          } catch (CancellationException
>>>>         ignore) {
>>>>         >                          } catch (ExecutionException
>>>>         ignore) { // Should
>>>>         > we cancel tasks based on the priority as well?
>>>>         >                          } catch (TimeoutException toe) {
>>>>         >                              return futures;
>>>>         >                          }
>>>>         >                          long now = System.nanoTime();
>>>>         >                          nanos -= now - lastTime;
>>>>         >                          lastTime = now;
>>>>         >                      }
>>>>         >                  }
>>>>         >                  done = true;
>>>>         >                  return futures;
>>>>         >              } finally {
>>>>         >                  if (!done)
>>>>         >                      for (Future<T> f : futures)
>>>>         >                          f.cancel(true);
>>>>         >              }
>>>>         >          }
>>>>         > ...
>>>>         > }
>>>>         >
>>>>         > Thank you,
>>>>         > dt
>>>>         > http://www.flyingtroika.com/
>>>>         >
>>>>         > _______________________________________________
>>>>         > Concurrency-interest mailing list
>>>>         > Concurrency-interest at cs.oswego.edu
>>>>         <mailto:Concurrency-interest at cs.oswego.edu>
>>>>         > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>>         _______________________________________________
>>>>         Concurrency-interest mailing list
>>>>         Concurrency-interest at cs.oswego.edu
>>>>         <mailto:Concurrency-interest at cs.oswego.edu>
>>>>         http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>>
>>>>
>>>>
>>>>     -- 
>>>>     Cheers,
>>>>     ?
>>>
>>
>>
>>
>> _______________________________________________
>> Concurrency-interest mailing list
>> Concurrency-interest at cs.oswego.edu
>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>

-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20140627/45e09fa1/attachment-0001.html>


More information about the Concurrency-interest mailing list