[concurrency-interest] Canceling Futures - Callable implementations

Péter Kovács peter.kovacs.1.0rc at gmail.com
Thu Apr 9 06:50:47 EDT 2009


Joe,

Thank you so much for all your suggestions. I found them very helpful indeed.

Peter

On Thu, Apr 9, 2009 at 12:33 AM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
> And yet another correction, this time in the sample construction:
>
>   Worker<Integer> worker = new Worker<Integer>(queue) {
>       protected Integer compute() {
>           while (!isCancelled()) {
>               // ...
>           }
>           return 0;
>       }
>   };
>
> Should be while(!isCancelled()) ...
>
> Joe
>
> On Wed, Apr 8, 2009 at 3:17 PM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
>>
>> Correction: The internal task should add the outer task (Worker) to the
>> completion queue, as follows, so that the tasks you take from the queue are
>> the same ones you constructed:
>>
>>     private final FutureTask<V> task =
>>         new FutureTask<V>(new Callable<V>() {
>>             public V call() throws Exception {
>>                 return compute();
>>             }
>>         }) {
>>             @Override protected void done() {
>>                 completionQueue.add(Worker.this);
>>             }
>>         };
>>
>> Joe
>>
>> On Wed, Apr 8, 2009 at 3:05 PM, Joe Bowbeer wrote:
>>>
>>> Peter,
>>>
>>> ExecutorCompletionService is dead simple.
>>>
>>> It's based on a trivial FutureTask extension that hooks the done() method
>>> in order to enqueue the original task on a completion queue:
>>>
>>>   private class QueueingFuture extends FutureTask<Void> {
>>>       QueueingFuture(RunnableFuture<V> task) {
>>>           super(task, null);
>>>           this.task = task;
>>>       }
>>>       protected void done() { completionQueue.add(task); }
>>>       private final Future<V> task;
>>>   }
>>>
>>> This functionality could easily be added to MyTask.  (See one approach
>>> below.)  Then you can eliminate the completion service and "take" directly
>>> from the completion queue.
>>>
>>> A few more things to point out:
>>>
>>> If you construct the completion service with an AbstractExecutorService,
>>> the completion service asks your executor service to construct the task.
>>> (See newTaskFor method).  This task, created by your executor service, is
>>> then returned to you by the completion service, and this is the same task
>>> that is enqueued.  You might explore this avenue for task customization.
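
A rough sketch of the newTaskFor avenue, for illustration only. LabeledTask, LabelingExecutor and NewTaskForDemo are hypothetical names that do not appear in this thread, and the check that take() hands back the very object returned by submit() reflects how the OpenJDK ExecutorCompletionService behaves rather than anything promised here:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical custom task: a FutureTask that carries an extra label.
class LabeledTask<V> extends FutureTask<V> {
    final String label;

    LabeledTask(Callable<V> callable, String label) {
        super(callable);
        this.label = label;
    }
}

// Hypothetical executor whose newTaskFor hook builds LabeledTask instances.
class LabelingExecutor extends ThreadPoolExecutor {
    LabelingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new LabeledTask<T>(callable, "custom");
    }
}

class NewTaskForDemo {
    // Submits one task through a completion service built on LabelingExecutor
    // and reports whether the future taken from the service is the custom task.
    static boolean takenFutureIsCustomTask() {
        LabelingExecutor executor = new LabelingExecutor(2);
        try {
            ExecutorCompletionService<Integer> ecs =
                new ExecutorCompletionService<Integer>(executor);
            Future<Integer> submitted = ecs.submit(new Callable<Integer>() {
                public Integer call() {
                    return 42;
                }
            });
            Future<Integer> taken = ecs.take(); // blocks until the task is done
            return taken == submitted
                && taken instanceof LabeledTask
                && taken.get() == 42;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the completion service delegates task construction to the executor, any extra state or behavior placed on the custom task type is reachable from the future that comes back from take(), after a cast.
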
>>>
>>> A custom "completion task" designed for subclassing follows.  Your
>>> subclass overrides the "compute" method.  Note that your compute method
>>> implementation can query isCancelled.
>>>
>>> public abstract class Worker<V> implements RunnableFuture<V> {
>>>
>>>     protected Worker(BlockingQueue<Future<V>> completionQueue) {
>>>         this.completionQueue = completionQueue;
>>>     }
>>>
>>>     private final BlockingQueue<Future<V>> completionQueue;
>>>
>>>     /**
>>>      * Calls the <tt>compute</tt> method to compute the result.
>>>      */
>>>     private final FutureTask<V> task =
>>>         new FutureTask<V>(new Callable<V>() {
>>>             public V call() throws Exception {
>>>                 return compute();
>>>             }
>>>         }) {
>>>             @Override protected void done() {
>>>                 completionQueue.add(Worker.this);
>>>             }
>>>         };
>>>
>>>     /**
>>>      * Computes the value to be returned by the <tt>get</tt> method.
>>>      */
>>>     protected abstract V compute() throws Exception;
>>>
>>>     /* RunnableFuture implementation. */
>>>     public void run() {
>>>         task.run();
>>>     }
>>>
>>>     public boolean cancel(boolean mayInterruptIfRunning) {
>>>         return task.cancel(mayInterruptIfRunning);
>>>     }
>>>
>>>     public boolean isCancelled() {
>>>         return task.isCancelled();
>>>     }
>>>
>>>     public boolean isDone() {
>>>         return task.isDone();
>>>     }
>>>
>>>     public V get() throws InterruptedException, ExecutionException {
>>>         return task.get();
>>>     }
>>>
>>>     public V get(long timeout, TimeUnit unit)
>>>             throws InterruptedException, ExecutionException, TimeoutException {
>>>         return task.get(timeout, unit);
>>>     }
>>> }
>>>
>>> Sample construction:
>>>
>>>   BlockingQueue<Future<Integer>> queue =
>>>           new LinkedBlockingQueue<Future<Integer>>();
>>>
>>>   Worker<Integer> worker = new Worker<Integer>(queue) {
>>>       protected Integer compute() {
>>>           while (!isCancelled()) {
>>>               Thread.yield(); // Please!!
>>>           }
>>>           return 0;
>>>       }
>>>   };
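
To show how the pieces fit together, here is a minimal end-to-end sketch. It repeats the Worker class from above (as a non-public class so the listing is self-contained), hands two workers to a plain executor via execute(), and takes completed workers straight from the queue. WorkerDemo, runDemo and the two sample workers are made up for this illustration:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// The Worker class from the message above, reproduced so this listing compiles alone.
abstract class Worker<V> implements RunnableFuture<V> {
    private final BlockingQueue<Future<V>> completionQueue;

    protected Worker(BlockingQueue<Future<V>> completionQueue) {
        this.completionQueue = completionQueue;
    }

    private final FutureTask<V> task =
        new FutureTask<V>(new Callable<V>() {
            public V call() throws Exception {
                return compute();
            }
        }) {
            // Runs on normal completion, failure and cancellation alike.
            @Override protected void done() {
                completionQueue.add(Worker.this);
            }
        };

    protected abstract V compute() throws Exception;

    public void run() { task.run(); }
    public boolean cancel(boolean mayInterruptIfRunning) {
        return task.cancel(mayInterruptIfRunning);
    }
    public boolean isCancelled() { return task.isCancelled(); }
    public boolean isDone() { return task.isDone(); }
    public V get() throws InterruptedException, ExecutionException {
        return task.get();
    }
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return task.get(timeout, unit);
    }
}

class WorkerDemo {
    // Hypothetical driver: one worker finishes by itself, the other spins until cancelled.
    static String runDemo() {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
        Worker<Integer> quick = new Worker<Integer>(queue) {
            protected Integer compute() { return 7; }
        };
        Worker<Integer> spinner = new Worker<Integer>(queue) {
            protected Integer compute() {
                while (!isCancelled()) {
                    Thread.yield();
                }
                return 0;
            }
        };
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            executor.execute(quick);
            executor.execute(spinner);
            Future<Integer> first = queue.take();  // only "quick" can finish unaided
            spinner.cancel(false);                 // done() enqueues the spinner
            Future<Integer> second = queue.take();
            return (first == quick) + " " + first.get() + " "
                 + (second == spinner) + " " + second.isCancelled();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that a cancelled worker shows up on the queue as well, since FutureTask invokes done() on cancellation, so code that drains the queue should check isCancelled() before calling get().
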
>>>
>>> Joe
>>>
>>> 2009/4/8 Péter Kovács
>>>>
>>>> One application of this approach where I cannot find an elegant
>>>> solution is when I want to use an ExecutorCompletionService for
>>>> "joining" a definite number of "cancelable" concurrent tasks:
>>>>
>>>> With a hypothetical collaboration of future-aware Callables, Futures
>>>> and Executors built into the API, I could (1) store the futures
>>>> returned by ExecutorCompletionService.submit in a list, (2) remove
>>>> from the list each future returned by ExecutorCompletionService.take,
>>>> (3) get the result off of the same future and (4) check if the list is
>>>> empty to see if more futures need to be taken from the ECS. Something
>>>> like:
>>>>
>>>> while (myFutures.size() > 0) {
>>>>    Future<String> f = executorCompletionService.take();
>>>>    myFutures.remove(f);
>>>>    f.get();
>>>> }
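
For reference, the loop above can be fleshed out into something runnable along the following lines. This is only a sketch: JoinDemo and joinAll are invented names, the tasks are plain Callables with no cancellation awareness, and the remove() step assumes that take() returns the same Future object that submit() returned (true of the OpenJDK ExecutorCompletionService, but treated as an assumption here):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class JoinDemo {
    // Submits n tasks, then takes futures until none are outstanding.
    static List<String> joinAll(int n) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<String> ecs =
            new ExecutorCompletionService<String>(executor);
        Set<Future<String>> myFutures = new HashSet<Future<String>>();
        List<String> results = new ArrayList<String>();
        try {
            for (int i = 0; i < n; i++) {
                final int id = i;
                // (1) remember every future handed out by submit
                myFutures.add(ecs.submit(new Callable<String>() {
                    public String call() {
                        return "result-" + id;
                    }
                }));
            }
            // (4) keep taking while futures are still outstanding
            while (!myFutures.isEmpty()) {
                Future<String> f = ecs.take();      // next completed task
                if (!myFutures.remove(f)) {         // (2) relies on identity, see above
                    throw new IllegalStateException("unknown future from take()");
                }
                results.add(f.get());               // (3) result from the same future
            }
            return results;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

The futures left in myFutures at any point are exactly the ones still eligible for cancel(), though the Callables themselves still have no way to observe that cancellation.
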
>>>>
>>>> Using the "MyTask approach" with ECS, I have to do something like this:
>>>>
>>>> while (nrCompletedTasks < myTasks.size()) {
>>>>    Future<String> f = executorCompletionService.take();
>>>>    nrCompletedTasks++;
>>>>    f.get();
>>>> }
>>>>
>>>> Here, I have the extra job of keeping track of the number of finished
>>>> tasks -- in addition to storing references to the tasks (MyTasks) for
>>>> potential canceling. Also -- as it is unknown which MyTask instance
>>>> corresponds to a given future returned by ECS.take() --, I end up
>>>> using MyTask only for cancellation and the ECS-generated futures for
>>>> getting the results. As Future is basically meant to be a "handle" to
>>>> an asynchronous task, having two unrelated "handles" to the same task
>>>> with split functionality is (conceptually) confusing.
>>>>
>>>> Comments appreciated.
>>>>
>>>> Thanks
>>>> Peter
>>>
>>>
>>
>
>


