[concurrency-interest] Awaiting a set of tasks on an ExecutorService

Dávid Karnok akarnokd at gmail.com
Fri Aug 18 17:25:14 EDT 2017


I don't think there is a one-liner solution for your case, given the number
of tasks, the exception handling and the other constraints.

You may consider using a reactive library for the job. Such libraries are
data-centric instead of task-centric and can manage an unknown number of
data items, asynchrony, parallelism and error handling for you.

Because of this data-centric viewpoint, I can only give a general feel for
how I'd process the lines of a set of files with RxJava:

ExpensiveObject o = new ExpensiveObject();
List<Throwable> errors = new ArrayList<>();

Flowable.fromIterable(listOfFiles)
    // assumes listOfFiles holds java.nio.file.Path entries
    .flatMapIterable(file -> Files.readAllLines(file))
    .parallel()
    .runOn(Schedulers.computation())
    .map(line -> {
         // runs on the computation threads: only collect the updates here
         List<Runnable> toExecute = new ArrayList<>();
         try {
             toExecute.add(() -> { o.lines++; });

             if (line.contains("error")) {
                 toExecute.add(() -> { o.errorLines++; });
             }
         } catch (Throwable ex) {
             // keep at most 100 errors, drop the rest
             toExecute.add(() -> {
                 if (errors.size() < 100) { errors.add(ex); }
             });
         }
         return toExecute;
    })
    .sequential()
    // the updates are applied one at a time on the blocking thread
    .blockingSubscribe(list -> list.forEach(Runnable::run),
        Throwable::printStackTrace);

In your case, you'd probably calculate something in "map", then create a
Runnable that sets a field/calls a setter on "o" with the calculated value.
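
If pulling in a library is not an option, the same "calculate off-thread,
apply on one thread" idea can be approximated with plain java.util.concurrent.
The following is only a rough sketch and not how RxJava does it internally:
the names ComputeThenApply, Target, build and DONE are made up for the
illustration, and line.trim().length() stands in for the expensive
calculation. Workers push small Runnables into a queue, and the calling
thread is the only one that ever touches the target object. A counter
replaces the big list of Futures.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

final class ComputeThenApply {
    // Hypothetical stand-in for the expensive target object.
    // It is only ever mutated by the thread that calls build().
    static final class Target {
        long totalLength;
        int lines;
    }

    // Marker telling the applying thread that one worker task has finished.
    static final Runnable DONE = () -> { };

    static Target build(List<String> lines, int threads) throws InterruptedException {
        Target o = new Target();
        BlockingQueue<Runnable> updates = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        int submitted = 0;
        try {
            for (String line : lines) {
                pool.execute(() -> {
                    try {
                        // placeholder for the expensive calculation, done off-thread
                        int length = line.trim().length();
                        // the update itself is deferred to the applying thread
                        updates.add(() -> { o.lines++; o.totalLength += length; });
                    } finally {
                        // always report completion, even if the calculation failed
                        updates.add(DONE);
                    }
                });
                submitted++;
            }
            // Apply updates on the calling thread until every task reported DONE.
            // The queue is FIFO and each task enqueues its update before DONE,
            // so no update is left behind when the loop ends.
            for (int finished = 0; finished < submitted; ) {
                Runnable r = updates.take();
                if (r == DONE) {
                    finished++;
                } else {
                    r.run();
                }
            }
        } finally {
            pool.shutdown();
        }
        return o;
    }
}
```

Note that the queue in this sketch is unbounded, so a very fast producer can
still pile up pending updates. A bounded queue with put() would add
backpressure, but then the submit loop and the apply loop can no longer run
one after the other on the same thread.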

2017-08-18 22:42 GMT+02:00 Shevek <shevek at anarres.org>:

> Hi,
>
> I need to build a complicated object on a multiprocessor system. The
> target object uses synchronization to guarantee thread-safe access.
> However, the individual tasks which build elements of the target object are
> expensive, so I farm them out to an ExecutorService. The question is:
>
> What is the recommended way to submit a stream of tasks to an
> ExecutorService, and then at a point, wait until they're all done.
>
> * I don't know the list of tasks up front. I read them iteratively from a
> set of files or events.
> * A big collection of Future(s) isn't realistic, there are just too many
> tasks.
> * Reaping completed futures on submission means that I end up with
> exceptions in weird places; I really need to gather max 100 suppressed
> exceptions, and discard the rest.
> * ForkJoinPool has invoke() but I think for a million-task job, I still
> end up with a huge list of futures. This also assumes I ignore the note
> about synchronization.
> * CompletionService allows me to wait for _any_ of the submitted tasks,
> but not for _all_ of them.
>
> * Bonus points for sharing a single ExecutorService and having "sets" of
> tasks which can be independently awaited. This starts to sound very like
> FJP, except for the blocking/I/O and the stream-ness of my task set.
>
> Mostly, dear list, I'm expecting a one-liner from some API class that I've
> missed because this is NOT an original request, I just can't find a
> standard solution.
>
> Thank you.
>
> S.



-- 
Best regards,
David Karnok