[concurrency-interest] Canceling Futures - Callable implementations

Péter Kovács peter.kovacs.1.0rc at gmail.com
Thu Apr 9 13:56:42 EDT 2009


Just one more question:

I noticed the Thread.yield() calls in your samples. I thought this
call only had a real effect back when green threads were used. Can
it still make a difference?

Thanks
Peter
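
For reference on the question above: the Javadoc for Thread.yield()
describes the call as a hint that the scheduler is free to ignore, so a
polling loop should not rely on it for correctness. A minimal sketch,
assuming a hypothetical class YieldPollingSketch and helper
spinUntilCancelled (neither is from the thread), that runs the same
cancellation-polling loop with and without the call:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of the samples in this thread.
class YieldPollingSketch {

    // Polls isCancelled() until the task is cancelled and returns the
    // number of loop iterations that ran.
    static long spinUntilCancelled(final boolean useYield)
            throws InterruptedException {
        final AtomicLong iterations = new AtomicLong();
        final AtomicReference<FutureTask<Void>> self =
            new AtomicReference<FutureTask<Void>>();
        final CountDownLatch started = new CountDownLatch(1);

        FutureTask<Void> task = new FutureTask<Void>(new Callable<Void>() {
            public Void call() {
                started.countDown();
                while (!self.get().isCancelled()) {
                    iterations.incrementAndGet();
                    if (useYield) {
                        Thread.yield(); // a hint only; may be ignored
                    }
                }
                return null;
            }
        });
        self.set(task); // published before the thread is started

        Thread thread = new Thread(task);
        thread.start();
        started.await();     // make sure the loop is actually running
        Thread.sleep(50);    // let it spin briefly
        task.cancel(false);  // cooperative cancel, no interrupt
        thread.join();       // the loop exits once it sees the cancel
        return iterations.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("with yield:    " + spinUntilCancelled(true));
        System.out.println("without yield: " + spinUntilCancelled(false));
    }
}
```

Both variants terminate once cancel is called. The iteration counts
depend on the JVM, the OS scheduler and the number of cores, so no
particular ratio should be expected.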

2009/4/9 Péter Kovács <peter.kovacs.1.0rc at gmail.com>:
> Joe,
>
> Thank you so much for all your suggestions. I found them very helpful indeed.
>
> Peter
>
> On Thu, Apr 9, 2009 at 12:33 AM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
>> And yet another correction, this time in the sample construction:
>>
>>   Worker<Integer> worker = new Worker<Integer>(queue) {
>>       protected Integer compute() {
>>           while (!isCancelled()) {
>>               // ...
>>           }
>>           return 0;
>>       }
>>   };
>>
>> Should be while(!isCancelled()) ...
>>
>> Joe
>>
>> On Wed, Apr 8, 2009 at 3:17 PM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
>>>
>>> Correction: The internal task should add the outer task (Worker) to the
>>> completion queue, as follows, so that the tasks you take from the queue are
>>> the same ones you constructed:
>>>
>>>     private final FutureTask<V> task =
>>>         new FutureTask<V>(new Callable<V>() {
>>>             public V call() throws Exception {
>>>                 return compute();
>>>             }
>>>         }) {
>>>             @Override protected void done() {
>>>                 completionQueue.add(Worker.this);
>>>             }
>>>         };
>>>
>>> Joe
>>>
>>> On Wed, Apr 8, 2009 at 3:05 PM, Joe Bowbeer wrote:
>>>>
>>>> Peter,
>>>>
>>>> ExecutorCompletionService is dead simple.
>>>>
>>>> It's based on a trivial FutureTask extension that hooks the done() method
>>>> in order to enqueue the original task on a completion queue:
>>>>
>>>>   private class QueueingFuture extends FutureTask<Void> {
>>>>       QueueingFuture(RunnableFuture<V> task) {
>>>>           super(task, null);
>>>>           this.task = task;
>>>>       }
>>>>       protected void done() { completionQueue.add(task); }
>>>>       private final Future<V> task;
>>>>   }
>>>>
>>>> This functionality could easily be added to MyTask.  (See one approach
>>>> below.)  Then you can eliminate the completion service and "take" directly
>>>> from the completion queue.
>>>>
>>>> A few more things to point out:
>>>>
>>>> If you construct the completion service with an AbstractExecutorService,
>>>> the completion service asks your executor service to construct the task.
>>>> (See newTaskFor method).  This task, created by your executor service, is
>>>> then returned to you by the completion service, and this is the same task
>>>> that is enqueued.  You might explore this avenue for task customization.
>>>>
>>>> A custom "completion task" designed for subclassing follows.  Your
>>>> subclass overrides the "compute" method.  Note that your compute method
>>>> implementation can query isCancelled.
>>>>
>>>> public abstract class Worker<V> implements RunnableFuture<V> {
>>>>
>>>>     protected Worker(BlockingQueue<Future<V>> completionQueue) {
>>>>         this.completionQueue = completionQueue;
>>>>     }
>>>>
>>>>     private final BlockingQueue<Future<V>> completionQueue;
>>>>
>>>>     /**
>>>>      * Calls the <tt>compute</tt> method to compute the result.
>>>>      */
>>>>     private final FutureTask<V> task =
>>>>         new FutureTask<V>(new Callable<V>() {
>>>>             public V call() throws Exception {
>>>>                 return compute();
>>>>             }
>>>>         }) {
>>>>             @Override protected void done() {
>>>>                 completionQueue.add(Worker.this);
>>>>             }
>>>>         };
>>>>
>>>>     /**
>>>>      * Computes the value to be returned by the <tt>get</tt> method.
>>>>      */
>>>>     protected abstract V compute() throws Exception;
>>>>
>>>>     /* RunnableFuture implementation. */
>>>>     public void run() {
>>>>         task.run();
>>>>     }
>>>>
>>>>     public boolean cancel(boolean mayInterruptIfRunning) {
>>>>         return task.cancel(mayInterruptIfRunning);
>>>>     }
>>>>
>>>>     public boolean isCancelled() {
>>>>         return task.isCancelled();
>>>>     }
>>>>
>>>>     public boolean isDone() {
>>>>         return task.isDone();
>>>>     }
>>>>
>>>>     public V get() throws InterruptedException, ExecutionException {
>>>>         return task.get();
>>>>     }
>>>>
>>>>     public V get(long timeout, TimeUnit unit)
>>>>             throws InterruptedException, ExecutionException, TimeoutException {
>>>>         return task.get(timeout, unit);
>>>>     }
>>>> }
>>>>
>>>> Sample construction:
>>>>
>>>>   BlockingQueue<Future<Integer>> queue =
>>>>           new LinkedBlockingQueue<Future<Integer>>();
>>>>
>>>>   Worker<Integer> worker = new Worker<Integer>(queue) {
>>>>       protected Integer compute() {
>>>>           while (!isCancelled()) {
>>>>               Thread.yield(); // Please!!
>>>>           }
>>>>           return 0;
>>>>       }
>>>>   };
>>>>
>>>> Joe
>>>>
>>>> 2009/4/8 Péter Kovács
>>>>>
>>>>> One application of this approach where I cannot find an elegant
>>>>> solution is when I want to use an ExecutorCompletionService for
>>>>> "joining" a definite number of "cancelable" concurrent tasks:
>>>>>
>>>>> With a hypothetical collaboration of future-aware Callables, Futures
>>>>> and Executors built into the API, I could (1) store the futures
>>>>> returned by ExecutorCompletionService.submit in a list, (2) remove
>>>>> from the list each future returned by ExecutorCompletionService.take,
>>>>> (3) get the result off of the same future and (4) check if the list is
>>>>> empty to see if more futures need to be taken from the ECS. Something
>>>>> like:
>>>>>
>>>>> while (myFutures.size() > 0) {
>>>>>    Future<String> f = executorCompletionService.take();
>>>>>    myFutures.remove(f);
>>>>>    f.get();
>>>>> }
>>>>>
>>>>> Using the "MyTask approach" with ECS, I have to do something like this:
>>>>>
>>>>> while (nrCompletedTasks < myTasks.size()) {
>>>>>    Future<String> f = executorCompletionService.take();
>>>>>    nrCompletedTasks++;
>>>>>    f.get();
>>>>> }
>>>>>
>>>>> Here, I have the extra job of keeping track of the number of finished
>>>>> tasks -- in addition to storing references to the tasks (MyTasks) for
>>>>> potential canceling. Also -- as it is unknown which MyTask instance
>>>>> corresponds to a given future returned by ECS.take() --, I end up
>>>>> using MyTask only for cancellation and the ECS-generated futures for
>>>>> getting the results. As Future is basically meant to be a "handle" to
>>>>> an asynchronous task, having two unrelated "handles" to the same task
>>>>> with split functionality is (conceptually) confusing.
>>>>>
>>>>> Comments appreciated.
>>>>>
>>>>> Thanks
>>>>> Peter
>>>>
>>>>
>>>
>>
>
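
Putting the corrected pieces of the thread together: a minimal sketch in
which Worker instances are handed to a plain executor and then taken
straight from the completion queue, with no ExecutorCompletionService.
The Worker class is a condensed copy of the corrected version quoted
above. The names WorkerDemo and runAll, the pool size, and the squaring
task are hypothetical and chosen only for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;

// Condensed copy of the corrected Worker from this thread.
abstract class Worker<V> implements RunnableFuture<V> {
    private final BlockingQueue<Future<V>> completionQueue;

    private final FutureTask<V> task =
        new FutureTask<V>(new Callable<V>() {
            public V call() throws Exception { return compute(); }
        }) {
            // Enqueue the outer Worker, not the inner FutureTask.
            @Override protected void done() { completionQueue.add(Worker.this); }
        };

    protected Worker(BlockingQueue<Future<V>> completionQueue) {
        this.completionQueue = completionQueue;
    }

    protected abstract V compute() throws Exception;

    public void run() { task.run(); }
    public boolean cancel(boolean mayInterrupt) { return task.cancel(mayInterrupt); }
    public boolean isCancelled() { return task.isCancelled(); }
    public boolean isDone() { return task.isDone(); }
    public V get() throws InterruptedException, ExecutionException {
        return task.get();
    }
    public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return task.get(timeout, unit);
    }
}

// Hypothetical driver: runs n workers and sums their results.
class WorkerDemo {
    static int runAll(int n) throws InterruptedException, ExecutionException {
        BlockingQueue<Future<Integer>> queue =
            new LinkedBlockingQueue<Future<Integer>>();
        Set<Future<Integer>> pending = new HashSet<Future<Integer>>();
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            for (int i = 1; i <= n; i++) {
                final int input = i;
                Worker<Integer> worker = new Worker<Integer>(queue) {
                    protected Integer compute() { return input * input; }
                };
                pending.add(worker);      // keep the handle for cancel/join
                executor.execute(worker); // Worker is itself a Runnable
            }
            int sum = 0;
            while (!pending.isEmpty()) {
                Future<Integer> f = queue.take();
                // The taken future is one of the Workers constructed above.
                if (!pending.remove(f)) {
                    throw new IllegalStateException("unknown future");
                }
                sum += f.get();
            }
            return sum;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll(5));
    }
}
```

Because done() enqueues Worker.this, the same object serves for both
cancellation and result retrieval, and the remove-from-list loop from the
original question works without a separate counter. FutureTask also calls
done() when a task is cancelled, so a cancelled Worker still shows up in
the queue; its get() then throws CancellationException, which a real
caller would need to handle.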

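The newTaskFor route mentioned in the thread can be sketched as follows.
This is only an illustration under stated assumptions: LabeledTask,
LabelingExecutor and NewTaskForDemo are hypothetical names, and the label
field merely stands in for whatever customization a real task would
carry. It relies on ExecutorCompletionService delegating task creation to
an AbstractExecutorService, as described above.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical FutureTask subclass carrying an extra field.
class LabeledTask<V> extends FutureTask<V> {
    final String label;

    LabeledTask(Callable<V> callable, String label) {
        super(callable);
        this.label = label;
    }
}

// Hypothetical executor whose newTaskFor hook builds LabeledTask instances.
class LabelingExecutor extends ThreadPoolExecutor {
    private final AtomicInteger sequence = new AtomicInteger();

    LabelingExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new LabeledTask<T>(callable, "task-" + sequence.incrementAndGet());
    }
}

class NewTaskForDemo {
    // Returns true if the future handed back by submit() is the custom
    // task and is the very same object that take() later returns.
    static boolean sameTaskComesBack()
            throws InterruptedException, ExecutionException {
        LabelingExecutor executor = new LabelingExecutor(2);
        try {
            CompletionService<String> ecs =
                new ExecutorCompletionService<String>(executor);
            Future<String> submitted = ecs.submit(new Callable<String>() {
                public String call() { return "done"; }
            });
            Future<String> taken = ecs.take();
            return submitted == taken
                && taken instanceof LabeledTask
                && "done".equals(taken.get());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sameTaskComesBack());
    }
}
```

With this arrangement the completion service can stay in place, and the
cast from the taken Future back to the custom type is the price for not
maintaining a separate completion queue by hand.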

