[concurrency-interest] Custom ThreadPoolExecutor implementation to handle priority based Callable tasks

David Holmes davidcholmes at aapt.net.au
Fri Jun 27 00:16:15 EDT 2014


Can't you just use your PriorityCallable with a custom PriorityQueue and
standard ThreadPoolExecutor?

David

> -----Original Message-----
> From: concurrency-interest-bounces at cs.oswego.edu
> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of DT
> Sent: Friday, 27 June 2014 2:08 PM
> To: concurrency-interest at cs.oswego.edu
> Subject: [concurrency-interest] Custom ThreadPoolExecutor implementation
> to handle priority based Callable tasks
>
>
> If we were to control Callable Tasks with Priorities and to manage
> execution of those callable tasks through ThreadPoolExecutor we would
> have to implement custom ThreadPoolExecutor and PriorityCallable (see a
> suggestion below). What other ways to accomplish this do you see ?
>
> public interface PriorityCallable extends Callable<Object> {
>      public abstract void setPriority(int priority);
>      ...
> }
>
> public class CustomThreadPoolExecutor  extends ThreadPoolExecutor {
> ...
> ...
> public <T> List<Future<T>> customInvokeAll(
>                  Collection<? extends Callable<T>> tasks, long timeout,
> TimeUnit unit) // pass PriorityCallable
>                  throws InterruptedException {
>              if (tasks == null || unit == null)
>                  throw new NullPointerException();
>              long nanos = unit.toNanos(timeout);
>              List<Future<T>> futures = new
> ArrayList<Future<T>>(tasks.size());
>              boolean done = false;
>              try {
>                  // handle Priority based Callable tasks here, though
> can expect timing issues
>                  // custom logic to group Callable tasks , if (priority
> == 1, 2, 3 create group of callables, etc )
>                  for (Callable<T> t : tasks)
>                      futures.add(newTaskFor(t));
>
>                  long lastTime = System.nanoTime();
>
>                  // Interleave time checks and calls to execute in case
>                  // executor doesn't have any/much parallelism.
>                  Iterator<Future<T>> it = futures.iterator();
>                  while (it.hasNext()) {
>                      execute((Runnable)(it.next())); // should we expect
> futures get executed within the same timeframe for Callables with
> different priorities
>                      long now = System.nanoTime();
>                      nanos -= now - lastTime;
>                      lastTime = now;
>                      if (nanos <= 0)
>                          return futures;
>                  }
>
>                  for (Future<T> f : futures) {
>                      if (!f.isDone()) {
>                          if (nanos <= 0)
>                              return futures;
>                          try {
>                              f.get(nanos, TimeUnit.NANOSECONDS);
>                          } catch (CancellationException ignore) {
>                          } catch (ExecutionException ignore) { // Should
> we cancel tasks based on the priority as well?
>                          } catch (TimeoutException toe) {
>                              return futures;
>                          }
>                          long now = System.nanoTime();
>                          nanos -= now - lastTime;
>                          lastTime = now;
>                      }
>                  }
>                  done = true;
>                  return futures;
>              } finally {
>                  if (!done)
>                      for (Future<T> f : futures)
>                          f.cancel(true);
>              }
>          }
> ...
> }
>
> Thank you,
> dt
> http://www.flyingtroika.com/
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest



More information about the Concurrency-interest mailing list