[concurrency-interest] Canceling Futures - Callable implementations

Joe Bowbeer joe.bowbeer at gmail.com
Wed Apr 8 18:33:21 EDT 2009


And yet another correction, this time in the sample construction:

  Worker<Integer> worker = new Worker<Integer>(queue) {
      protected Integer compute() {
          while (!isCancelled()) {
              // ...
          }
          return 0;
      }
  };

Should be while(!isCancelled()) ...

Joe

On Wed, Apr 8, 2009 at 3:17 PM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:

> Correction: The internal task should add the outer task (Worker) to the
> completion queue, as follows, so that the tasks you take from the queue are
> the same ones you constructed:
>
>     private final FutureTask<V> task =
>         new FutureTask<V>(new Callable<V>() {
>             public V call() throws Exception {
>                 return compute();
>             }
>         }) {
>             @Override protected void done() {
>                 completionQueue.add(Worker.this);
>             }
>         };
>
> Joe
>
>
> On Wed, Apr 8, 2009 at 3:05 PM, Joe Bowbeer wrote:
>
>> Peter,
>>
>> ExecutorCompletionService is dead simple.
>>
>> It's based on a trivial FutureTask extension that hooks the done() method
>> in order to enqueue the original task on a completion queue:
>>
>>   private class QueueingFuture extends FutureTask<Void> {
>>       QueueingFuture(RunnableFuture<V> task) {
>>           super(task, null);
>>           this.task = task;
>>       }
>>       protected void done() { completionQueue.add(task); }
>>       private final Future<V> task;
>>   }
>>
>> This functionality could easily be added to MyTask.  (See one approach
>> below.)  Then you can eliminate the completion service and "take" directly
>> from the completion queue.
>>
>> A few more things to point out:
>>
>> If you construct the completion service with an AbstractExecutorService,
>> the completion service asks your executor service to construct the task.
>> (See newTaskFor method).  This task, created by your executor service, is
>> then returned to you by the completion service, and this is the same task
>> that is enqueued.  You might explore this avenue for task customization.
>>
>> A custom "completion task" designed for subclassing follows.  Your
>> subclass overrides the "compute" method.  Note that your compute method
>> implementation can query isCancelled.
>>
>> public abstract class Worker<V> implements RunnableFuture<V> {
>>
>>     protected Worker(BlockingQueue<Future<V>> completionQueue) {
>>         this.completionQueue = completionQueue;
>>     }
>>
>>     private final BlockingQueue<Future<V>> completionQueue;
>>
>>     /**
>>      * Calls the <tt>compute</tt> method to compute the result.
>>      */
>>     private final FutureTask<V> task =
>>         new FutureTask<V>(new Callable<V>() {
>>             public V call() throws Exception {
>>                 return compute();
>>             }
>>         }) {
>>             @Override protected void done() {
>>                 completionQueue.add(Worker.this);
>>             }
>>         };
>>
>>     /**
>>      * Computes the value to be returned by the <tt>get</tt> method.
>>      */
>>     protected abstract V compute() throws Exception;
>>
>>     /* RunnableFuture implementation. */
>>     public void run() {
>>         task.run();
>>     }
>>
>>     public boolean cancel(boolean mayInterruptIfRunning) {
>>         return task.cancel(mayInterruptIfRunning);
>>     }
>>
>>     public boolean isCancelled() {
>>         return task.isCancelled();
>>     }
>>
>>     public boolean isDone() {
>>         return task.isDone();
>>     }
>>
>>     public V get() throws InterruptedException, ExecutionException {
>>         return task.get();
>>     }
>>
>>     public V get(long timeout, TimeUnit unit)
>>         throws InterruptedException, ExecutionException, TimeoutException
>> {
>>         return task.get(timeout, unit);
>>     }
>> }
>>
>> Sample construction:
>>
>>   BlockingQueue<Future<Integer>> queue =
>>           new LinkedBlockingQueue<Future<Integer>>();
>>
>>   Worker<Integer> worker = new Worker<Integer>(queue) {
>>       protected Integer compute() {
>>           while (!isCancelled()) {
>>               Thread.yield(); // Please!!
>>           }
>>           return 0;
>>       }
>>   };
>>
>> Joe
>>
>>
>> 2009/4/8 Péter Kovács
>>
>>> One application of this approach where I cannot find an elegant
>>> solution is when I want to use an ExecutorCompletionService for
>>> "joining" a definite number of "cancelable" concurrent tasks:
>>>
>>> With a hypothetical collaboration of future-aware Callables, Futures
>>> and Executors built into the API, I could (1) store the futures
>>> returned by ExecutorCompletionService.submit in a list, (2) remove
>>> from the list each future returned by ExecutorCompletionService.take,
>>> (3) get the result off of the same future and (4) check if the list is
>>> empty to see if more futures need to be taken from the ECS. Something
>>> like:
>>>
>>> while (myFutures.size() > 0) {
>>>    Future<String> f = executorCompletionService.take();
>>>    myFutures.remove(f);
>>>    f.get();
>>> }
>>>
>>> Using the "MyTask approach" with ECS, I have to do something like this:
>>>
>>> while (nrCompletedTasks < myTasks.size()) {
>>>    Future<String> f = executorCompletionService.take();
>>>    nrCompletedTasks++;
>>>    f.get();
>>> }
>>>
>>> Here, I have the extra job of keeping track of the number of finished
>>> tasks -- in addition to storing references to the tasks (MyTasks) for
>>> potential canceling. Also -- as it is unknown which MyTask instance
>>> corresponds to a given future returned by ECS.take() --, I end up
>>> using MyTask only for cancellation and the ECS-generated futures for
>>> getting the results. As Future is basically meant to be a "handle" to
>>> an asynchronous task, having to unrelated "handle" to the same task
>>> with split functionality is (conceptually) confusing.
>>>
>>> Comments appreciated.
>>>
>>> Thanks
>>> Peter
>>>
>>
>>
>>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20090408/c3bd0874/attachment-0001.html>


More information about the Concurrency-interest mailing list