[concurrency-interest] ForkJoinPool.managedBlock() not spawning new thread

Alex Lam S.L. alexlamsl at gmail.com
Thu Jul 19 13:31:52 EDT 2012


I have cut it down to a simpler (but arguably not equivalent) example
and see no effective difference in behaviour (blocked in
ForkJoinTask.join() instead of BlockingQueue.take()):

  ForkJoinPool pool = new ForkJoinPool();
  BlockingQueue<Future<V>> queue = new LinkedBlockingQueue<>();
  Callable<V> workload = ...;

  final int N = ...;

  pool.execute(new Runnable() {
    public void run() {
      for (int i = 0; i < N; i++)
        queue.add(pool.submit(workload));

      for (int i = 0; i < N; i++) {
        process(queue.poll().join());
      }
    }
  });

I have also made some measurements using AtomicXXX and
System.nanoTime() on the first version of the code, i.e. the one which
uses ManagedBlocker.

Under my specific workload, ManagedBlocker.block() is being called
~75% of the time, with an average duration of ~3.5ms when called, i.e.
does not count when ManagedBlocker.isReleasable() returns true.


Alex.



On Thu, Jul 19, 2012 at 4:41 PM, Alex Lam S.L. <alexlamsl at gmail.com> wrote:
> I am trying to get my application to use ForkJoinPool. Specifically, I
> want it to automatically occupy all 8 CPUs even when one of the thread
> is blocked by BlockingQueue.take() - below is an implementation based
> on what I can understand from the javadocs:
>
>
>   ForkJoinPool pool = new ForkJoinPool();
>   BlockingQueue<V> queue = new LinkedBlockingQueue<>();
>   Callable<V> workload = ...;
>
>   class QueueBlocker<V> implements ForkJoinPool.ManagedBlocker {
>     private V value;
>
>     public V get() {
>       return value;
>     }
>
>     public boolean block() throws InterruptedException {
>       value = queue.take();
>       return true;
>     }
>
>     public boolean isReleasable() {
>       return value != null || (value = queue.poll()) != null;
>     }
>   }
>
>   class Task<V> extends ForkJoinTask<Void> {
>     private final Callable<V> callable;
>
>     Task(Callable<V> callable) {
>       this.callable = callable;
>     }
>
>     public Void getRawResult() {
>       return null;
>     }
>
>     protected void setRawResultVoid value) {
>     }
>
>     protected boolean exec() {
>       try {
>         queue.add(callable.call());
>         return true;
>       } catch (Exception ex) {
>         throw new RuntimeException(ex);
>       }
>     }
>   }
>
>   final int N = ...;
>
>   pool.execute(new Runnable() {
>     public void run() {
>       for (int i = 0; i < N; i++)
>         pool.execute(new Task<V>(workload));
>
>       for (int i = 0; i < N; i++) {
>         final QueueBlocker blocker = new QueueBlocker();
>         ForkJoinPool.managedBlock(blocker);
>         process(blocker.get());
>       }
>     }
>   });
>
>
> Now on my 8-core machine, I can see from JConsole that FJ pool have 8
> threads, with one of them consistently blocking on queue.take(), thus
> only leaving 7 CPUs busy.
>
> Is this behaviour intentional? If so, what am I doing wrong here?
>
>
> Thanks,
> Alex.


More information about the Concurrency-interest mailing list