[concurrency-interest] Re: new task coordination operations (Doug Lea)

Doug Lea dl@cs.oswego.edu
Wed, 17 Dec 2003 08:14:41 -0500


> Whenever I see two (and only two) options in an API, I think "looks like
> an opportunity for a strategy pattern". 

Agreed, almost. It might look like a case for one, but
ExecutorCompletionService exists so that you can apply your own
strategies/policies across different contexts and cases while still
using a common framework/pattern.

The reason for not using a single pluggable strategy object here is
that the policy variation can encompass not only how many results to
process, but also whether and how the results should be combined,
whether to ignore those tasks that encounter exceptions, and whether to
cancel unfinished tasks upon return. So the only reasonable option we
know of is to let people write the code dealing with each of these,
as appropriate.
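
To make one such hand-written policy concrete, here is a minimal sketch
(not taken from the javadoc) that combines results by summing them and
ignores tasks that throw. The names SumPolicy and sumOfSuccesses, and the
choice of Integer results, are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SumPolicy {
    // Hypothetical policy: process every result, combine by addition,
    // and skip any task that ends with an exception.
    static int sumOfSuccesses(Executor e, List<Callable<Integer>> tasks)
            throws InterruptedException {
        CompletionService<Integer> ecs = new ExecutorCompletionService<>(e);
        for (Callable<Integer> t : tasks)
            ecs.submit(t);
        int sum = 0;
        for (int i = 0; i < tasks.size(); ++i) {
            try {
                sum += ecs.take().get();   // results arrive in completion order
            } catch (ExecutionException ignore) {
                // policy choice: a failed task contributes nothing
            }
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            tasks.add(() -> 1);
            tasks.add(() -> { throw new IllegalStateException("boom"); });
            tasks.add(() -> 2);
            System.out.println(sumOfSuccesses(pool, tasks));   // prints 3
        } finally {
            pool.shutdown();
        }
    }
}
```

A different combination rule or a different reaction to failures only
changes the body of the loop, which is why these choices are left to the
caller's code rather than to one pluggable object.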

The invokeAny and invokeAll methods provide ready-made solutions using
the most common policy choices for the two most common cases.  Some
new usage examples in ExecutorCompletionService show some additional
variations. Pasting them in:


 Usage Examples.  Suppose you have a set of solvers for a certain problem, each returning a value of some type Result, and would like to run them concurrently, processing the result of each of them that returns a non-null value. You could write this as:

    void solve(Executor e, Collection<Callable<Result>> solvers) 
        throws InterruptedException, ExecutionException {
        ExecutorCompletionService<Result> ecs = new
            ExecutorCompletionService<Result>(e);
        for (Callable<Result> s : solvers)
            ecs.submit(s);
        int n = solvers.size();
        for (int i = 0; i < n; ++i) {
            Result r = ecs.take().get();
            if (r != null) 
                use(r);
        }
    }
 

Suppose instead that you would like to use the first non-null result of a set of tasks, ignoring any of those that encounter exceptions and cancelling all of the other tasks when the first one is ready:

    void solve(Executor e, Collection<Callable<Result>> solvers) 
        throws InterruptedException {
        ExecutorCompletionService<Result> ecs = 
            new ExecutorCompletionService<Result>(e);
        int n = solvers.size();
        ArrayList<Future<Result>> futures = 
            new ArrayList<Future<Result>>(n);
        Result result = null;
        try {
            for (Callable<Result> s : solvers)
                futures.add(ecs.submit(s));
            for (int i = 0; i < n; ++i) {
                try {
                    Result r = ecs.take().get();
                    if (r != null) {
                        result = r;
                        break;
                    }
                } catch(ExecutionException ignore) {}
            }
        }
        finally {
            for (Future<Result> f : futures)
                f.cancel(true);
        }

        if (result != null)
            use(result);
    }
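
For comparison with the hand-written loops above, here is a minimal sketch
of the ready-made invokeAll and invokeAny methods, assuming they are called
on an ExecutorService as in the released java.util.concurrent API. The class
ReadyMadeDemo and the helper solver are hypothetical names used only for
illustration, with String standing in for Result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ReadyMadeDemo {
    // Hypothetical stand-in for a solver: returns a fixed answer.
    static Callable<String> solver(String answer) {
        return () -> answer;
    }

    // invokeAll waits for every task and returns one Future per task,
    // in the same order as the input collection.
    static List<String> all(ExecutorService pool, List<Callable<String>> solvers)
            throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<>();
        for (Future<String> f : pool.invokeAll(solvers))
            results.add(f.get());
        return results;
    }

    // invokeAny returns the result of one task that completed without
    // throwing; tasks that have not completed are cancelled on return.
    static String any(ExecutorService pool, List<Callable<String>> solvers)
            throws InterruptedException, ExecutionException {
        return pool.invokeAny(solvers);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> solvers = new ArrayList<>();
            solvers.add(solver("a"));
            solvers.add(solver("b"));
            System.out.println(all(pool, solvers));
            System.out.println(any(pool, solvers));
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike the second example above, invokeAny does not skip null results; that
kind of extra filtering is what the ExecutorCompletionService loop is for.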