[concurrency-interest] Awaiting a set of tasks on an ExecutorService

Josh Humphries jhump at bluegosling.com
Fri Aug 18 17:54:12 EDT 2017

On Fri, Aug 18, 2017 at 5:38 PM, Shevek <shevek at anarres.org> wrote:

> Thank you all.
> I now have this:
> https://gist.github.com/shevek/5f8b3fad55c346ca1391349404a2c0fa

> It's meant to allow multiple submits and awaits from various threads; the
> caller is responsible for making sure that people stop calling submit()
> before anyone calls await() in earnest.
> I do need backpressure; I've been achieving that with an
> ArrayBlockingQueue(ncpus * 10) and a CallerRunsPolicy, and the *10 is meant
> to deal with the unevenness in the task sizes. Is that sane?

Definitely sane! ThreadPoolExecutor's rejection policy plus a fixed-capacity
queue is the standard way to get backpressure. I would have suggested it, but
I wasn't sure whether you needed to re-use the ExecutorService because it was
some sort of otherwise-shared resource. (In that case, relying on it for
backpressure instead of a separate mechanism could starve other clients of
the thread pool.)
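
For reference, a minimal sketch of that configuration (the class name, pool
size, and the *10 multiplier are illustrative, mirroring what you described):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class BackpressuredPools {
        // Fixed-size pool whose bounded queue, combined with CallerRunsPolicy,
        // gives backpressure: once the queue is full, the submitting thread
        // runs the task itself instead of queueing it.
        public static ThreadPoolExecutor create(int ncpus) {
            return new ThreadPoolExecutor(
                    ncpus, ncpus,                 // core == max: fixed pool size
                    0L, TimeUnit.MILLISECONDS,    // keep-alive is irrelevant for a fixed pool
                    new ArrayBlockingQueue<Runnable>(ncpus * 10),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
    }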

> S.
> On 08/18/2017 02:06 PM, Josh Humphries wrote:
>> Sorry that my suggestion got broken into two messages. Indeed, my first
>> suggestion was incomplete, but the second suggestion, of using a
>> CompletionService, should suffice. Would you queue up 50 million tasks in
>> the scenario you describe, or do you also need a solution that provides
>> back-pressure?
>> Without back-pressure, it's simple enough to submit everything to a
>> CompletionService and then, when done submitting, poll exactly as many
>> results as you submitted. But that will also buffer up all results as they
>> complete (and, if you're using a limited-capacity queue, it could deadlock
>> when the same thread does both the submitting and the polling).
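
For concreteness, the no-backpressure version might look roughly like this
(a sketch; the class and method names are illustrative, not from the thread):

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;

    class DrainAll {
        // Submit every task, then reap exactly that many completions.
        static void runAll(ExecutorService pool, Iterable<Callable<Void>> tasks)
                throws InterruptedException {
            CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
            long submitted = 0;
            for (Callable<Void> task : tasks) {
                cs.submit(task);
                submitted++;
            }
            for (long i = 0; i < submitted; i++) {
                try {
                    cs.take().get();   // take() blocks until the next completion
                } catch (ExecutionException e) {
                    // collect or log e.getCause() here
                }
            }
        }
    }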
>> If you need backpressure, I'd imagine something more like this (a rough
>> sketch in code follows the list):
>>   * One thread pulling tasks as they come and submitting to the
>>     CompletionService.
>>       o Before submitting each task:
>>           + Acquire a permit from a semaphore that has been set up with
>>             the maximum number of in-flight/queued tasks you want.
>>           + Increment a counter (e.g. AtomicLong).
>>       o After submitting the last task (e.g. end of stream has been
>>         detected), set a flag (AtomicBoolean) and then also submit a
>>         no-op task (sentinel) to the CompletionService.
>>           + Acquire permit and increment counter, even for the no-op task
>>   * Have a second thread that is awaiting completion of all tasks. It is
>>     in a loop polling the CompletionService.
>>       o After polling a task, add its exception (if any) to the list of
>>         failures if the list is not already maxed out (e.g. you've already
>>         extracted 100 failures).
>>       o Also release a permit back to the semaphore and decrement the
>>         counter.
>>       o If counter has reached zero and flag is set, all tasks are
>>         processed. Done!
>>       o Otherwise, go back to top of loop, to poll next task.
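
Putting those steps together, a rough sketch (the class name, bounds, and
producer thread are all illustrative; here the calling thread plays the role
of the second, reaping thread, and the sentinel is counted before the flag is
published so the reaper cannot exit with the sentinel still pending):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;

    class BoundedAwaiter {
        static final int MAX_FAILURES = 100;     // cap on collected exceptions
        static final int MAX_IN_FLIGHT = 1000;   // illustrative backpressure bound

        static List<Throwable> runAll(ExecutorService pool,
                                      Iterable<Callable<Void>> tasks)
                throws InterruptedException {
            CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
            Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
            AtomicLong outstanding = new AtomicLong();
            AtomicBoolean allSubmitted = new AtomicBoolean();
            List<Throwable> failures = new ArrayList<>();

            // Producer: submit tasks as they arrive, throttled by the semaphore.
            Thread producer = new Thread(() -> {
                try {
                    for (Callable<Void> task : tasks) {
                        permits.acquire();
                        outstanding.incrementAndGet();
                        cs.submit(task);
                    }
                    // Sentinel: count it *before* setting the flag so the reaper
                    // can never see outstanding == 0 with the sentinel pending.
                    permits.acquire();
                    outstanding.incrementAndGet();
                    allSubmitted.set(true);
                    Callable<Void> sentinel = () -> null;
                    cs.submit(sentinel);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // Reaper: poll completions until everything submitted has been seen.
            while (true) {
                Future<Void> done = cs.take();
                permits.release();
                long remaining = outstanding.decrementAndGet();
                try {
                    done.get();
                } catch (ExecutionException e) {
                    if (failures.size() < MAX_FAILURES) {
                        failures.add(e.getCause());
                    }
                }
                if (remaining == 0 && allSubmitted.get()) {
                    break;   // all tasks (including the sentinel) accounted for
                }
            }
            producer.join();
            return failures;
        }
    }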
>> ----
>> Josh Humphries
>> jhump at bluegosling.com
>> On Fri, Aug 18, 2017 at 4:54 PM, Shevek <shevek at anarres.org> wrote:
>>     I can't use CountDownLatch because of the streamy nature of the
>>     source. I have no idea how many tasks there are going to be up
>>     front. I read an incoming stream, it turns out there's about 50
>>     million in it, and this is going to go up by the usual orders of
>>     magnitude.
>>     I could use a List of CountDownLatches each with ... but then I
>>     rapidly get into "Someone cleverer than me MUST have solved this
>>     before" territory.
>>     I can do my own tricks with an AtomicLong and wait()/notify(), but I
>>     have to make sure the master thread calls get() on the relevant
>>     Future each time a job finishes, otherwise there isn't a
>>     happens-before relationship with the master thread, so I'd need the
>>     job to put its own Future onto a Deque, or something...?
>>     The tasks all return Void, but it's nice to collect the first few
>>     exceptions (currently there are about 750,000 total exceptions thrown).
>>     S.
>>     On 08/18/2017 01:48 PM, Josh Humphries wrote:
>>         I think the easiest thing would be to decorate each task to call
>>         "latch.countDown()" on a CountDownLatch that is initialized with
>>         the total number of tasks. After they are all submitted, the
>>         code that wants to wait for them to finish would simply await
>>         the latch. This decouples completion of all tasks from the
>>         actual ExecutorService that is running them, so you can share
>>         the same ExecutorService for multiple, even overlapping, sets of
>>         tasks.
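
A minimal sketch of that decoration (the helper name is made up for
illustration; note it assumes the total number of tasks is known when the
latch is created):

    import java.util.concurrent.CountDownLatch;

    class Latching {
        // Wraps a task so the latch is counted down even if the task throws.
        static Runnable countingDown(Runnable task, CountDownLatch latch) {
            return () -> {
                try {
                    task.run();
                } finally {
                    latch.countDown();
                }
            };
        }
    }

    // Usage, assuming totalTasks is known in advance:
    //   CountDownLatch latch = new CountDownLatch(totalTasks);
    //   executor.execute(Latching.countingDown(task, latch));  // per task
    //   latch.await();   // blocks until every decorated task has finished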
>>         ----
>>         Josh Humphries
>>         jhump at bluegosling.com
>>         On Fri, Aug 18, 2017 at 4:42 PM, Shevek <shevek at anarres.org> wrote:
>>              Hi,
>>              I need to build a complicated object on a multiprocessor
>>              system. The target object uses synchronization to guarantee
>>              thread-safe access. However, the individual tasks which build
>>              elements of the target object are expensive, so I farm them
>>              out to an ExecutorService. The question is:
>>              What is the recommended way to submit a stream of tasks to an
>>              ExecutorService, and then, at some point, wait until they're
>>              all done?
>>              * I don't know the list of tasks up front. I read them
>>              iteratively from a set of files or events.
>>              * A big collection of Futures isn't realistic; there are just
>>              too many tasks.
>>              * Reaping completed futures on submission means that I end up
>>              with exceptions in weird places; I really need to gather max
>>              100 suppressed exceptions, and discard the rest.
>>              * ForkJoinPool has invoke(), but I think for a million-task
>>              job I still end up with a huge list of futures. This also
>>              assumes I ignore the note about synchronization.
>>              * CompletionService allows me to wait for _any_ of the
>>              submitted tasks, but not for _all_ of them.
>>              * Bonus points for sharing a single ExecutorService and
>>              having "sets" of tasks which can be independently awaited.
>>              This starts to sound very like FJP, except for the
>>              blocking/I/O and the stream-ness of my task set.
>>              Mostly, dear list, I'm expecting a one-liner from some API
>>              class that I've missed, because this is NOT an original
>>              request; I just can't find a standard solution.
>>              Thank you.
>>              S.