[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

DT dt at flyingtroika.com
Fri Jun 27 11:41:43 EDT 2014


We see several ways to approach this:

The 1st approach is to use a PriorityBlockingQueue and provide a
Comparator for Callable tasks that orders them by priority. When we
then call invokeAll(collection of Callable tasks) on the TPE, tasks are
taken from the queue in priority order (1st, 2nd, ..., nth), which means
they are executed in abstract groups (for instance, the 1st group, all
tasks with high priority, goes first) according to TPE scheduling.
tpe.execute(thread) will be called and tasks will be assigned to
available TPE worker threads.

Following this logic, the TPE does not strictly guarantee that all
high-priority tasks will be executed first, or the guarantee is not
enforced strongly enough (maybe there is some policy to enforce this).
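
For reference, a minimal sketch of the 1st approach. Two things are
assumptions of the sketch rather than part of the proposal: the
Prioritized interface with a read-only priority() (lower value runs
first) and the default priority of 5 for plain Callables. Because
submit()/invokeAll() wrap every Callable in a FutureTask before it
reaches the queue, the sketch overrides newTaskFor() so that the
Comparator can still see the priority:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical read-only priority interface; a lower value runs earlier. */
interface Prioritized {
    int priority();
}

/** FutureTask that remembers the priority of the Callable it wraps. */
final class PriorityFutureTask<T> extends FutureTask<T> {
    static final int DEFAULT_PRIORITY = 5; // assumed default for plain Callables
    final int priority;

    PriorityFutureTask(Callable<T> task) {
        super(task);
        this.priority = (task instanceof Prioritized)
                ? ((Prioritized) task).priority() : DEFAULT_PRIORITY;
    }
}

/** Example task that records its priority when it runs. */
final class LoggedTask implements Callable<Integer>, Prioritized {
    private final int priority;
    private final List<Integer> log;

    LoggedTask(int priority, List<Integer> log) {
        this.priority = priority;
        this.log = log;
    }

    @Override public int priority() { return priority; }

    @Override public Integer call() {
        log.add(priority);
        return priority;
    }
}

final class PriorityPoolSketch {
    static int priorityOf(Runnable r) {
        return (r instanceof PriorityFutureTask)
                ? ((PriorityFutureTask<?>) r).priority
                : PriorityFutureTask.DEFAULT_PRIORITY;
    }

    static ThreadPoolExecutor newPool(int threads) {
        Comparator<Runnable> byPriority =
                (a, b) -> Integer.compare(priorityOf(a), priorityOf(b));
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, byPriority)) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) {
                // Keep the priority visible to the queue's Comparator.
                return new PriorityFutureTask<T>(task);
            }
        };
    }

    /** Submits priorities 3, 1, 2 to a busy single-thread pool; returns the run order. */
    static List<Integer> demoOrder() throws Exception {
        ThreadPoolExecutor pool = newPool(1);
        List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // The first task goes straight to the only worker and holds it,
            // so the following submissions accumulate in the queue.
            Callable<Void> blocker = () -> { gate.await(); return null; };
            pool.submit(blocker);
            List<Future<Integer>> futures = new ArrayList<>();
            for (int p : new int[] {3, 1, 2}) {
                futures.add(pool.submit(new LoggedTask(p, log)));
            }
            gate.countDown();
            for (Future<Integer> f : futures) f.get();
            return new ArrayList<>(log);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demoOrder()); // [1, 2, 3]
    }
}
```

With one busy worker the queued tasks come out as 1, 2, 3. With several
workers the queue only orders tasks that are actually waiting: while
fewer than corePoolSize threads are running, execute() starts a new
thread with the task and bypasses the queue, so a low-priority task can
start before a high-priority one that is submitted later.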

The 2nd approach is to modify invokeAll() to handle priority explicitly,
which does not look like a clean solution.
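
A possible variation that avoids subclassing the TPE, sketched under
the assumption that tasks are already grouped by priority in a
SortedMap (lowest key first): call the standard invokeAll() once per
group. The class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class GroupedInvokeSketch {
    /** Runs each priority group to completion before starting the next one. */
    static <T> List<Future<T>> invokeByPriority(ExecutorService pool,
            SortedMap<Integer, List<Callable<T>>> groups) throws InterruptedException {
        List<Future<T>> all = new ArrayList<>();
        for (List<Callable<T>> group : groups.values()) {
            all.addAll(pool.invokeAll(group)); // blocks until the whole group is done
        }
        return all;
    }

    /** Builds three groups of three tasks and returns the priorities in run order. */
    static List<Integer> demo() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> log = Collections.synchronizedList(new ArrayList<Integer>());
        try {
            SortedMap<Integer, List<Callable<Integer>>> groups = new TreeMap<>();
            for (int p = 3; p >= 1; p--) { // insertion order does not matter
                final int priority = p;
                List<Callable<Integer>> group = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    group.add(() -> { log.add(priority); return priority; });
                }
                groups.put(priority, group);
            }
            invokeByPriority(pool, groups);
            return new ArrayList<>(log);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // [1, 1, 1, 2, 2, 2, 3, 3, 3]
    }
}
```

This gives strict group-by-group execution, at the cost that one slow
task in a group delays every lower-priority group behind it.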

The 3rd approach is to have multiple TPEs, each handling only one
priority level without mixing: for example, tpe1 will
invokeAll(priority 1 tasks only) and tpe2 will invokeAll(priority 2
tasks only). Which TPE is instantiated first should follow the business
logic.
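
A rough sketch of the 3rd approach, to show where the management
overhead comes from. The PerPriorityPools class, its pool sizes, and
the task bodies are hypothetical; only the standard ExecutorService
calls are real API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerPriorityPools {
    private final Map<Integer, ExecutorService> pools = new TreeMap<>();

    /** threadsPerPriority maps a priority level to the size of its dedicated pool. */
    PerPriorityPools(Map<Integer, Integer> threadsPerPriority) {
        for (Map.Entry<Integer, Integer> e : threadsPerPriority.entrySet()) {
            pools.put(e.getKey(), Executors.newFixedThreadPool(e.getValue()));
        }
    }

    /** Runs the tasks on the pool that is dedicated to the given priority. */
    <T> List<Future<T>> invokeAll(int priority, Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        ExecutorService pool = pools.get(priority);
        if (pool == null) {
            throw new IllegalArgumentException("no pool for priority " + priority);
        }
        return pool.invokeAll(tasks);
    }

    /** Every pool has to be shut down separately. */
    void shutdown() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
        }
    }

    static String demo() throws Exception {
        Map<Integer, Integer> sizes = new TreeMap<>();
        sizes.put(1, 4); // assumed sizing: more threads for the high-priority pool
        sizes.put(2, 1);
        PerPriorityPools pools = new PerPriorityPools(sizes);
        try {
            List<Callable<String>> high = new ArrayList<>();
            high.add(() -> "sqlWorker");
            List<Callable<String>> low = new ArrayList<>();
            low.add(() -> "httpWorker");
            // Business logic decides which pool is invoked first.
            String first = pools.invokeAll(1, high).get(0).get();
            String second = pools.invokeAll(2, low).get(0).get();
            return first + "," + second;
        } finally {
            pools.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // sqlWorker,httpWorker
    }
}
```

Each priority level needs its own sizing, routing and shutdown, and
nothing stops the pools from competing for the same CPUs.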

Here is an example of what we are trying to implement. Assume we have
different types of tasks that are bound to fetching resources over
udp/tcp/http/sql. Something like:

sqlWorker needs to execute SQL statements and return results, httpWorker
has to handle http calls and return results, etc. The issue is that
even if we assign priorities to these workers according to business
logic (which data our workflow needs first), we would have to change
priorities dynamically, because workers of the same type might finish
execution in different time slots. So we either have to wait until all
workers have finished (in practice, tune the time slot to the longest
execution time) or drop some tasks because of the timing limits. If we
follow the 3rd approach, it would be hard to manage the TPEs. If we
follow the 2nd approach, it would be hard to maintain tasks and
guarantee execution group by group. And the 1st approach does not seem
to completely satisfy the concurrent execution logic of the problem.
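
To illustrate the "drop some tasks due to the timing limits" case: the
timed invokeAll() already cancels whatever has not completed when the
time slot ends. A small sketch, where the task names, the sleep and the
"dropped" marker are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class TimeSlotSketch {
    /** Runs two fast tasks and one slow task, dropping whatever misses the slot. */
    static List<String> runWithinSlot(long slotMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> "fast-1");
            tasks.add(() -> { Thread.sleep(60_000); return "slow"; });
            tasks.add(() -> "fast-2");

            // Tasks that have not completed when the slot ends are cancelled.
            List<Future<String>> futures =
                    pool.invokeAll(tasks, slotMillis, TimeUnit.MILLISECONDS);

            List<String> outcome = new ArrayList<>();
            for (Future<String> f : futures) {
                outcome.add(f.isCancelled() ? "dropped" : f.get());
            }
            return outcome;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runWithinSlot(1000));
    }
}
```

The slot length still has to be tuned by hand, and the cancellation is
per invokeAll() call, not per priority.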

On 6/27/2014 3:12 AM, √iktor Ҡlang wrote:
> "public abstract void setPriority(int priority);" ? Changing priority 
> on an already submitted task seems strange.
>
> I agree with David, just use a PriorityBlockingQueue 
> [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html] 
> for the submission queue of the TPE and provide a Comparator that 
> checks for your PriorityCallable and puts 
> non-PriorityCallable-Callables at an appropriate default priority.
>
>
> On Fri, Jun 27, 2014 at 6:16 AM, David Holmes 
> <davidcholmes at aapt.net.au> wrote:
>
>     Can't you just use your PriorityCallable with a custom PriorityQueue
>     and standard ThreadPoolExecutor?
>
>     David
>
>     > -----Original Message-----
>     > From: concurrency-interest-bounces at cs.oswego.edu
>     > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
>     > Sent: Friday, 27 June 2014 2:08 PM
>     > To: concurrency-interest at cs.oswego.edu
>     > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
>     > to handle priority based Callable tasks
>     >
>     >
>     > If we were to control Callable Tasks with Priorities and to manage
>     > execution of those callable tasks through ThreadPoolExecutor we would
>     > have to implement custom ThreadPoolExecutor and PriorityCallable (see a
>     > suggestion below). What other ways to accomplish this do you see?
>     >
>     > public interface PriorityCallable extends Callable<Object> {
>     >      public abstract void setPriority(int priority);
>     >      ...
>     > }
>     >
>     > public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
>     > ...
>     > ...
>     > public <T> List<Future<T>> customInvokeAll(
>     >                  Collection<? extends Callable<T>> tasks, // pass PriorityCallable
>     >                  long timeout, TimeUnit unit)
>     >                  throws InterruptedException {
>     >              if (tasks == null || unit == null)
>     >                  throw new NullPointerException();
>     >              long nanos = unit.toNanos(timeout);
>     >              List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
>     >              boolean done = false;
>     >              try {
>     >                  // handle Priority based Callable tasks here, though
>     >                  // can expect timing issues
>     >                  // custom logic to group Callable tasks, if (priority
>     >                  // == 1, 2, 3 create group of callables, etc )
>     >                  for (Callable<T> t : tasks)
>     >                      futures.add(newTaskFor(t));
>     >
>     >                  long lastTime = System.nanoTime();
>     >
>     >                  // Interleave time checks and calls to execute in case
>     >                  // executor doesn't have any/much parallelism.
>     >                  Iterator<Future<T>> it = futures.iterator();
>     >                  while (it.hasNext()) {
>     >                      // should we expect futures get executed within the
>     >                      // same timeframe for Callables with different priorities
>     >                      execute((Runnable)(it.next()));
>     >                      long now = System.nanoTime();
>     >                      nanos -= now - lastTime;
>     >                      lastTime = now;
>     >                      if (nanos <= 0)
>     >                          return futures;
>     >                  }
>     >
>     >                  for (Future<T> f : futures) {
>     >                      if (!f.isDone()) {
>     >                          if (nanos <= 0)
>     >                              return futures;
>     >                          try {
>     >                              f.get(nanos, TimeUnit.NANOSECONDS);
>     >                          } catch (CancellationException ignore) {
>     >                          } catch (ExecutionException ignore) {
>     >                              // Should we cancel tasks based on the priority as well?
>     >                          } catch (TimeoutException toe) {
>     >                              return futures;
>     >                          }
>     >                          long now = System.nanoTime();
>     >                          nanos -= now - lastTime;
>     >                          lastTime = now;
>     >                      }
>     >                  }
>     >                  done = true;
>     >                  return futures;
>     >              } finally {
>     >                  if (!done)
>     >                      for (Future<T> f : futures)
>     >                          f.cancel(true);
>     >              }
>     >          }
>     > ...
>     > }
>     >
>     > Thank you,
>     > dt
>     > http://www.flyingtroika.com/
>     >
>     > _______________________________________________
>     > Concurrency-interest mailing list
>     > Concurrency-interest at cs.oswego.edu
>     > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>
>
>
>
> -- 
> Cheers,
>