[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

DT dt at flyingtroika.com
Tue Jul 1 00:23:16 EDT 2014


Peter,

Is this code (ExpirableFutureTask) based on Java 7, or does it require 8? I 
definitely need to run some experiments with CompletableFuture. 
We are using Java 7, and moving to 8 can take some time. I am slightly 
conservative about moving to the next version.

DT

On 6/29/2014 4:06 PM, Peter Levart wrote:
>
> On 06/28/2014 05:49 AM, DT wrote:
>> I believe that
>> 1 - should be OK to accomplish
>> 2 - OK to satisfy, or at least get very close to, as we know all 
>> dependencies at the beginning
>> 3 - I like the idea with counters: simple, straightforward and easy to follow
>>
>> However, the issue is that all the tasks (with their dependencies, etc.) have 
>> to finish execution within the same time slot as a whole. We have to 
>> enforce some sort of timing constraint. The same task can take a different 
>> amount of time on each run (we can apply some statistical methods to get the 
>> average for all the tasks based on historical execution times).
>>
>> Would it be better to apply some sort of cyclic barriers or counters?
>>
>> Thanks,
>> DT
>
> Do you know the time slot in advance, when you submit the 1st task in 
> a group of dependent tasks? If so, you could combine Oleksandr's 
> suggestion (which is how JDK8 CompletableFuture works) with the notion 
> of a "deadline" that you assign to a group. The deadline can serve two purposes:
> - it expires the tree of unfinished dependent tasks in a group when the 
> deadline is reached, skipping their execution
> - it can be used to prioritize de-queueing by the TPE of tasks that will 
> expire earlier (using PriorityBlockingQueue)
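The two purposes of a deadline listed above can be sketched roughly as follows. This is a minimal illustration only and is not taken from the linked ExpirableFutureTask; the names DeadlineDemo, DeadlineTask, task and runDemo are hypothetical. It assumes tasks are handed to the pool with execute(), so that the PriorityBlockingQueue sees the Comparable task itself rather than a wrapper. Lambdas are used for brevity; all classes involved also exist in Java 7.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DeadlineDemo {
    /** Hypothetical task type: ordered by deadline, skipped once the deadline has passed. */
    static final class DeadlineTask<V> extends FutureTask<V>
            implements Comparable<DeadlineTask<?>> {
        final long deadlineNanos;

        DeadlineTask(Callable<V> body, long timeout, TimeUnit unit) {
            super(body);
            this.deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
        }

        @Override
        public int compareTo(DeadlineTask<?> other) {
            // Earlier deadline sorts first; subtraction is the overflow-safe nanoTime idiom.
            return Long.signum(deadlineNanos - other.deadlineNanos);
        }

        @Override
        public void run() {
            if (System.nanoTime() - deadlineNanos >= 0) {
                cancel(false);   // expired while queued: skip execution
            } else {
                super.run();
            }
        }
    }

    static DeadlineTask<String> task(String name, long seconds, List<String> order) {
        return new DeadlineTask<String>(() -> { order.add(name); return name; },
                seconds, TimeUnit.SECONDS);
    }

    /** Queues tasks behind a blocked single worker and returns the order they ran in. */
    static List<String> runDemo() {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());
        DeadlineTask<String> expired = task("expired", -1, order); // deadline already passed
        try {
            // Keep the only worker busy so the following tasks really wait in the queue.
            tpe.execute(new DeadlineTask<Void>(() -> { gate.await(); return null; },
                    60, TimeUnit.SECONDS));
            tpe.execute(task("late", 300, order));
            tpe.execute(expired);
            tpe.execute(task("soon", 100, order));
            tpe.execute(task("mid", 200, order));
            gate.countDown();
            tpe.shutdown();   // already queued tasks still run
            tpe.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        order.add("expired skipped=" + expired.isCancelled());
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // [soon, mid, late, expired skipped=true]
    }
}
```

The sketch expires single tasks only. Expiring a whole tree of dependent tasks, as described above, would additionally need each task to propagate the cancellation to its dependents.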
>
> See here:
>
> https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/ExpirableFutureTask.java
>
> And an example here:
>
> https://github.com/plevart/concurrent-utils/blob/master/src/si/pele/concurrent/test/EFTTest.java
>
>
> Regards, Peter
>
>> On 6/27/2014 3:30 PM, Oleksandr Otenko wrote:
>>> 1. Each task must know how many dependencies it has.
>>>
>>> 2. Each task knows which other tasks depend on it.
>>>
>>> 3. Upon finishing a task, decrement the counters of all dependent 
>>> tasks. Those that reach zero get scheduled with the TPE.
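The three steps above can be sketched as follows. This is a minimal illustration under stated assumptions: the names CounterDemo, Node, dependsOn and runDemo are hypothetical, the dependency graph is fully known before anything is submitted, and appending the task name to a log stands in for the real work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CounterDemo {
    /** Hypothetical task node implementing the three steps. */
    static final class Node implements Runnable {
        final String name;
        final AtomicInteger pending = new AtomicInteger();    // (1) unfinished dependencies
        final List<Node> dependents = new ArrayList<Node>();  // (2) tasks waiting on this one
        final ExecutorService pool;
        final List<String> log;
        final CountDownLatch allDone;

        Node(String name, ExecutorService pool, List<String> log, CountDownLatch allDone) {
            this.name = name;
            this.pool = pool;
            this.log = log;
            this.allDone = allDone;
        }

        /** Must be called before any task is submitted. */
        Node dependsOn(Node... deps) {
            for (Node d : deps) {
                pending.incrementAndGet();
                d.dependents.add(this);
            }
            return this;
        }

        @Override
        public void run() {
            log.add(name);   // stands in for the real work
            for (Node d : dependents) {
                // (3) whichever dependency finishes last schedules the dependent
                if (d.pending.decrementAndGet() == 0) {
                    pool.execute(d);
                }
            }
            allDone.countDown();
        }
    }

    /** Runs the graph A, B -> C -> D and returns the completion log. */
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch allDone = new CountDownLatch(4);
        Node a = new Node("A", pool, log, allDone);
        Node b = new Node("B", pool, log, allDone);
        Node c = new Node("C", pool, log, allDone).dependsOn(a, b);
        Node d = new Node("D", pool, log, allDone).dependsOn(c);
        // Pick the roots before submitting anything, so a counter that drops to
        // zero while we are still submitting cannot be scheduled twice.
        List<Node> roots = new ArrayList<Node>();
        for (Node n : new Node[] {a, b, c, d}) {
            if (n.pending.get() == 0) {
                roots.add(n);
            }
        }
        try {
            for (Node n : roots) {
                pool.execute(n);
            }
            if (!allDone.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("graph did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // A and B in either order, then C, then D
    }
}
```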
>>>
>>>
>>> If you don't know which tasks depend on which before executing them 
>>> (i.e. you don't have an answer for (2)), there is no efficient way of 
>>> planning their execution.
>>>
>>> Alex
>>>
>>>
>>> On 27/06/2014 22:04, DT wrote:
>>>> If we dispatch tasks based on queueing, we get no guarantee 
>>>> that the tasks first in line will finish execution first and that 
>>>> their futures will be the first to return results (in general, none of the 
>>>> dispatching mechanisms will give me such behavior; I can be wrong, of 
>>>> course, and sorry if I have mixed up the callable priorities, which I wanted to 
>>>> use to control the order in which tasks execute, with overall 
>>>> thread priorities).
>>>> I think that I need some sort of scheduling constraint on which tasks 
>>>> should be done first, so that I can use the futures' results for dispatching 
>>>> the next set of tasks. I am not sure whether, for example, CompletableFuture 
>>>> in Java 8 is meant to be used for such problems. Namely, tasks have to 
>>>> be executed in a certain order and deliver results, and they can have 
>>>> dependencies/relationships between each other.
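For the CompletableFuture question above, a small dependency graph could look roughly like the sketch below. It requires Java 8. The class name CompletableDemo, the variable names and the constant results are placeholders for illustration; real tasks would perform I/O instead of returning constants.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CompletableDemo {
    /** Builds a three-stage graph and waits for its result within one time slot. */
    static int runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Two independent tasks (hypothetical stand-ins for e.g. an SQL and an HTTP call).
            CompletableFuture<Integer> sql = CompletableFuture.supplyAsync(() -> 2, pool);
            CompletableFuture<Integer> http = CompletableFuture.supplyAsync(() -> 3, pool);

            // Runs only after both inputs have completed.
            CompletableFuture<Integer> combined = sql.thenCombine(http, (x, y) -> x + y);

            // Runs only after 'combined' has completed, again on the pool.
            CompletableFuture<Integer> result = combined.thenApplyAsync(sum -> sum * 10, pool);

            // Bound the wait for the whole graph by a single time slot.
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // 50
    }
}
```

Note that a timed get() only bounds how long the caller waits. It does not by itself stop stages that are still running or queued.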
>>>>
>>>> By the way, in what circumstances would you recommend using the TPE 
>>>> constructor with your own ThreadFactory implementation rather than relying 
>>>> on the default one?
>>>>
>>>> DT
>>>>
>>>> On 6/27/2014 1:09 PM, David Holmes wrote:
>>>>> Priority can only really be used for the dispatching/queueing 
>>>>> mechanism. Once tasks are executing priority is essentially 
>>>>> meaningless. Thread priorities have no significance in general.
>>>>> David
>>>>>
>>>>>     -----Original Message-----
>>>>>     From: concurrency-interest-bounces at cs.oswego.edu
>>>>>     [mailto:concurrency-interest-bounces at cs.oswego.edu] On Behalf
>>>>>     Of DT
>>>>>     Sent: Saturday, 28 June 2014 1:42 AM
>>>>>     To: Viktor Klang; dholmes at ieee.org
>>>>>     Cc: concurrency-interest
>>>>>     Subject: Re: [concurrency-interest] Custom
>>>>>     ThreadPoolExecutor implementation to handle priority based
>>>>>     Callable tasks
>>>>>
>>>>>     We see several ways to approach this:
>>>>>
>>>>>     The 1st approach is to use a PriorityBlockingQueue and provide a
>>>>>     Comparator for Callable tasks to change the order based on the
>>>>>     Callable priority. Then, when we call invokeAll(Collection of
>>>>>     Callable tasks) on the TPE, it executes tasks from the queue based on
>>>>>     priority (1st, 2nd, ... nth), which means that tasks will be
>>>>>     executed in abstract groups (for instance, the 1st group, i.e.
>>>>>     all tasks with high priority, goes first) according to TPE
>>>>>     scheduling. tpe.execute(task) will be called and tasks will
>>>>>     be assigned to available TPE worker threads.
>>>>>
>>>>>     Following this logic, the TPE does not guarantee that all
>>>>>     high priority tasks will be executed first. Or this guarantee
>>>>>     is not enforced strongly enough (maybe there is some policy to
>>>>>     enforce this).
>>>>>
>>>>>     The 2nd approach is to modify invokeAll() to handle priority
>>>>>     explicitly, which does not look like a clean solution.
>>>>>
>>>>>     The 3rd approach is to have multiple TPEs, each of which
>>>>>     handles only one priority level without mixing; for example,
>>>>>     tpe1 will invokeAll(tasks of priority 1 only), tpe2 will
>>>>>     invokeAll(tasks of priority 2 only). Which TPE should be
>>>>>     instantiated first should follow the business logic.
>>>>>
>>>>>     Here is an example of what we are trying to implement. Assume we
>>>>>     have different types of tasks which are bound to fetching some
>>>>>     resources using udp/tcp/http/sql. Something like:
>>>>>
>>>>>     sqlWorker needs to execute SQL statements and return results,
>>>>>     httpWorker has to handle http calls and return some results, etc.
>>>>>     The issue is that even if we assign priorities to these
>>>>>     workers, following some business logic about which data
>>>>>     is needed first within our workflow, we would have
>>>>>     to change priorities dynamically, just because workers of the
>>>>>     same type might finish execution within different time slots. So we
>>>>>     have to wait until all workers have executed (in practice, tune
>>>>>     the time slot to the longest execution time) or drop some
>>>>>     tasks due to the timing limits. If we follow the 3rd approach, it
>>>>>     would be hard to manage the TPEs. If we follow the 2nd approach, it
>>>>>     would be hard to maintain tasks and guarantee execution group
>>>>>     by group. And the 1st approach does not seem to completely satisfy
>>>>>     the concurrent execution logic of the problem.
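As an illustration of "drop some tasks due to the timing limits": the stock invokeAll(tasks, timeout, unit) already cancels whatever has not completed when the time slot ends. The sketch below only demonstrates that built-in behavior. The class name InvokeAllTimeoutDemo, the two tasks and the 500 ms slot are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {
    /** Runs a fast and a slow task in one time slot and reports what happened to each. */
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Callable<String> fast = () -> "fast";
        Callable<String> slow = () -> { Thread.sleep(10_000); return "slow"; };
        List<String> outcome = new ArrayList<String>();
        try {
            // Returns when all tasks are done or the slot ends, whichever comes first;
            // tasks that have not completed by then are cancelled.
            List<Future<String>> futures =
                    pool.invokeAll(Arrays.asList(fast, slow), 500, TimeUnit.MILLISECONDS);
            for (Future<String> f : futures) {
                outcome.add(f.isCancelled() ? "cancelled" : f.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // [fast, cancelled]
    }
}
```

The returned list has the same order as the submitted collection, so each result can be matched to its task regardless of which one finished first.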
>>>>>
>>>>>     On 6/27/2014 3:12 AM, Viktor Klang wrote:
>>>>>>     "  public abstract void setPriority(int priority);" ?
>>>>>>     Changing priority on an already submitted task seems strange.
>>>>>>
>>>>>>     I agree with David, just use a PriorityBlockingQueue
>>>>>>     [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
>>>>>>     for the submission queue of the TPE and provide a Comparator
>>>>>>     that checks for your PriorityCallable and puts
>>>>>>     non-PriorityCallable-Callables at an appropriate default
>>>>>>     priority.
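A minimal sketch of the PriorityBlockingQueue plus Comparator suggestion above. One detail matters in practice: submit() and invokeAll() wrap each Callable in a FutureTask before queueing it, so the Comparator never sees the original Callable. The sketch therefore overrides newTaskFor() to carry the priority along. The getPriority() getter, the DEFAULT_PRIORITY value and the names PriorityDemo, PriorityFutureTask, priorityOf and newPriorityPool are assumptions made for the example; the PriorityCallable posted in this thread only showed setPriority().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityDemo {
    /** Hypothetical variant of PriorityCallable with a getter; lower value = more urgent. */
    interface PriorityCallable<T> extends Callable<T> {
        int getPriority();
    }

    static final int DEFAULT_PRIORITY = 5;   // assumed default for plain Callables

    /** Remembers the priority of the Callable it wraps so the queue can see it. */
    static final class PriorityFutureTask<T> extends FutureTask<T> {
        final int priority;

        PriorityFutureTask(Callable<T> c) {
            super(c);
            priority = (c instanceof PriorityCallable)
                    ? ((PriorityCallable<?>) c).getPriority() : DEFAULT_PRIORITY;
        }
    }

    static int priorityOf(Runnable r) {
        return (r instanceof PriorityFutureTask)
                ? ((PriorityFutureTask<?>) r).priority : DEFAULT_PRIORITY;
    }

    static ThreadPoolExecutor newPriorityPool(int threads) {
        Comparator<Runnable> byPriority =
                (x, y) -> Integer.compare(priorityOf(x), priorityOf(y));
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority)) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> c) {
                return new PriorityFutureTask<T>(c);   // used by submit() and invokeAll()
            }
        };
    }

    static PriorityCallable<String> task(String name, int priority, List<String> order) {
        return new PriorityCallable<String>() {
            public String call() { order.add(name); return name; }
            public int getPriority() { return priority; }
        };
    }

    /** Queues tasks behind a blocked single worker and returns the order they ran in. */
    static List<String> runDemo() {
        List<String> order = Collections.synchronizedList(new ArrayList<String>());
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = newPriorityPool(1);
        try {
            pool.submit(() -> { gate.await(); return null; });   // occupy the single worker
            pool.submit(task("low", 9, order));
            pool.submit(() -> { order.add("plain"); return "plain"; });   // no priority given
            pool.submit(task("high", 1, order));
            gate.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // [high, plain, low]
    }
}
```

The ordering applies only to tasks that are actually waiting in the queue; a task handed directly to an idle worker starts immediately. PriorityBlockingQueue also makes no ordering guarantee among elements of equal priority, so a sequence number would be needed as a tie-breaker if FIFO order within a priority level matters.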
>>>>>>
>>>>>>
>>>>>>     On Fri, Jun 27, 2014 at 6:16 AM, David Holmes
>>>>>>     <davidcholmes at aapt.net.au> wrote:
>>>>>>
>>>>>>         Can't you just use your PriorityCallable with a custom
>>>>>>         PriorityQueue and
>>>>>>         standard ThreadPoolExecutor?
>>>>>>
>>>>>>         David
>>>>>>
>>>>>>         > -----Original Message-----
>>>>>>         > From: concurrency-interest-bounces at cs.oswego.edu
>>>>>>         > [mailto:concurrency-interest-bounces at cs.oswego.edu] On Behalf Of DT
>>>>>>         > Sent: Friday, 27 June 2014 2:08 PM
>>>>>>         > To: concurrency-interest at cs.oswego.edu
>>>>>>         > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
>>>>>>         > to handle priority based Callable tasks
>>>>>>         >
>>>>>>         >
>>>>>>         > If we were to control Callable tasks with priorities and to manage
>>>>>>         > execution of those callable tasks through ThreadPoolExecutor, we would
>>>>>>         > have to implement a custom ThreadPoolExecutor and PriorityCallable (see a
>>>>>>         > suggestion below). What other ways to accomplish this do you see?
>>>>>>         >
>>>>>>         > public interface PriorityCallable extends Callable<Object> {
>>>>>>         >      public abstract void setPriority(int priority);
>>>>>>         >      ...
>>>>>>         > }
>>>>>>         >
>>>>>>         > public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
>>>>>>         > ...
>>>>>>         > ...
>>>>>>         >     // pass PriorityCallable
>>>>>>         >     public <T> List<Future<T>> customInvokeAll(
>>>>>>         >             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
>>>>>>         >             throws InterruptedException {
>>>>>>         >         if (tasks == null || unit == null)
>>>>>>         >             throw new NullPointerException();
>>>>>>         >         long nanos = unit.toNanos(timeout);
>>>>>>         >         List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
>>>>>>         >         boolean done = false;
>>>>>>         >         try {
>>>>>>         >             // handle Priority based Callable tasks here, though
>>>>>>         >             // can expect timing issues
>>>>>>         >             // custom logic to group Callable tasks, if (priority
>>>>>>         >             // == 1, 2, 3 create group of callables, etc )
>>>>>>         >             for (Callable<T> t : tasks)
>>>>>>         >                 futures.add(newTaskFor(t));
>>>>>>         >
>>>>>>         >             long lastTime = System.nanoTime();
>>>>>>         >
>>>>>>         >             // Interleave time checks and calls to execute in case
>>>>>>         >             // executor doesn't have any/much parallelism.
>>>>>>         >             Iterator<Future<T>> it = futures.iterator();
>>>>>>         >             while (it.hasNext()) {
>>>>>>         >                 // should we expect futures get executed within the same
>>>>>>         >                 // timeframe for Callables with different priorities
>>>>>>         >                 execute((Runnable)(it.next()));
>>>>>>         >                 long now = System.nanoTime();
>>>>>>         >                 nanos -= now - lastTime;
>>>>>>         >                 lastTime = now;
>>>>>>         >                 if (nanos <= 0)
>>>>>>         >                     return futures;
>>>>>>         >             }
>>>>>>         >
>>>>>>         >             for (Future<T> f : futures) {
>>>>>>         >                 if (!f.isDone()) {
>>>>>>         >                     if (nanos <= 0)
>>>>>>         >                         return futures;
>>>>>>         >                     try {
>>>>>>         >                         f.get(nanos, TimeUnit.NANOSECONDS);
>>>>>>         >                     } catch (CancellationException ignore) {
>>>>>>         >                     } catch (ExecutionException ignore) {
>>>>>>         >                         // Should we cancel tasks based on the priority as well?
>>>>>>         >                     } catch (TimeoutException toe) {
>>>>>>         >                         return futures;
>>>>>>         >                     }
>>>>>>         >                     long now = System.nanoTime();
>>>>>>         >                     nanos -= now - lastTime;
>>>>>>         >                     lastTime = now;
>>>>>>         >                 }
>>>>>>         >             }
>>>>>>         >             done = true;
>>>>>>         >             return futures;
>>>>>>         >         } finally {
>>>>>>         >             if (!done)
>>>>>>         >                 for (Future<T> f : futures)
>>>>>>         >                     f.cancel(true);
>>>>>>         >         }
>>>>>>         >     }
>>>>>>         > ...
>>>>>>         > }
>>>>>>         >
>>>>>>         > Thank you,
>>>>>>         > dt
>>>>>>         > http://www.flyingtroika.com/
>>>>>>         >
>>>>>>         > _______________________________________________
>>>>>>         > Concurrency-interest mailing list
>>>>>>         > Concurrency-interest at cs.oswego.edu
>>>>>>         > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>     -- 
>>>>>>     Cheers,
>>>>>>     √
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>>
>>
>
