[concurrency-interest] Awaiting a set of tasks on an ExecutorService

Josh Humphries jhump at bluegosling.com
Sat Aug 19 12:44:11 EDT 2017

On Fri, Aug 18, 2017 at 5:38 PM, Shevek <shevek at anarres.org> wrote:

> Thank you all.
> I now have this:
> https://gist.github.com/shevek/5f8b3fad55c346ca1391349404a2c0fa
> It's meant to allow multiple submits and awaits from various threads; the
> caller is responsible for making sure that people stop calling submit()
> before anyone calls await() in seriousness.

Thinking about this a bit more, I think Nathan Reynolds's suggestion to use
a Phaser, instead of my CompletionService idea, would be better. This will
result in less garbage (since you don't need to create and queue futures --
"execute" the tasks instead of "submit" them) and allows you to skip the
logic that drains the completion service queue as tasks are submitted, to
prevent too many from accumulating (which is non-deterministic and could
still result in a large number of futures queued up if many tasks can
complete in between calls to add new tasks).

As each task is submitted, "register" a party with the phaser. Then you
need only "await" the phaser's current/latest phase at the end. You'd also
decorate each task to catch any throwables (and record up to 100, or
whatever limit, in a concurrent or synchronized collection) and, in a
finally block, "arrive" with the phaser on completion.
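
A minimal sketch of the Phaser approach described above, not code from the
thread or the gist. The class name PhaserTaskSet, its methods, and the
MAX_FAILURES constant are hypothetical. It assumes a single awaiting thread,
which holds one extra registered party so the phase cannot advance while
tasks are still being submitted. It also uses arriveAndDeregister() rather
than a plain arrive(), so the registered-party count tracks only in-flight
tasks; a Phaser supports at most 65535 registered parties, so this relies on
back-pressure keeping the number of in-flight tasks bounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;

/** Hypothetical helper; the name and shape are illustrative only. */
class PhaserTaskSet {
    private static final int MAX_FAILURES = 100;

    // One party is registered up front on behalf of the awaiting thread.
    private final Phaser phaser = new Phaser(1);
    private final List<Throwable> failures = new ArrayList<>();
    private final Executor executor;

    PhaserTaskSet(Executor executor) {
        this.executor = executor;
    }

    /** Registers a party for the task, then hands a decorated task to the executor. */
    void execute(Runnable task) {
        phaser.register();
        try {
            executor.execute(() -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    record(t);
                } finally {
                    phaser.arriveAndDeregister();
                }
            });
        } catch (RejectedExecutionException e) {
            // The task never ran, so undo its registration here.
            phaser.arriveAndDeregister();
            throw e;
        }
    }

    private void record(Throwable t) {
        synchronized (failures) {
            if (failures.size() < MAX_FAILURES) {
                failures.add(t);
            }
        }
    }

    /** Blocks until every task executed so far has finished; returns recorded failures. */
    List<Throwable> await() {
        phaser.arriveAndAwaitAdvance();
        synchronized (failures) {
            return new ArrayList<>(failures);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        PhaserTaskSet set = new PhaserTaskSet(pool);
        for (int i = 0; i < 1_000; i++) {
            final int n = i;
            set.execute(() -> {
                if (n % 2 == 0) {
                    throw new IllegalStateException("task " + n + " failed");
                }
            });
        }
        List<Throwable> failures = set.await();
        pool.shutdown();
        System.out.println("recorded failures: " + failures.size());
    }
}
```

If the executor uses a CallerRunsPolicy, the decorated task simply runs in
the submitting thread and still deregisters in its finally block, so the
same accounting holds.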

Phaser's API is a bit confusing IMO (capable of supporting far more complex
use cases than this one). I've written simple "up-and-down" latches (on top
of AbstractQueuedSynchronizer) that result in easier to read and understand
usages (more-or-less like Go's sync.WaitGroup), but I can't seem to find a
link to one right now (I guess the ones I've written weren't in any
open-source projects). If I find one, I'll send it in case it helps.
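
For illustration only, here is a sketch of what such an "up-and-down" latch
on top of AbstractQueuedSynchronizer might look like. This is not the
implementation referred to above; the class name UpDownLatch and its method
names are hypothetical, and the structure simply mirrors the way
CountDownLatch uses the AQS state as its count.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical up-and-down latch, loosely in the spirit of Go's sync.WaitGroup. */
final class UpDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // The AQS state holds the number of outstanding tasks.

        void countUp() {
            for (;;) {
                int c = getState();
                if (c == Integer.MAX_VALUE) {
                    throw new IllegalStateException("count overflow");
                }
                if (compareAndSetState(c, c + 1)) {
                    return;
                }
            }
        }

        int count() {
            return getState();
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Acquisition succeeds only while the count is zero.
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    throw new IllegalStateException("countDown without matching countUp");
                }
                if (compareAndSetState(c, c - 1)) {
                    // Wake waiters only on the transition to zero.
                    return c == 1;
                }
            }
        }
    }

    private final Sync sync = new Sync();

    /** Call before handing a task to the executor. */
    void countUp() {
        sync.countUp();
    }

    /** Call from the task's finally block. */
    void countDown() {
        sync.releaseShared(1);
    }

    /** Blocks until the count is zero. */
    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    int getCount() {
        return sync.count();
    }
}
```

As with a WaitGroup, the count must not be allowed to touch zero while more
tasks are still to be submitted, or waiters can wake early. One way to
arrange that is for the submitting thread to hold one count of its own
until it has seen the end of the stream.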

> I do need backpressure; I've been achieving that with an
> ArrayBlockingQueue(ncpus * 10) and a CallerRunsPolicy, and the *10 is meant
> to deal with the uneven-ness in the task sizes. Is that sane?
> S.
> On 08/18/2017 02:06 PM, Josh Humphries wrote:
>> Sorry that my suggestion got broken into two messages. Indeed my first
>> suggestion was incomplete. But the second suggestion, of using a
>> CompletionService should suffice. Would you queue up 50 million tasks in
>> the scenario you describe or do you also need a solution that provides
>> back-pressure?
>> Without back-pressure, it's simple enough to submit everything to a
>> CompletionService and then poll as many as you submitted when done. But
>> that also will buffer up all results as they complete (and if using a
>> limited-capacity queue, it could deadlock if it's the same thread
>> submitting as polling).
>> If you need backpressure, I'd imagine something more like this:
>>   * One thread pulling tasks as they come and submitting to the
>>     CompletionService.
>>       o Before submitting each task:
>>           + Acquire a permit from a semaphore that has been set up with
>>             the maximum number of in-flight/queued tasks you want.
>>           + Increment a counter (e.g. AtomicLong).
>>       o After submitting the last task (e.g. end of stream has been
>>         detected), set a flag (AtomicBoolean) and then also submit a
>>         no-op task (sentinel) to the CompletionService.
>>           + Acquire permit and increment counter, even for the no-op task
>>   * Have a second thread that is awaiting completion of all tasks. It is
>>     in a loop polling the CompletionService.
>>       o After polling a task, add exception to list of failures if list
>>         is not already maxed out (e.g. already extracted 100 failures).
>>       o Also release a permit back to the semaphore and decrement the
>>         counter.
>>       o If counter has reached zero and flag is set, all tasks are
>>         processed. Done!
>>       o Otherwise, go back to top of loop, to poll next task.
>> ----
>> *Josh Humphries*
>> jhump at bluegosling.com
>> On Fri, Aug 18, 2017 at 4:54 PM, Shevek <shevek at anarres.org> wrote:
>>     I can't use CountDownLatch because of the streamy nature of the
>>     source. I have no idea how many tasks there are going to be up
>>     front. I read an incoming stream, it turns out there's about 50
>>     million in it, and this is going to go up by the usual orders of
>>     magnitude.
>>     I could use a List of CountDownLatches each with ... but then I
>>     rapidly get into "Someone cleverer than me MUST have solved this
>>     before" territory.
>>     I can do my own tricks with an AtomicLong and wait()/notify(), but I
>>     have to make sure the master thread calls get() on the relevant
>>     Future each time a job finishes, otherwise there isn't a
>>     happens-before relationship with the master thread, so I'd need the
>>     job to put its own Future onto a Deque, or something...?
>>     The tasks all return Void, but it's nice to collect the first few
>>     exceptions (currently there are about 750,000 total exceptions
>>     thrown).
>>     S.
>>     On 08/18/2017 01:48 PM, Josh Humphries wrote:
>>         I think the easiest thing would be to decorate each task to call
>>         "latch.countDown()" on a CountDownLatch that is initialized with
>>         the total number of tasks. After they are all submitted, the
>>         code that wants to wait for them to finish would simply await
>>         the latch. This decouples completion of all tasks from the
>>         actual ExecutorService that is running them, so you can share
>>         the same ExecutorService for multiple, even overlapping, sets of
>>         tasks.
>>         ----
>>         *Josh Humphries*
>>         jhump at bluegosling.com
>>         On Fri, Aug 18, 2017 at 4:42 PM, Shevek <shevek at anarres.org> wrote:
>>              Hi,
>>              I need to build a complicated object on a multiprocessor
>>              system. The target object uses synchronization to guarantee
>>              thread-safe access. However, the individual tasks which build
>>              elements of the target object are expensive, so I farm them
>>              out to an ExecutorService. The question is:
>>              What is the recommended way to submit a stream of tasks to an
>>              ExecutorService, and then at a point, wait until they're all
>>              done.
>>              * I don't know the list of tasks up front. I read them
>>                iteratively from a set of files or events.
>>              * A big collection of Future(s) isn't realistic, there are
>>                just too many tasks.
>>              * Reaping completed futures on submission means that I end up
>>                with exceptions in weird places; I really need to gather max
>>                100 suppressed exceptions, and discard the rest.
>>              * ForkJoinPool has invoke() but I think for a million-task
>>                job, I still end up with a huge list of futures. This also
>>                assumes I ignore the note about synchronization.
>>              * CompletionService allows me to wait for _any_ of the
>>                submitted tasks, but not for _all_ of them.
>>              * Bonus points for sharing a single ExecutorService and having
>>                "sets" of tasks which can be independently awaited. This
>>                starts to sound very like FJP, except for the blocking/I/O
>>                and the stream-ness of my task set.
>>              Mostly, dear list, I'm expecting a one-liner from some API
>>              class that I've missed because this is NOT an original
>>              request, I just can't find a standard solution.
>>              Thank you.
>>              S.
>>              _______________________________________________
>>              Concurrency-interest mailing list
>>              Concurrency-interest at cs.oswego.edu
>>              http://cs.oswego.edu/mailman/listinfo/concurrency-interest