[concurrency-interest] ForkJoinPool not designed for nested Java 8 streams.parallel().forEach( ... )

Aleksey Shipilev aleksey.shipilev at oracle.com
Mon May 5 16:50:42 EDT 2014


Hi Christian,

You are on the correct list.

On 05/05/2014 11:58 PM, Christian Fries wrote:
> Java 8 introduced parallel streams which use a common ForkJoinPool.
> I believe that the implementation of ForkJoinTask has a problem, 
> which becomes relevant in a situation, which is likely to occur if 
> Java 8 parallel streams are used a lot. Specifically the problem 
> arises if one uses a nested Java 8 parallel stream foreach, e.g. as
> in
> 	// Outer loop
> 	IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
> 		// Inner loop
> 		IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
> 			// Work done here loop
> 		});
> 
> 	});

To my knowledge of FJP mechanics, this should work perfectly well, since
submitters from the outer loop will participate in the computation
and make progress. My throw-away tests, like:

    @GenerateMicroBenchmark
    public void test(BlackHole bh) {
        IntStream.range(0, TASKS).parallel().forEach(i -> {
            IntStream.range(0, TASKS).parallel().forEach(j -> {
                bh.consume(i + j);
            });
        });
    }

...do not manage to deadlock.
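
A minimal stand-alone sketch of the same check, for readers without a JMH
setup. The class name NestedParallelCheck, the method runNested and the
loop sizes are illustrative assumptions, not part of the original test; a
LongAdder stands in for the JMH black hole so that the work is observable.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical stand-alone variant of the nested-loop test (no JMH needed).
class NestedParallelCheck {

    // Runs an outer parallel loop whose body is an inner parallel loop,
    // both on the common ForkJoinPool, and returns how many inner
    // iterations were executed.
    static long runNested(int outerTasks, int innerTasks) {
        LongAdder executed = new LongAdder();
        IntStream.range(0, outerTasks).parallel().forEach(i -> {
            IntStream.range(0, innerTasks).parallel().forEach(j -> {
                executed.increment();
            });
        });
        return executed.sum();
    }

    public static void main(String[] args) {
        // Illustrative sizes only.
        System.out.println(runNested(1000, 1000));
    }
}
```

If the nested loops complete, runNested returns outerTasks * innerTasks; a
hang here would indicate the kind of deadlock under discussion.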


> For a detailed test code run http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/ForkJoinPoolTest.java

This test does not appear to deadlock with (isUseSemaphore = false)
either... Can you check again?

The (isUseSemaphore = true) case seems to deplete the worker pool on my
machine, so the underlying FJP cannot make progress. As others have
noted, you should use ForkJoinPool.ManagedBlocker to tell the pool you
are about to block. If I wrap your Semaphore with:

    this.concurrentExecutions = new ForkJoinSemaphore(
            new Semaphore(concurrentExecusionsLimitInOuterLoop));


    private static class ForkJoinSemaphore implements ForkJoinPool.ManagedBlocker {
        private final Semaphore s;

        public ForkJoinSemaphore(Semaphore s) {
            this.s = s;
        }

        @Override
        public boolean block() throws InterruptedException {
            s.acquire();
            return true;
        }

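        @Override
        public boolean isReleasable() {
            // managedBlock() skips block() when this returns true, so take
            // the permit here; checking availablePermits() alone would
            // return without acquiring anything.
            return s.tryAcquire();
        }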

        public int availablePermits() {
            return s.availablePermits();
        }

        public void release() {
            s.release();
        }
    }

...and call ForkJoinPool.managedBlock(concurrentExecutions) instead of
acquire(), the deadlock is also gone.
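
A self-contained sketch of that call pattern, under stated assumptions:
the names ManagedSemaphoreDemo, PermitBlocker and run are hypothetical,
and the blocker here is a one-shot object created per acquisition (in the
style of the ManagedLocker example in the ForkJoinPool.ManagedBlocker
Javadoc) rather than the shared wrapper from the test. It limits how many
outer iterations run their inner loop at once and reports the highest
concurrency observed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo: throttle the outer loop with a Semaphore while
// telling the ForkJoinPool about the potential block.
class ManagedSemaphoreDemo {

    // One-shot blocker that acquires exactly one permit.
    static final class PermitBlocker implements ForkJoinPool.ManagedBlocker {
        private final Semaphore semaphore;
        private boolean acquired;

        PermitBlocker(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (!acquired) {
                semaphore.acquire();
                acquired = true;
            }
            return true; // no further blocking needed
        }

        @Override
        public boolean isReleasable() {
            // Non-blocking attempt; true means the permit is already held.
            return acquired || (acquired = semaphore.tryAcquire());
        }
    }

    // Returns the maximum number of outer iterations that held a permit
    // at the same time.
    static int run(int outerTasks, int innerTasks, int permits) {
        Semaphore limit = new Semaphore(permits);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();

        IntStream.range(0, outerTasks).parallel().forEach(i -> {
            try {
                // Lets the pool compensate if this worker has to wait.
                ForkJoinPool.managedBlock(new PermitBlocker(limit));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            try {
                int now = inFlight.incrementAndGet();
                maxInFlight.accumulateAndGet(now, Math::max);
                IntStream.range(0, innerTasks).parallel().forEach(j -> {
                    // inner work would go here
                });
            } finally {
                inFlight.decrementAndGet();
                limit.release();
            }
        });
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        // Illustrative sizes only.
        System.out.println("max concurrent outer bodies: " + run(16, 8, 2));
    }
}
```

Because the counter is incremented only after a permit is taken and
decremented before it is released, the reported maximum should never
exceed the number of permits.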

> That said, it appears as if the current Java implementation of 
> ForkJoinPool was written without having „Nesting“ in mind
> (submitting tasks from a worker task to a common pool), but Java 8
> streams make that kind of nesting a natural thing.

Looking very briefly at this, I think you just have a simple resource
starvation problem, not a FJP bug.

-Aleksey.

