[concurrency-interest] Canceling Futures - Callable implementations

Joe Bowbeer joe.bowbeer at gmail.com
Wed Apr 8 18:17:52 EDT 2009


Correction: The internal task should add the outer task (Worker) to the
completion queue, as follows, so that the tasks you take from the queue are
the same ones you constructed:

    private final FutureTask<V> task =
        new FutureTask<V>(new Callable<V>() {
            public V call() throws Exception {
                return compute();
            }
        }) {
            @Override protected void done() {
                completionQueue.add(Worker.this);
            }
        };

Joe

On Wed, Apr 8, 2009 at 3:05 PM, Joe Bowbeer wrote:

> Peter,
>
> ExecutorCompletionService is dead simple.
>
> It's based on a trivial FutureTask extension that hooks the done() method
> in order to enqueue the original task on a completion queue:
>
>   private class QueueingFuture extends FutureTask<Void> {
>       QueueingFuture(RunnableFuture<V> task) {
>           super(task, null);
>           this.task = task;
>       }
>       protected void done() { completionQueue.add(task); }
>       private final Future<V> task;
>   }
>
> This functionality could easily be added to MyTask.  (See one approach
> below.)  Then you can eliminate the completion service and "take" directly
> from the completion queue.
>
> A few more things to point out:
>
> If you construct the completion service with an AbstractExecutorService,
> the completion service asks the your executor service to construct the
> task.  (See newTaskFor method).  This task, created by your executor
> service, is then returned to you by the completion service, and this is the
> same task that is enqueued.  You might explore this avenue for task
> customization.
>
> A custom "completion task" designed for subclassing follows.  Your subclass
> overrides the "compute" method.  Note that your compute method
> implementation can query isCancelled.
>
> public abstract class Worker<V> implements RunnableFuture<V> {
>
>     protected Worker(BlockingQueue<Future<V>> completionQueue) {
>         this.completionQueue = completionQueue;
>     }
>
>     private final BlockingQueue<Future<V>> completionQueue;
>
>     /**
>      * Calls the <tt>compute</tt> method to compute the result.
>      */
>     private final FutureTask<V> task =
>         new FutureTask<V>(new Callable<V>() {
>             public V call() throws Exception {
>                 return compute();
>             }
>         }) {
>             @Override protected void done() {
>                 completionQueue.add(task);
>             }
>         };
>
>     /**
>      * Computes the value to be returned by the <tt>get</tt> method.
>      */
>     protected abstract V compute() throws Exception;
>
>     /* RunnableFuture implementation. */
>
>     public void run() {
>         task.run();
>     }
>
>     public boolean cancel(boolean mayInterruptIfRunning) {
>         return task.cancel(mayInterruptIfRunning);
>     }
>
>     public boolean isCancelled() {
>         return task.isCancelled();
>     }
>
>     public boolean isDone() {
>         return task.isDone();
>     }
>
>     public V get() throws InterruptedException, ExecutionException {
>         return task.get();
>     }
>
>     public V get(long timeout, TimeUnit unit)
>         throws InterruptedException, ExecutionException, TimeoutException {
>         return task.get(timeout, unit);
>     }
> }
>
> Sample construction:
>
>   BlockingQueue<Future<Integer>> queue =
>           new LinkedBlockingQueue<Future<Integer>>();
>
>   Worker<Integer> worker = new Worker<Integer>(queue) {
>       protected Integer compute() {
>           while (isCancelled()) {
>               Thread.yield(); // Please!!
>           }
>           return 0;
>       }
>   };
>
> Joe
>
>
> 2009/4/8 Péter Kovács
>
>> One application of this approach where I cannot find an elegant
>> solution is when I want to use an ExecutorCompletionService for
>> "joining" a definite number of "cancelable" concurrent tasks:
>>
>> With a hypothetical collaboration of future-aware Callables, Futures
>> and Executors built into the API, I could (1) store the futures
>> returned by ExecutorCompletionService.submit in a list, (2) remove
>> from the list each future returned by ExecutorCompletionService.take,
>> (3) get the result off of the same future and (4) check if the list is
>> empty to see if more futures need to be taken from the ECS. Something
>> like:
>>
>> while (myFutures.size() > 0) {
>>    Future<String> f = executorCompletionService.take();
>>    myFutures.remove(f);
>>    f.get();
>> }
>>
>> Using the "MyTask approach" with ECS, I have to do something like this:
>>
>> while (nrCompletedTasks < myTasks.size()) {
>>    Future<String> f = executorCompletionService.take();
>>    nrCompletedTasks++;
>>    f.get();
>> }
>>
>> Here, I have the extra job of keeping track of the number of finished
>> tasks -- in addition to storing references to the tasks (MyTasks) for
>> potential canceling. Also -- as it is unknown which MyTask instance
>> corresponds to a given future returned by ECS.take() --, I end up
>> using MyTask only for cancellation and the ECS-generated futures for
>> getting the results. As Future is basically meant to be a "handle" to
>> an asynchronous task, having to unrelated "handle" to the same task
>> with split functionality is (conceptually) confusing.
>>
>> Comments appreciated.
>>
>> Thanks
>> Peter
>>
>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20090408/82fd6aea/attachment.html>


More information about the Concurrency-interest mailing list