[concurrency-interest] Awaiting a set of tasks on an ExecutorService

Shevek shevek at anarres.org
Fri Aug 18 17:38:26 EDT 2017

Thank you all.

I now have this:


It's meant to allow multiple submits and awaits from various threads; 
the caller is responsible for ensuring that all calls to submit() have 
finished before anyone makes the final call to await().
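
A minimal sketch of what such a tracker might look like (the class and 
method names are mine, not the original code): a count of in-flight 
tasks guarded by the tracker's monitor, with await() blocking until the 
count drops to zero.

```java
import java.util.concurrent.Executor;

// Illustrative tracker: counts in-flight tasks; await() blocks until
// every task submitted so far has completed.
public class TaskSetTracker {
    private final Executor executor;
    private long pending = 0;

    public TaskSetTracker(Executor executor) {
        this.executor = executor;
    }

    public synchronized void submit(Runnable task) {
        pending++;
        executor.execute(() -> {
            try {
                task.run();
            } finally {
                synchronized (TaskSetTracker.this) {
                    if (--pending == 0)
                        TaskSetTracker.this.notifyAll();
                }
            }
        });
    }

    // Blocks until all tasks submitted so far have finished. As noted
    // above, callers must stop submitting before relying on the result.
    public synchronized void await() throws InterruptedException {
        while (pending > 0)
            wait();
    }
}
```

Note the lock is reentrant, so this stays correct even under 
CallerRunsPolicy, where execute() may run the task inline on the 
submitting thread.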

I do need backpressure; I've been achieving that with an 
ArrayBlockingQueue(ncpus * 10) and a CallerRunsPolicy; the *10 is 
meant to absorb the unevenness in the task sizes. Is that sane?
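
For reference, that backpressure setup looks roughly like this (the 
pool sizing is illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPool {
    public static ThreadPoolExecutor create() {
        int ncpus = Runtime.getRuntime().availableProcessors();
        // Once ncpus * 10 tasks are queued, CallerRunsPolicy makes the
        // submitting thread run the task itself, which throttles the
        // producer. The *10 slack absorbs uneven task sizes.
        return new ThreadPoolExecutor(
                ncpus, ncpus,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(ncpus * 10),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```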


On 08/18/2017 02:06 PM, Josh Humphries wrote:
> Sorry that my suggestion got broken into two messages. Indeed my first 
> suggestion was incomplete. But the second suggestion, of using a 
> CompletionService, should suffice. Would you queue up 50 million tasks in 
> the scenario you describe or do you also need a solution that provides 
> back-pressure?
> Without back-pressure, it's simple enough to submit everything to a 
> CompletionService and then poll as many as you submitted when done. But 
> that also will buffer up all results as they complete (and if using a 
> limited-capacity queue, it could deadlock if the same thread is both 
> submitting and polling).
> If you need backpressure, I'd imagine something more like this:
>   * One thread pulling tasks as they come and submitting to the
>     CompletionService.
>       o Before submitting each task:
>           + Acquire a permit from a semaphore that has been setup with
>             the maximum number of in-flight/queued tasks you want.
>           + Increment a counter (e.g. AtomicLong).
>       o After submitting the last task (e.g. end of stream has been
>         detected), set a flag (AtomicBoolean) and then also submit a
>         no-op task (sentinel) to the CompletionService.
>           + Acquire permit and increment counter, even for the no-op task
>   * Have a second thread that is awaiting completion of all tasks. It is
>     in a loop polling the CompletionService.
>       o After polling a task, if it failed, add its exception to the
>         list of failures unless the list is already maxed out (e.g.
>         already collected 100 failures).
>       o Also release a permit back to the semaphore and decrement the
>         counter.
>       o If counter has reached zero and flag is set, all tasks are
>         processed. Done!
>       o Otherwise, go back to top of loop, to poll next task.
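
The steps above can be sketched as follows (the names, MAX_IN_FLIGHT, 
and the failure cap are illustrative choices, not part of Josh's 
message):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class BoundedDrain {
    private static final int MAX_IN_FLIGHT = 64;   // illustrative
    private static final int MAX_FAILURES = 100;

    public static List<Throwable> runAll(Iterable<Callable<Void>> tasks,
                                         ExecutorService pool)
            throws InterruptedException {
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        AtomicLong outstanding = new AtomicLong();
        AtomicBoolean done = new AtomicBoolean();
        List<Throwable> failures = new ArrayList<>();

        // Submitter thread: acquire a permit and bump the counter before
        // each submit; finish by bumping once more for the sentinel,
        // setting the flag, and submitting a no-op sentinel task.
        Thread submitter = new Thread(() -> {
            try {
                for (Callable<Void> task : tasks) {
                    permits.acquire();
                    outstanding.incrementAndGet();
                    cs.submit(task);
                }
                permits.acquire();
                outstanding.incrementAndGet();
                done.set(true);
                cs.submit(() -> null);   // sentinel
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        submitter.start();

        // Drain loop: take completions, keep the first few failures,
        // release a permit, and stop once the counter hits zero with
        // the flag set.
        while (true) {
            Future<Void> f = cs.take();
            try {
                f.get();
            } catch (ExecutionException e) {
                if (failures.size() < MAX_FAILURES)
                    failures.add(e.getCause());
            }
            permits.release();
            if (outstanding.decrementAndGet() == 0 && done.get())
                break;
        }
        submitter.join();
        return failures;
    }
}
```

The ordering matters: the sentinel's increment happens before the flag 
is set, so if the drain loop observes the flag as set and the counter at 
zero, the sentinel itself must already have been drained.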
> ----
> *Josh Humphries*
> jhump at bluegosling.com
> On Fri, Aug 18, 2017 at 4:54 PM, Shevek <shevek at anarres.org> wrote:
>     I can't use CountDownLatch because of the streamy nature of the
>     source. I have no idea how many tasks there are going to be up
>     front. I read an incoming stream, it turns out there's about 50
>     million in it, and this is going to go up by the usual orders of
>     magnitude.
>     I could use a List of CountDownLatches each with ... but then I
>     rapidly get into "Someone cleverer than me MUST have solved this
>     before" territory.
>     I can do my own tricks with an AtomicLong and wait()/notify(), but I
>     have to make sure the master thread calls get() on the relevant
>     Future each time a job finishes, otherwise there isn't a
>     happens-before relationship with the master thread, so I'd need the
>     job to put its own Future onto a Deque, or something...?
>     The tasks all return Void, but it's nice to collect the first few
>     exceptions (currently there are about 750,000 total exceptions thrown).
>     S.
>     On 08/18/2017 01:48 PM, Josh Humphries wrote:
>         I think the easiest thing would be to decorate each task to call
>         "latch.countDown()" on a CountDownLatch that is initialized with
>         the total number of tasks. After they are all submitted, the
>         code that wants to wait for them to finish would simply await
>         the latch. This decouples completion of all tasks from the
>         actual ExecutorService that is running them, so you can share
>         the same ExecutorService for multiple, even overlapping, sets of
>         tasks.
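
The decoration Josh describes, as a sketch (this assumes the task count 
is known up front, which is exactly the constraint raised in the reply 
above):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class LatchedTasks {
    // Wrap each task so it counts down the shared latch when it
    // finishes; the coordinating thread just awaits the latch.
    public static CountDownLatch submitAll(ExecutorService pool,
                                           Runnable[] tasks) {
        CountDownLatch latch = new CountDownLatch(tasks.length);
        for (Runnable task : tasks) {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            });
        }
        return latch;
    }
}
```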
>         ----
>         *Josh Humphries*
>         jhump at bluegosling.com
>
>         On Fri, Aug 18, 2017 at 4:42 PM, Shevek <shevek at anarres.org>
>         wrote:
>              Hi,
>
>              I need to build a complicated object on a multiprocessor
>              system. The target object uses synchronization to
>              guarantee thread-safe access. However, the individual
>              tasks which build elements of the target object are
>              expensive, so I farm them out to an ExecutorService. The
>              question is:
>
>              What is the recommended way to submit a stream of tasks
>              to an ExecutorService, and then at a point, wait until
>              they're all done?
>
>              * I don't know the list of tasks up front. I read them
>              iteratively from a set of files or events.
>              * A big collection of Future(s) isn't realistic; there
>              are just too many tasks.
>              * Reaping completed futures on submission means that I
>              end up with exceptions in weird places; I really need to
>              gather max 100 suppressed exceptions, and discard the
>              rest.
>              * ForkJoinPool has invoke(), but I think for a
>              million-task job I still end up with a huge list of
>              futures. This also assumes I ignore the note about
>              synchronization.
>              * CompletionService allows me to wait for _any_ of the
>              submitted tasks, but not for _all_ of them.
>              * Bonus points for sharing a single ExecutorService and
>              having "sets" of tasks which can be independently
>              awaited. This starts to sound very like FJP, except for
>              the blocking/I/O and the stream-ness of my task set.
>
>              Mostly, dear list, I'm expecting a one-liner from some
>              API class that I've missed, because this is NOT an
>              original request; I just can't find a standard solution.
>
>              Thank you.
>
>              S.
>              _______________________________________________
>              Concurrency-interest mailing list
>              Concurrency-interest at cs.oswego.edu
>              http://cs.oswego.edu/mailman/listinfo/concurrency-interest

More information about the Concurrency-interest mailing list