[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

David Holmes davidcholmes at aapt.net.au
Fri Jun 27 16:09:56 EDT 2014


Priority can only really be used for the dispatching/queueing mechanism. Once tasks are executing, priority is essentially meaningless. Thread priorities have no significance in general.
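
A minimal sketch of that point, assuming a single-worker ThreadPoolExecutor over a PriorityBlockingQueue. QueueOrderDemo and PrioritizedTask are hypothetical names, not JDK classes and not part of the code quoted below. The priority only decides the order in which waiting tasks leave the queue; the first task is handed straight to the new worker and is never compared with anything.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueOrderDemo {
    // Hypothetical helper: a Runnable that carries a priority (lower value is dequeued first).
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override public void run() { body.run(); }

        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Returns the priorities in the order the single worker ran them.
    static List<Integer> runOrder() {
        List<Integer> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        try {
            // Occupy the only worker so that the next three tasks have to wait in the queue.
            pool.execute(new PrioritizedTask(0, () -> awaitQuietly(gate)));
            for (int p : new int[] {3, 1, 2}) {
                pool.execute(new PrioritizedTask(p, () -> order.add(p)));
            }
            gate.countDown(); // release the worker; the queue now hands out tasks by priority
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // prints [1, 2, 3]
    }
}
```

With more than one worker the same queue order applies, but tasks that are already running finish in whatever order they happen to finish, regardless of priority.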

David
  -----Original Message-----
  From: concurrency-interest-bounces at cs.oswego.edu [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
  Sent: Saturday, 28 June 2014 1:42 AM
  To: viktor Ҡlang; dholmes at ieee.org
  Cc: concurrency-interest
  Subject: Re: [concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks


  We see several ways to approach this:

  The 1st approach is to use a PriorityBlockingQueue and provide a Comparator for the Callable tasks so that the queue orders them by priority. When we call invokeAll(collection of Callable tasks) on the TPE, tasks are taken from the queue in priority order (1st, 2nd, ..., n-th), which means they are executed in abstract groups (for instance, the 1st group, all tasks with high priority, goes first according to TPE scheduling). tpe.execute(task) is called for each task and the tasks are assigned to available TPE worker threads.

  Following this logic, the TPE does not guarantee that all high-priority tasks will be executed first, or the guarantee is not enforced strongly enough (maybe there is some policy that enforces this).



  The 2nd approach is to modify invokeAll() to handle priority explicitly, which does not look like a clean solution.
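
  A variation on this approach leaves invokeAll() untouched and calls it once per priority group, lowest key first. This is only a sketch under the assumption that strict group-by-group execution is what is wanted; GroupedInvoke and invokeByPriority are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class GroupedInvoke {
    // Hypothetical helper: runs each priority group to completion before starting the next.
    // Map keys are priorities; lower keys run first.
    static <T> List<Future<T>> invokeByPriority(
            ExecutorService pool, Map<Integer, List<Callable<T>>> groups)
            throws InterruptedException {
        // TreeMap iterates its keys in ascending order, which gives the group order.
        Map<Integer, List<Callable<T>>> sorted = new TreeMap<>(groups);
        List<Future<T>> all = new ArrayList<>();
        for (List<Callable<T>> group : sorted.values()) {
            all.addAll(pool.invokeAll(group)); // blocks until every task in the group is done
        }
        return all;
    }
}
```

  The cost is that one slow task holds back every lower-priority group, so the total time is the sum of the slowest task in each group.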



  The 3rd approach is to have multiple TPEs, each handling only one priority without mixing; for example, tpe1 will invokeAll(priority 1 tasks only) and tpe2 will invokeAll(priority 2 tasks only). Which TPE is started first should follow the business logic.





  Here is an example of what we are trying to implement. Assume we have different types of tasks, each bound to fetching some resource over udp/tcp/http/sql. Something like this:

  A sqlWorker needs to execute SQL statements and return results, an httpWorker has to make HTTP calls and return results, etc. The issue is that even if we assign priorities to these workers according to the business logic (which data our workflow needs first), we would have to change the priorities dynamically, because workers of the same type might finish in different timeslots. So we either have to wait until all workers have finished (in practice, tune the timeslot to the longest execution time) or drop some tasks because of the timing limits. With the 3rd approach it would be hard to manage the TPEs. With the 2nd approach it would be hard to maintain the tasks and guarantee group-by-group execution. And the 1st approach does not seem to fully satisfy the concurrent execution logic of the problem.

  On 6/27/2014 3:12 AM, √iktor Ҡlang wrote:

    "  public abstract void setPriority(int priority);" ? Changing priority on an already submitted task seems strange.

    I agree with David, just use a PriorityBlockingQueue [http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html] for the submission queue of the TPE and provide a Comparator that checks for your PriorityCallable and puts non-PriorityCallable-Callables at an appropriate default priority.
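
    A minimal sketch of this suggestion, with one caveat: submit() and invokeAll() wrap each Callable in a FutureTask before it reaches the queue, so the Comparator never sees the original Callable. The sketch carries the priority across by overriding newTaskFor(). PriorityPool, PriorityFuture, withPriority, DEFAULT_PRIORITY and the getPriority() accessor are assumptions made for illustration; the PriorityCallable in the original post only shows a setter.

```java
import java.util.Comparator;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPool extends ThreadPoolExecutor {
    // Hypothetical read-only variant of the thread's PriorityCallable (lower value runs first).
    interface PriorityCallable<T> extends Callable<T> {
        int getPriority();
    }

    static final int DEFAULT_PRIORITY = 5; // assumed default for tasks that carry no priority

    // FutureTask that remembers the priority of the Callable it wraps.
    static final class PriorityFuture<T> extends FutureTask<T> {
        final int priority;

        PriorityFuture(Callable<T> task) {
            super(task);
            this.priority = (task instanceof PriorityCallable)
                    ? ((PriorityCallable<?>) task).getPriority()
                    : DEFAULT_PRIORITY;
        }
    }

    // Anything that is not a PriorityFuture (e.g. a plain Runnable) gets the default priority.
    static int priorityOf(Runnable r) {
        return (r instanceof PriorityFuture) ? ((PriorityFuture<?>) r).priority : DEFAULT_PRIORITY;
    }

    PriorityPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(
                        11, Comparator.<Runnable>comparingInt(PriorityPool::priorityOf)));
    }

    // submit(Callable) and invokeAll() wrap every Callable through this hook before queueing it.
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new PriorityFuture<T>(callable);
    }

    // Hypothetical convenience factory, used only for illustration.
    static <T> PriorityCallable<T> withPriority(int priority, Callable<T> body) {
        return new PriorityCallable<T>() {
            @Override public T call() throws Exception { return body.call(); }
            @Override public int getPriority() { return priority; }
        };
    }
}
```

    A call such as pool.submit(PriorityPool.withPriority(1, task)) then queues ahead of plain Callables whenever all workers are busy. PriorityBlockingQueue makes no ordering guarantee between elements of equal priority, so tasks with the same priority are not necessarily FIFO.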



    On Fri, Jun 27, 2014 at 6:16 AM, David Holmes <davidcholmes at aapt.net.au> wrote:

      Can't you just use your PriorityCallable with a custom PriorityQueue and
      standard ThreadPoolExecutor?

      David


      > -----Original Message-----
      > From: concurrency-interest-bounces at cs.oswego.edu
      > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
      > Sent: Friday, 27 June 2014 2:08 PM
      > To: concurrency-interest at cs.oswego.edu
      > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
      > to handle priority based Callable tasks
      >
      >
      > If we were to control Callable Tasks with Priorities and to manage
      > execution of those callable tasks through ThreadPoolExecutor we would
      > have to implement custom ThreadPoolExecutor and PriorityCallable (see a
      > suggestion below). What other ways to accomplish this do you see ?
      >
      > public interface PriorityCallable extends Callable<Object> {
      >      public abstract void setPriority(int priority);
      >      ...
      > }
      >
      > public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
      > ...
      > ...
      > public <T> List<Future<T>> customInvokeAll(
      >                  Collection<? extends Callable<T>> tasks, long timeout,
      >                  TimeUnit unit) // pass PriorityCallable
      >                  throws InterruptedException {
      >              if (tasks == null || unit == null)
      >                  throw new NullPointerException();
      >              long nanos = unit.toNanos(timeout);
      >              List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
      >              boolean done = false;
      >              try {
      >                  // handle Priority based Callable tasks here, though can expect timing issues
      >                  // custom logic to group Callable tasks, if (priority == 1, 2, 3 create group of callables, etc )
      >                  for (Callable<T> t : tasks)
      >                      futures.add(newTaskFor(t));
      >
      >                  long lastTime = System.nanoTime();
      >
      >                  // Interleave time checks and calls to execute in case
      >                  // executor doesn't have any/much parallelism.
      >                  Iterator<Future<T>> it = futures.iterator();
      >                  while (it.hasNext()) {
      >                      // should we expect futures get executed within the same timeframe
      >                      // for Callables with different priorities
      >                      execute((Runnable)(it.next()));
      >                      long now = System.nanoTime();
      >                      nanos -= now - lastTime;
      >                      lastTime = now;
      >                      if (nanos <= 0)
      >                          return futures;
      >                  }
      >
      >                  for (Future<T> f : futures) {
      >                      if (!f.isDone()) {
      >                          if (nanos <= 0)
      >                              return futures;
      >                          try {
      >                              f.get(nanos, TimeUnit.NANOSECONDS);
      >                          } catch (CancellationException ignore) {
      >                          } catch (ExecutionException ignore) { // Should we cancel tasks based on the priority as well?
      >                          } catch (TimeoutException toe) {
      >                              return futures;
      >                          }
      >                          long now = System.nanoTime();
      >                          nanos -= now - lastTime;
      >                          lastTime = now;
      >                      }
      >                  }
      >                  done = true;
      >                  return futures;
      >              } finally {
      >                  if (!done)
      >                      for (Future<T> f : futures)
      >                          f.cancel(true);
      >              }
      >          }
      > ...
      > }
      >
      > Thank you,
      > dt
      > http://www.flyingtroika.com/
      >
      > _______________________________________________
      > Concurrency-interest mailing list
      > Concurrency-interest at cs.oswego.edu
      > http://cs.oswego.edu/mailman/listinfo/concurrency-interest

    -- 

    Cheers,
    √
