[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

√iktor Ҡlang viktor.klang at gmail.com
Fri Jun 27 06:12:04 EDT 2014


"  public abstract void setPriority(int priority);" ? Changing priority on
an already submitted task seems strange.

I agree with David: just use a PriorityBlockingQueue
[http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/PriorityBlockingQueue.html]
for the submission queue of the TPE, and provide a Comparator that checks
for your PriorityCallable and puts non-PriorityCallable Callables at an
appropriate default priority.
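
A minimal sketch of that setup, under stated assumptions. The names
getPriority(), DEFAULT_PRIORITY, PriorityFutureTask and newPriorityPool are
hypothetical and not from the original post, and "lower number runs first"
is an arbitrary choice. One detail worth noting: the work queue holds
Runnables, and submit()/invokeAll() wrap each Callable in a FutureTask
before queueing it, so the sketch overrides newTaskFor() to keep the
priority visible to the Comparator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityPoolSketch {
    /** Hypothetical read-only variant of PriorityCallable (no setter). */
    interface PriorityCallable<T> extends Callable<T> {
        int getPriority(); // assumption: lower value runs first
    }

    static final int DEFAULT_PRIORITY = 5; // assumption: used for plain tasks

    /** FutureTask that remembers the priority of the Callable it wraps. */
    static final class PriorityFutureTask<T> extends FutureTask<T> {
        final int priority;
        PriorityFutureTask(Callable<T> c) {
            super(c);
            priority = (c instanceof PriorityCallable)
                    ? ((PriorityCallable<?>) c).getPriority() : DEFAULT_PRIORITY;
        }
    }

    static int priorityOf(Runnable r) {
        return (r instanceof PriorityFutureTask)
                ? ((PriorityFutureTask<?>) r).priority : DEFAULT_PRIORITY;
    }

    static final Comparator<Runnable> BY_PRIORITY = new Comparator<Runnable>() {
        public int compare(Runnable a, Runnable b) {
            return Integer.compare(priorityOf(a), priorityOf(b));
        }
    };

    /** Standard TPE with a priority-ordered submission queue. */
    static ThreadPoolExecutor newPriorityPool(int threads) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(11, BY_PRIORITY)) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                return new PriorityFutureTask<T>(callable);
            }
        };
    }

    static PriorityCallable<Integer> task(final int priority, final List<Integer> order) {
        return new PriorityCallable<Integer>() {
            public int getPriority() { return priority; }
            public Integer call() { order.add(priority); return priority; }
        };
    }

    /** Blocks the single worker, queues 3, 1, 2, then records the run order. */
    static List<Integer> demo() {
        final List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());
        final CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = newPriorityPool(1);
        // The first task is handed straight to the new worker, not queued.
        pool.submit(new Callable<Void>() {
            public Void call() throws Exception { gate.await(); return null; }
        });
        pool.submit(task(3, order));
        pool.submit(task(1, order));
        pool.submit(task(2, order));
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

Priorities only matter while tasks are waiting in the queue; a task handed
directly to an idle or newly created worker never passes through the
Comparator. PriorityBlockingQueue also makes no ordering promise between
tasks of equal priority, so a tie-breaking sequence number would be needed
if FIFO order within a priority level matters.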


On Fri, Jun 27, 2014 at 6:16 AM, David Holmes <davidcholmes at aapt.net.au>
wrote:

> Can't you just use your PriorityCallable with a custom PriorityQueue and
> standard ThreadPoolExecutor?
>
> David
>
> > -----Original Message-----
> > From: concurrency-interest-bounces at cs.oswego.edu
> > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
> > Sent: Friday, 27 June 2014 2:08 PM
> > To: concurrency-interest at cs.oswego.edu
> > Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
> > to handle priority based Callable tasks
> >
> >
> > If we were to control Callable Tasks with Priorities and to manage
> > execution of those callable tasks through ThreadPoolExecutor we would
> > have to implement custom ThreadPoolExecutor and PriorityCallable (see a
> > suggestion below). What other ways to accomplish this do you see ?
> >
> > public interface PriorityCallable extends Callable<Object> {
> >      public abstract void setPriority(int priority);
> >      ...
> > }
> >
> > public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
> > ...
> > ...
> > public <T> List<Future<T>> customInvokeAll(
> >                  Collection<? extends Callable<T>> tasks, // pass PriorityCallable
> >                  long timeout, TimeUnit unit)
> >                  throws InterruptedException {
> >              if (tasks == null || unit == null)
> >                  throw new NullPointerException();
> >              long nanos = unit.toNanos(timeout);
> >              List<Future<T>> futures =
> >                  new ArrayList<Future<T>>(tasks.size());
> >              boolean done = false;
> >              try {
> >                  // handle Priority based Callable tasks here, though
> >                  // can expect timing issues
> >                  // custom logic to group Callable tasks , if (priority
> >                  // == 1, 2, 3 create group of callables, etc )
> >                  for (Callable<T> t : tasks)
> >                      futures.add(newTaskFor(t));
> >
> >                  long lastTime = System.nanoTime();
> >
> >                  // Interleave time checks and calls to execute in case
> >                  // executor doesn't have any/much parallelism.
> >                  Iterator<Future<T>> it = futures.iterator();
> >                  while (it.hasNext()) {
> >                      // should we expect futures get executed within the same
> >                      // timeframe for Callables with different priorities
> >                      execute((Runnable)(it.next()));
> >                      long now = System.nanoTime();
> >                      nanos -= now - lastTime;
> >                      lastTime = now;
> >                      if (nanos <= 0)
> >                          return futures;
> >                  }
> >
> >                  for (Future<T> f : futures) {
> >                      if (!f.isDone()) {
> >                          if (nanos <= 0)
> >                              return futures;
> >                          try {
> >                              f.get(nanos, TimeUnit.NANOSECONDS);
> >                          } catch (CancellationException ignore) {
> >                          } catch (ExecutionException ignore) {
> >                              // Should we cancel tasks based on the priority as well?
> >                          } catch (TimeoutException toe) {
> >                              return futures;
> >                          }
> >                          long now = System.nanoTime();
> >                          nanos -= now - lastTime;
> >                          lastTime = now;
> >                      }
> >                  }
> >                  done = true;
> >                  return futures;
> >              } finally {
> >                  if (!done)
> >                      for (Future<T> f : futures)
> >                          f.cancel(true);
> >              }
> >          }
> > ...
> > }
> >
> > Thank you,
> > dt
> > http://www.flyingtroika.com/
> >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at cs.oswego.edu
> > http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>



-- 
Cheers,
√