[concurrency-interest] Canceling Futures - Callable implementations

Joe Bowbeer joe.bowbeer at gmail.com
Wed Apr 8 18:05:07 EDT 2009


Peter,

ExecutorCompletionService is dead simple.

It's based on a trivial FutureTask extension that hooks the done() method in
order to enqueue the original task on a completion queue:

  private class QueueingFuture extends FutureTask<Void> {
      QueueingFuture(RunnableFuture<V> task) {
          super(task, null);
          this.task = task;
      }
      protected void done() { completionQueue.add(task); }
      private final Future<V> task;
  }

This functionality could easily be added to MyTask.  (See one approach
below.)  Then you can eliminate the completion service and "take" directly
from the completion queue.

A few more things to point out:

If you construct the completion service with an AbstractExecutorService, the
completion service asks your executor service to construct the task.
(See the newTaskFor method.)  This task, created by your executor service, is
then returned to you by the completion service, and this is the same task
that is enqueued.  You might explore this avenue for task customization.
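
A minimal sketch of that avenue, in case it helps.  LabeledTask,
LabelingExecutor and NewTaskForDemo are names I made up for illustration;
only newTaskFor, ThreadPoolExecutor and ExecutorCompletionService come from
java.util.concurrent.  It assumes you submit Callables (the Runnable overload
of newTaskFor is left at its default):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NewTaskForDemo {

    /** Hypothetical custom task: a FutureTask that also carries a label. */
    static class LabeledTask<T> extends FutureTask<T> {
        final String label;
        LabeledTask(Callable<T> callable, String label) {
            super(callable);
            this.label = label;
        }
    }

    /** Hypothetical executor whose newTaskFor hook builds LabeledTask instances. */
    static class LabelingExecutor extends ThreadPoolExecutor {
        private final AtomicInteger counter = new AtomicInteger();
        LabelingExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                  new LinkedBlockingQueue<Runnable>());
        }
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new LabeledTask<T>(callable, "task-" + counter.incrementAndGet());
        }
    }

    /** True if take() hands back the very object that submit() returned. */
    static boolean sameFutureComesBack() {
        LabelingExecutor executor = new LabelingExecutor(2);
        try {
            ExecutorCompletionService<String> ecs =
                new ExecutorCompletionService<String>(executor);
            Future<String> submitted = ecs.submit(new Callable<String>() {
                public String call() { return "done"; }
            });
            Future<String> completed = ecs.take(); // blocks until the task finishes
            return submitted == completed
                && completed instanceof LabeledTask
                && "done".equals(completed.get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sameFutureComesBack());
    }
}
```

Because the future you get from submit is the one that later shows up on
take, you can keep it in a collection and remove it by identity.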

A custom "completion task" designed for subclassing follows.  Your subclass
overrides the "compute" method.  Note that your compute method
implementation can query isCancelled.

public abstract class Worker<V> implements RunnableFuture<V> {

    protected Worker(BlockingQueue<Future<V>> completionQueue) {
        this.completionQueue = completionQueue;
    }

    private final BlockingQueue<Future<V>> completionQueue;

    /**
     * Calls the <tt>compute</tt> method to compute the result.
     */
    private final FutureTask<V> task =
        new FutureTask<V>(new Callable<V>() {
            public V call() throws Exception {
                return compute();
            }
        }) {
            @Override protected void done() {
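                // Enqueue the Worker itself so that the Future taken from the
                // queue is the same handle the caller already holds.
                completionQueue.add(Worker.this);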
            }
        };

    /**
     * Computes the value to be returned by the <tt>get</tt> method.
     */
    protected abstract V compute() throws Exception;

    /* RunnableFuture implementation. */

    public void run() {
        task.run();
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return task.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return task.isCancelled();
    }

    public boolean isDone() {
        return task.isDone();
    }

    public V get() throws InterruptedException, ExecutionException {
        return task.get();
    }

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return task.get(timeout, unit);
    }
}

Sample construction:

  BlockingQueue<Future<Integer>> queue =
          new LinkedBlockingQueue<Future<Integer>>();

  Worker<Integer> worker = new Worker<Integer>(queue) {
      protected Integer compute() {
          while (!isCancelled()) {
              Thread.yield(); // Please!!
          }
          return 0;
      }
  };
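
To show the "take directly from the completion queue" part end to end, here
is a rough, self-contained sketch.  QueueingTask is a hypothetical stand-in
for a Worker-style task (a FutureTask that enqueues itself in done()), and
CompletionQueueDemo and runAndCollect are likewise made-up names.  Two tasks
finish normally and a third is cancelled; all three arrive on the queue, and
each one is removed from the pending list by identity:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class CompletionQueueDemo {

    /** Hypothetical task that puts itself on the completion queue when done. */
    static class QueueingTask<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> completionQueue;
        QueueingTask(Callable<V> callable, BlockingQueue<Future<V>> completionQueue) {
            super(callable);
            this.completionQueue = completionQueue;
        }
        @Override protected void done() {
            // Called once, on normal completion, failure, or cancellation.
            completionQueue.add(this);
        }
    }

    /** Runs three tasks, cancels one, and returns the sorted outcomes. */
    static List<String> runAndCollect() {
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<Future<String>>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<QueueingTask<String>> pending = new ArrayList<QueueingTask<String>>();
        List<String> outcomes = new ArrayList<String>();
        try {
            for (final String name : new String[] {"a", "b"}) {
                pending.add(new QueueingTask<String>(new Callable<String>() {
                    public String call() { return name.toUpperCase(); }
                }, queue));
            }
            // A task that would block forever unless it is cancelled.
            QueueingTask<String> blocker = new QueueingTask<String>(new Callable<String>() {
                public String call() throws Exception {
                    new CountDownLatch(1).await();
                    return "never";
                }
            }, queue);
            pending.add(blocker);

            for (QueueingTask<String> t : pending) {
                executor.execute(t);
            }
            blocker.cancel(true); // the same handle is used to cancel and to join

            while (!pending.isEmpty()) {
                Future<String> f = queue.take();
                pending.remove(f); // identity match: f is one of our tasks
                outcomes.add(f.isCancelled() ? "cancelled" : f.get());
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
        Collections.sort(outcomes); // completion order is not deterministic
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect()); // prints [A, B, cancelled]
    }
}
```

Note that a cancelled task is enqueued too, so the loop needs no separate
counter to know when everything has been accounted for.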

Joe

2009/4/8 Péter Kovács

> One application of this approach where I cannot find an elegant
> solution is when I want to use an ExecutorCompletionService for
> "joining" a definite number of "cancelable" concurrent tasks:
>
> With a hypothetical collaboration of future-aware Callables, Futures
> and Executors built into the API, I could (1) store the futures
> returned by ExecutorCompletionService.submit in a list, (2) remove
> from the list each future returned by ExecutorCompletionService.take,
> (3) get the result off of the same future and (4) check if the list is
> empty to see if more futures need to be taken from the ECS. Something
> like:
>
> while (myFutures.size() > 0) {
>    Future<String> f = executorCompletionService.take();
>    myFutures.remove(f);
>    f.get();
> }
>
> Using the "MyTask approach" with ECS, I have to do something like this:
>
> while (nrCompletedTasks < myTasks.size()) {
>    Future<String> f = executorCompletionService.take();
>    nrCompletedTasks++;
>    f.get();
> }
>
> Here, I have the extra job of keeping track of the number of finished
> tasks -- in addition to storing references to the tasks (MyTasks) for
> potential canceling. Also -- as it is unknown which MyTask instance
> corresponds to a given future returned by ECS.take() --, I end up
> using MyTask only for cancellation and the ECS-generated futures for
> getting the results. As Future is basically meant to be a "handle" to
> an asynchronous task, having two unrelated "handles" to the same task
> with split functionality is (conceptually) confusing.
>
> Comments appreciated.
>
> Thanks
> Peter
>