[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

DT dt at flyingtroika.com
Fri Jun 27 00:08:26 EDT 2014


If we want to assign priorities to Callable tasks and manage their 
execution through ThreadPoolExecutor, it seems we would have to 
implement a custom ThreadPoolExecutor and a PriorityCallable (see the 
suggestion below). What other ways to accomplish this do you see?

public interface PriorityCallable extends Callable<Object> {
     void setPriority(int priority);
     ...
}

public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
...
...
public <T> List<Future<T>> customInvokeAll(
                 Collection<? extends Callable<T>> tasks, // pass PriorityCallable
                 long timeout, TimeUnit unit)
                 throws InterruptedException {
             if (tasks == null || unit == null)
                 throw new NullPointerException();
             long nanos = unit.toNanos(timeout);
             List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
             boolean done = false;
             try {
                 // Handle priority-based Callable tasks here, though timing
                 // issues can be expected.
                 // Custom logic to group Callable tasks by priority (e.g.
                 // one group of callables each for priority 1, 2, 3, etc.).
                 for (Callable<T> t : tasks)
                     futures.add(newTaskFor(t));

                 long lastTime = System.nanoTime();

                 // Interleave time checks and calls to execute in case
                 // executor doesn't have any/much parallelism.
                 Iterator<Future<T>> it = futures.iterator();
                 while (it.hasNext()) {
                     // Should we expect futures to be executed within the
                     // same time frame for Callables with different
                     // priorities?
                     execute((Runnable)(it.next()));
                     long now = System.nanoTime();
                     nanos -= now - lastTime;
                     lastTime = now;
                     if (nanos <= 0)
                         return futures;
                 }

                 for (Future<T> f : futures) {
                     if (!f.isDone()) {
                         if (nanos <= 0)
                             return futures;
                         try {
                             f.get(nanos, TimeUnit.NANOSECONDS);
                         } catch (CancellationException ignore) {
                         } catch (ExecutionException ignore) {
                             // Should we cancel tasks based on the priority
                             // as well?
                         } catch (TimeoutException toe) {
                             return futures;
                         }
                         long now = System.nanoTime();
                         nanos -= now - lastTime;
                         lastTime = now;
                     }
                 }
                 done = true;
                 return futures;
             } finally {
                 if (!done)
                     for (Future<T> f : futures)
                         f.cancel(true);
             }
         }
...
}
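
One alternative that might be worth comparing against the above is to 
leave invokeAll alone and let the work queue do the ordering: back the 
pool with a PriorityBlockingQueue and override newTaskFor so that every 
queued task is Comparable. The following is only a minimal sketch under 
assumptions. Prioritized, getPriority, PriorityFutureTask and 
PriorityThreadPoolExecutor are hypothetical names that are not part of 
the suggestion above, and "lower value runs first" is an arbitrary 
choice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityExecutorSketch {

    /** Hypothetical stand-in for PriorityCallable: exposes a readable priority. */
    interface Prioritized<T> extends Callable<T> {
        int getPriority(); // assumption: lower value is dequeued first
    }

    /** FutureTask that carries a priority so the queue can compare tasks. */
    static final class PriorityFutureTask<T> extends FutureTask<T>
            implements Comparable<PriorityFutureTask<?>> {
        final int priority;

        PriorityFutureTask(Callable<T> task, int priority) {
            super(task);
            this.priority = priority;
        }

        @Override
        public int compareTo(PriorityFutureTask<?> other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Fixed-size pool whose waiting tasks are ordered by priority. */
    static final class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
        PriorityThreadPoolExecutor(int threads) {
            // The queue is unbounded, so maximumPoolSize never comes into play.
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                  new PriorityBlockingQueue<Runnable>());
        }

        // submit() and the inherited invokeAll() both wrap tasks through newTaskFor.
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            int priority = (callable instanceof Prioritized<?>)
                    ? ((Prioritized<?>) callable).getPriority()
                    : Integer.MAX_VALUE; // plain Callables sort last
            return new PriorityFutureTask<T>(callable, priority);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new PriorityFutureTask<T>(
                    Executors.callable(runnable, value), Integer.MAX_VALUE);
        }
        // Caveat: execute() with a Runnable that is not Comparable would fail
        // with ClassCastException once it has to be queued.
    }

    /** Builds a task that records its own priority when it runs. */
    static Prioritized<Integer> task(final int priority, final List<Integer> order) {
        return new Prioritized<Integer>() {
            @Override public int getPriority() { return priority; }
            @Override public Integer call() { order.add(priority); return priority; }
        };
    }

    /** Submits priorities 3, 1, 2 behind a blocked worker; returns the run order. */
    static List<Integer> runDemo() {
        PriorityThreadPoolExecutor pool = new PriorityThreadPoolExecutor(1);
        final CountDownLatch gate = new CountDownLatch(1);
        List<Integer> order = Collections.synchronizedList(new ArrayList<Integer>());

        // Occupy the single worker so the following tasks pile up in the queue.
        Callable<Void> blocker = () -> { gate.await(); return null; };
        pool.submit(blocker);
        for (int p : new int[] {3, 1, 2}) {
            pool.submit(task(p, order));
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<Integer>(order);
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [1, 2, 3]
    }
}
```

Note that in this sketch the priority only orders tasks that are still 
waiting in the queue. It does not preempt a task that is already 
running, and tasks with equal priority are not guaranteed to run in 
submission order.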

Thank you,
dt
http://www.flyingtroika.com/


