[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

Peter Levart peter.levart at gmail.com
Tue Jul 1 09:45:58 EDT 2014


On 07/01/2014 06:23 AM, DT wrote:
> Peter,
>
> Is this code (ExpirableFutureTask ) based on java 7 or it requires 8? 
> I need to perform some experiments with CompletableFutures, its for 
> sure. We are using java 7 and moving to 8 can take some time. I am a 
> slightly conservative in terms of moving to the next version.

I used lambdas in the EFTTest example, but ExpirableFutureTask should 
compile in JDK7 too - it's based on FutureTask and you can always use 
anonymous inner classes instead of lambdas.

Regards, Peter

>
> DT
>
> On 6/29/2014 4:06 PM, Peter Levart wrote:
>>
>> On 06/28/2014 05:49 AM, DT wrote:
>>> I believe that
>>> 1 -  should be ok to accomplish
>>> 2 -  ok to satisfy or at least get very close to it as we know all 
>>> dependencies at the beginning
>>> 3 -  I like idea with counters: simple, straight and easy to follow
>>>
>>> However, the issue is that all the tasks (with dependency, etc) have 
>>> to finish execution within the same time slot as a whole. We have to 
>>> enforce some sort of timing constraint. Same task can take different 
>>> time to execute (we can apply some statistical methods to get the 
>>> average for all the tasks based on the historical execution time).
>>>
>>> Would it be better to apply some sort of cyclic barriers or counters?
>>>
>>> Thanks,
>>> DT
>>
>> Do you know the time slot in advance, when you submit the 1st task in 
>> a group of dependent tasks? If so, you could combine the Oleksandr's 
>> suggestion (which is how JDK8 CompletableFuture works) with the 
>> notion of "deadline" that you assign to a group. Deadline can serve 
>> two purposes:
>> - expires the tree of unfinished dependent tasks in a group when 
>> deadline is reached, skipping their execution
>> - can be used to prioritize de-queueing of tasks by TPE that will 
>> expire earlier (using PriorityBlockingQueue)
>>
>> See here:
>>
>> https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/ExpirableFutureTask.java 
>>
>>
>> And an example here:
>>
>> https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/test/EFTTest.java 
>>
>>
>>
>> Regards, Peter
>>
>>> On 6/27/2014 3:30 PM, Oleksandr Otenko wrote:
>>>> 1. Each task must know how many dependencies it has.
>>>>
>>>> 2. Each task knows which other tasks depend on it.
>>>>
>>>> 3. Upon finishing a task, decrement the counters of all dependent 
>>>> tasks. Those that reach zero, get scheduled with the TPE.
>>>>
>>>>
>>>> If you don't know which tasks depend on which before executing them 
>>>> (ie you don't have answer for (2)), there is no efficient way of 
>>>> planning their execution.
>>>>
>>>> Alex
>>>>
>>>>
>>>> On 27/06/2014 22:04, DT wrote:
>>>>> if we dispatch tasks based on queuing we would not get guarantees 
>>>>> that the tasks first in line will finish execution first and 
>>>>> futures are done to return results (in general none of the 
>>>>> dispatching mechanism will give me such behavior, can be wrong of 
>>>>> course, sorry if I am mangled callable priorities which I wanted 
>>>>> to use to control execution of the tasks in some order with 
>>>>> overall thread priorities)
>>>>> I think that I need some sort of scheduling constraints what task 
>>>>> should be done first so I can use futures results for dispatching 
>>>>> next set of tasks. I am not sure if for example CompletableFuture 
>>>>> in java 8 meant to be used for such problems. Namely tasks have to 
>>>>> be executed in certain order/deliver results and they can have 
>>>>> dependency/relationships between each other.
>>>>>
>>>>> By the way in what circumstances would you recommend to use TPE 
>>>>> constructor with own ThreadFactory implementation and not to relly 
>>>>> on the default one?
>>>>>
>>>>> DT
>>>>>
>>>>> On 6/27/2014 1:09 PM, David Holmes wrote:
>>>>>> Priority can only really be used for the dispatching/queueing 
>>>>>> mechanism. Once tasks are executing priority is essentially 
>>>>>> meaningless. Thread priorities have no significance in general.
>>>>>> David
>>>>>>
>>>>>>     -----Original Message-----
>>>>>>     *From:* concurrency-interest-bounces at cs.oswego.edu
>>>>>> [mailto:concurrency-interest-bounces at cs.oswego.edu]*On Behalf
>>>>>>     Of *DT
>>>>>>     *Sent:* Saturday, 28 June 2014 1:42 AM
>>>>>>     *To:* viktor ?lang; dholmes at ieee.org
>>>>>>     *Cc:* concurrency-interest
>>>>>>     *Subject:* Re: [concurrency-interest] Custom
>>>>>>     ThreadPoolExecutor implementation to handle priority based
>>>>>>     Callable tasks
>>>>>>
>>>>>>                     We see several scenarios how to approach:
>>>>>>
>>>>>>     1-st approach is to use a PriorityBlockingQueue, provide
>>>>>>     Comparator for Callable tasks to change the order based on the
>>>>>>     Callable priority. Then when we call invokeAll( Collections of
>>>>>>     Callable tasks) on TPE to execute tasks from queue based on
>>>>>>     the priority, 1-st, 2-d, n which means that tasks will be
>>>>>>     executed based on abstract groups (for instance1-st group --
>>>>>>     all tasks with high priority go first according to TPE
>>>>>>     scheduling. tpe.execute(thread) will be called and tasks will
>>>>>>     be assigned to available tpe worker threads.
>>>>>>
>>>>>>     Following this logic tpe does not guarantee for sure that all
>>>>>>     high priority tasks will be executed first. Or this guarantee
>>>>>>     is not enough enforced (maybe there are some policy to enforce
>>>>>>     this)
>>>>>>
>>>>>>     2-d approach is to modify invokeAll() to handle priority
>>>>>>     explicitly which does not look like a clean solution.
>>>>>>
>>>>>>     3-d approach is to have multiple TPEs and each of these tpe
>>>>>>     will handle only one type of priority without mix, for example
>>>>>>     tpe1 will invokeAll(tasks priority 1 only) , tpe2 will
>>>>>>     invokeAll(tasks priority 2 only). Which tpe should be
>>>>>>     instantiated -1^st should follow business logic.
>>>>>>
>>>>>>     Here is an example what we trying to implement. Assuming we
>>>>>>     have different types of tasks which are bounded to get some
>>>>>>     resources using udp/tcp/http/sql. Something like
>>>>>>
>>>>>>     sqlWorker needs to execute SQLs and return results, httpWorker
>>>>>>     type has to handle http calls and return some results, etc.
>>>>>>     the issue is that even if we assign some priorities to these
>>>>>>     workers to perform similar tasks following some business logic
>>>>>>     what data needs to get first within our workflow we would have
>>>>>>     to change priorities dynamically just because same type of
>>>>>>     workers mightfinish execution within different timeslot. So we
>>>>>>     have to wait until all workers are executed (practically tune
>>>>>>     timeslot based on the longest execution time) or drop some
>>>>>>     tasks due to the timing limits. If we follow 3-d approach it
>>>>>>     would be hard to manage TPEs. If we follow 2-d approach it
>>>>>>     would be hard to maintain tasks and guarantee execution group
>>>>>>     by group.And 1-st approach seems does not completely satisfy
>>>>>>     the problem concurrency execution logic.
>>>>>>
>>>>>>     On 6/27/2014 3:12 AM, ?iktor ?lang wrote:
>>>>>>>     "  public abstract void setPriority(int priority);" ?
>>>>>>>     Changing priority on an already submitted task seems strange.
>>>>>>>
>>>>>>>     I agree with David, just use a PriorityBlockingQueue
>>>>>>> [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
>>>>>>>     for the submission queue of the TPE and provide a Comparator
>>>>>>>     that checks for your PriorityCallable and puts
>>>>>>>     non-PriorityCallable-Callables at an appropriate default
>>>>>>>     priority.
>>>>>>>
>>>>>>>
>>>>>>>     On Fri, Jun 27, 2014 at 6:16 AM, David Holmes
>>>>>>>     <davidcholmes at aapt.net.au <mailto:davidcholmes at aapt.net.au>>
>>>>>>>     wrote:
>>>>>>>
>>>>>>>         Can't you just use your PriorityCallable with a custom
>>>>>>>         PriorityQueue and
>>>>>>>         standard ThreadPoolExecutor?
>>>>>>>
>>>>>>>         David
>>>>>>>
>>>>>>>         > -----Original Message-----
>>>>>>>         > From: concurrency-interest-bounces at cs.oswego.edu
>>>>>>> <mailto:concurrency-interest-bounces at cs.oswego.edu>
>>>>>>>         > [mailto:concurrency-interest-bounces at cs.oswego.edu
>>>>>>> <mailto:concurrency-interest-bounces at cs.oswego.edu>]On
>>>>>>>         Behalf Of DT
>>>>>>>         > Sent: Friday, 27 June 2014 2:08 PM
>>>>>>>         > To: concurrency-interest at cs.oswego.edu
>>>>>>> <mailto:concurrency-interest at cs.oswego.edu>
>>>>>>>         > Subject: [concurrency-interest] Custom
>>>>>>>         ThreadPoolExecutor implementation
>>>>>>>         > to handle priority based Callable tasks
>>>>>>>         >
>>>>>>>         >
>>>>>>>         > If we were to control Callable Tasks with Priorities
>>>>>>>         and to manage
>>>>>>>         > execution of those callable tasks through
>>>>>>>         ThreadPoolExecutor we would
>>>>>>>         > have to implement custom ThreadPoolExecutor and
>>>>>>>         PriorityCallable (see a
>>>>>>>         > suggestion below). What other ways to accomplish this
>>>>>>>         do you see ?
>>>>>>>         >
>>>>>>>         > public interface PriorityCallable extends
>>>>>>>         Callable<Object> {
>>>>>>>         >      public abstract void setPriority(int priority);
>>>>>>>         >      ...
>>>>>>>         > }
>>>>>>>         >
>>>>>>>         > public class CustomThreadPoolExecutor extends
>>>>>>>         ThreadPoolExecutor {
>>>>>>>         > ...
>>>>>>>         > ...
>>>>>>>         > public <T> List<Future<T>> customInvokeAll(
>>>>>>>         >                  Collection<? extends Callable<T>>
>>>>>>>         tasks, long timeout,
>>>>>>>         > TimeUnit unit) // pass PriorityCallable
>>>>>>>         >                  throws InterruptedException {
>>>>>>>         >              if (tasks == null || unit == null)
>>>>>>>         >                  throw new NullPointerException();
>>>>>>>         >              long nanos = unit.toNanos(timeout);
>>>>>>>         >  List<Future<T>> futures = new
>>>>>>>         > ArrayList<Future<T>>(tasks.size());
>>>>>>>         >              boolean done = false;
>>>>>>>         >              try {
>>>>>>>         >                  // handle Priority based Callable
>>>>>>>         tasks here, though
>>>>>>>         > can expect timing issues
>>>>>>>         >                  // custom logic to group Callable
>>>>>>>         tasks , if (priority
>>>>>>>         > == 1, 2, 3 create group of callables, etc )
>>>>>>>         >                  for (Callable<T> t : tasks)
>>>>>>>         >  futures.add(newTaskFor(t));
>>>>>>>         >
>>>>>>>         >                  long lastTime = System.nanoTime();
>>>>>>>         >
>>>>>>>         >                  // Interleave time checks and calls to
>>>>>>>         execute in case
>>>>>>>         >                  // executor doesn't have any/much
>>>>>>>         parallelism.
>>>>>>>         >  Iterator<Future<T>> it = futures.iterator();
>>>>>>>         >                  while (it.hasNext()) {
>>>>>>>         >  execute((Runnable)(it.next())); // should we expect
>>>>>>>         > futures get executed within the same timeframe for
>>>>>>>         Callables with
>>>>>>>         > different priorities
>>>>>>>         >                      long now = System.nanoTime();
>>>>>>>         >                      nanos -= now - lastTime;
>>>>>>>         >                      lastTime = now;
>>>>>>>         >                      if (nanos <= 0)
>>>>>>>         >                          return futures;
>>>>>>>         >                  }
>>>>>>>         >
>>>>>>>         >                  for (Future<T> f : futures) {
>>>>>>>         >                      if (!f.isDone()) {
>>>>>>>         >                          if (nanos <= 0)
>>>>>>>         >                              return futures;
>>>>>>>         >                          try {
>>>>>>>         >  f.get(nanos, TimeUnit.NANOSECONDS);
>>>>>>>         >                          } catch (CancellationException
>>>>>>>         ignore) {
>>>>>>>         >                          } catch (ExecutionException
>>>>>>>         ignore) { // Should
>>>>>>>         > we cancel tasks based on the priority as well?
>>>>>>>         >                          } catch (TimeoutException toe) {
>>>>>>>         >                              return futures;
>>>>>>>         >                          }
>>>>>>>         >                          long now = System.nanoTime();
>>>>>>>         >                          nanos -= now - lastTime;
>>>>>>>         >                          lastTime = now;
>>>>>>>         >                      }
>>>>>>>         >                  }
>>>>>>>         >                  done = true;
>>>>>>>         >                  return futures;
>>>>>>>         >              } finally {
>>>>>>>         >                  if (!done)
>>>>>>>         >                      for (Future<T> f : futures)
>>>>>>>         >  f.cancel(true);
>>>>>>>         >              }
>>>>>>>         >          }
>>>>>>>         > ...
>>>>>>>         > }
>>>>>>>         >
>>>>>>>         > Thank you,
>>>>>>>         > dt
>>>>>>>         > http://www.flyingtroika.com/
>>>>>>>         >
>>>>>>>         > _______________________________________________
>>>>>>>         > Concurrency-interest mailing list
>>>>>>>         > Concurrency-interest at cs.oswego.edu
>>>>>>> <mailto:Concurrency-interest at cs.oswego.edu>
>>>>>>>         > 
>>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>>
>>>>>>> _______________________________________________
>>>>>>>         Concurrency-interest mailing list
>>>>>>>         Concurrency-interest at cs.oswego.edu
>>>>>>> <mailto:Concurrency-interest at cs.oswego.edu>
>>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>     --     Cheers,
>>>>>>>     ?
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> _______________________________________________
>>>>> Concurrency-interest mailing list
>>>>> Concurrency-interest at cs.oswego.edu
>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>
>>>
>>>
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>
>



More information about the Concurrency-interest mailing list