[concurrency-interest] Awaiting a set of tasks on an ExecutorService

Josh Humphries jhump at bluegosling.com
Fri Aug 18 17:06:25 EDT 2017


Sorry that my suggestion got broken into two messages. Indeed, my first
suggestion was incomplete, but the second one, using a
CompletionService, should suffice. Would you queue up all 50 million tasks
in the scenario you describe, or do you also need a solution that provides
back-pressure?

Without back-pressure, it's simple enough to submit everything to a
CompletionService and then, once submission is done, poll as many results
as you submitted. But that also buffers up all results as they complete
(and if the service uses a limited-capacity queue, it could deadlock when
the same thread that submits is also polling).
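A minimal sketch of that no-back-pressure approach (the class name, pool
size, and no-op tasks here are stand-ins for the real task stream):

```java
import java.util.concurrent.*;

public class DrainAll {
    // Submit n tasks, then poll exactly n results and count failures.
    // Sketch only; in the real scenario the tasks come from a stream,
    // so you count submissions as you go rather than knowing n up front.
    static int runAll(int n) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < n; i++) {
            cs.submit(() -> null);      // tasks return Void
        }
        int failures = 0;
        for (int i = 0; i < n; i++) {   // take exactly as many as submitted
            try {
                cs.take().get();
            } catch (ExecutionException e) {
                failures++;             // cap or record as needed
            }
        }
        pool.shutdown();
        return failures;
    }
}
```

Note that all completed results sit in the completion service's internal
queue until the polling loop drains them, which is exactly the buffering
concern above.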

If you need backpressure, I'd imagine something more like this:

   - One thread pulling tasks as they come and submitting to the
   CompletionService.
      - Before submitting each task:
         - Acquire a permit from a semaphore that has been set up with the
         maximum number of in-flight/queued tasks you want.
         - Increment a counter (e.g. AtomicLong).
      - After submitting the last task (e.g. end of stream has been
      detected), set a flag (AtomicBoolean) and then also submit a no-op task
      (sentinel) to the CompletionService.
         - Acquire a permit and increment the counter, even for the no-op
         task.
      - Have a second thread that is awaiting completion of all tasks. It
   is in a loop polling the CompletionService.
      - After taking a completed task, call get() on its future; if it
      threw, add the exception to the list of failures unless the list is
      already maxed out (e.g. 100 failures already extracted).
      - Also release a permit back to the semaphore and decrement the
      counter.
      - If counter has reached zero and flag is set, all tasks are
      processed. Done!
      - Otherwise, go back to top of loop, to poll next task.
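The steps above might look roughly like this (class and method names are
mine; the submitter runs in its own thread, the reaper loop in the calling
thread). One detail I've adjusted for safety: the counter is incremented
for the sentinel *before* the flag is set, so the reaper can't see
"counter == 0 and flag set" before the sentinel has been counted:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class BoundedDrain {
    static final int MAX_IN_FLIGHT = 64;   // assumed bound; tune to taste
    static final int MAX_FAILURES = 100;

    static List<Throwable> run(Iterable<Callable<Void>> tasks, ExecutorService pool)
            throws InterruptedException {
        CompletionService<Void> cs = new ExecutorCompletionService<>(pool);
        Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
        AtomicLong inFlight = new AtomicLong();
        AtomicBoolean allSubmitted = new AtomicBoolean();
        List<Throwable> failures = new ArrayList<>();

        // Submitter thread: acquire a permit and bump the counter per task.
        Thread submitter = new Thread(() -> {
            try {
                for (Callable<Void> t : tasks) {
                    permits.acquire();
                    inFlight.incrementAndGet();
                    cs.submit(t);
                }
                // Sentinel no-op task: count it, then set the flag, then submit.
                permits.acquire();
                inFlight.incrementAndGet();
                allSubmitted.set(true);
                cs.submit(() -> null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        submitter.start();

        // Reaper loop: poll until the counter hits zero with the flag set.
        while (true) {
            Future<Void> f = cs.take();
            try {
                f.get();
            } catch (ExecutionException e) {
                if (failures.size() < MAX_FAILURES) failures.add(e.getCause());
            }
            permits.release();
            if (inFlight.decrementAndGet() == 0 && allSubmitted.get()) break;
        }
        submitter.join();
        return failures;
    }
}
```

The semaphore bounds memory (at most MAX_IN_FLIGHT tasks queued or running
at once), and the counter-plus-flag pair tells the reaper when the stream
is fully drained without ever knowing the task count up front.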


----
*Josh Humphries*
jhump at bluegosling.com

On Fri, Aug 18, 2017 at 4:54 PM, Shevek <shevek at anarres.org> wrote:

> I can't use CountDownLatch because of the streamy nature of the source. I
> have no idea how many tasks there are going to be up front. I read an
> incoming stream, it turns out there's about 50 million in it, and this is
> going to go up by the usual orders of magnitude.
>
> I could use a List of CountDownLatches each with ... but then I rapidly
> get into "Someone cleverer than me MUST have solved this before" territory.
>
> I can do my own tricks with an AtomicLong and wait()/notify(), but I have
> to make sure the master thread calls get() on the relevant Future each time
> a job finishes, otherwise there isn't a happens-before relationship with
> the master thread, so I'd need the job to put its own Future onto a Deque,
> or something...?
>
> The tasks all return Void, but it's nice to collect the first few
> exceptions (currently there are about 750,000 total exceptions thrown).
>
> S.
>
> On 08/18/2017 01:48 PM, Josh Humphries wrote:
>
>> I think the easiest thing would be to decorate each task to call
>> "latch.countDown()" on a CountDownLatch that is initialized with the total
>> number of tasks. After they are all submitted, the code that wants to wait
>> for them to finish would simply await the latch. This decouples completion
>> of all tasks from the actual ExecutorService that is running them, so you
>> can share the same ExecutorService for multiple, even overlapping, sets of
>> tasks.
>>
>> ----
>> *Josh Humphries*
>> jhump at bluegosling.com <mailto:jhump at bluegosling.com>
>>
>>
>> On Fri, Aug 18, 2017 at 4:42 PM, Shevek <shevek at anarres.org <mailto:
>> shevek at anarres.org>> wrote:
>>
>>     Hi,
>>
>>     I need to build a complicated object on a multiprocessor system. The
>>     target object uses synchronization to guarantee thread-safe access.
>>     However, the individual tasks which build elements of the target
>>     object are expensive, so I farm them out to an ExecutorService. The
>>     question is:
>>
>>     What is the recommended way to submit a stream of tasks to an
>>     ExecutorService, and then at a point, wait until they're all done.
>>
>>     * I don't know the list of tasks up front. I read them iteratively
>>     from a set of files or events.
>>     * A big collection of Future(s) isn't realistic, there are just too
>>     many tasks.
>>     * Reaping completed futures on submission means that I end up with
>>     exceptions in weird places; I really need to gather max 100
>>     suppressed exceptions, and discard the rest.
>>     * ForkJoinPool has invoke() but I think for a million-task job, I
>>     still end up with a huge list of futures. This also assumes I ignore
>>     the note about synchronization.
>>     * CompletionService allows me to wait for _any_ of the submitted
>>     tasks, but not for _all_ of them.
>>
>>     * Bonus points for sharing a single ExecutorService and having
>>     "sets" of tasks which can be independently awaited. This starts to
>>     sound very like FJP, except for the blocking/I/O and the stream-ness
>>     of my task set.
>>
>>     Mostly, dear list, I'm expecting a one-liner from some API class
>>     that I've missed because this is NOT an original request, I just
>>     can't find a standard solution.
>>
>>     Thank you.
>>
>>     S.
>>     _______________________________________________
>>     Concurrency-interest mailing list
>>     Concurrency-interest at cs.oswego.edu
>>     <mailto:Concurrency-interest at cs.oswego.edu>
>>     http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>     <http://cs.oswego.edu/mailman/listinfo/concurrency-interest>
>>
>>
>>