[concurrency-interest] Coroutine-like control transfer?

Howard Lovatt howard.lovatt at gmail.com
Wed Aug 28 21:57:56 EDT 2013


Hi,

I was interested in a similar problem and wrote the microbenchmark below to
investigate it. The benchmark suspends a thread until both of its arguments
are ready; the thread then runs and produces a result. I found that
double-checked locking was the quickest (class DoubleCheck__Sum below). The
code may be helpful.

  -- Howard.

====================================================================================

package cyclicbarriertest;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.lang.System.*;

/**
 * Timing test for cyclic barrier implementations.
 *
 * @author  Howard Lovatt
 */
public class CyclicBarrierTest {
  private static final int seconds = 20; // Might need to tweak this value
to allow enough time for slow versions to complete
  private static final int loops = 1000 * 1000;

  public static void main( final String... args ) throws Exception {
    test( new CyclicBarrierSum() );
    test( new Volatile_____Sum() );
    test( new _____________Sum() );
    test( new Synchronized_Sum() );
    test( new SynchYield___Sum() );
    test( new DoubleCheck__Sum() );
    test( new DbleChYield__Sum() );
    test( new Lock_________Sum() );
    test( new LockDoubleCh_Sum() );
    test( new AtomicRefLongSum() );
    test( new LockAtomicRefSum() );
    test( new BlockingQueueSum() );
    test( new BlockingDequeSum() );
    test( new ArrayBlockingSum() );
  }

  private static void test( final Sum method ) throws Exception {
    gc();
    gc();
    final Callable<Void> setA1 = new Callable<Void>() {
      @Override public Void call() throws Exception {
        for ( int l = 0; l < loops; l++ ) { method.setA1( l ); }
        return null;
      }
    };
    final Callable<Void> setA2 = new Callable<Void>() {
      @Override public Void call() throws Exception {
        for ( int l = 0; l < loops; l++ ) { method.setA2( l ); }
        return null;
      }
    };
    final ExecutorService pool = Executors.newCachedThreadPool();
    final long start = nanoTime();
    pool.submit( setA1 );
    pool.submit( setA2 );
    pool.shutdown();
    final boolean ok = pool.awaitTermination( seconds, TimeUnit.SECONDS );
    final long time = ( nanoTime() - start ) / loops;
    pool.shutdownNow();
    out.println( method + ": result = " + method.getSum() + ", av. time = "
+ ( time / 1000.0 ) +
                     " ms, " + ( ok ? "and terminated OK" : "but failed to
terminate" ) );
  }
}


/**
 * Base class for the benchmarked implementations: holds the running total and
 * declares the two operand setters that concrete subclasses provide.
 */
abstract class Sum {
  /** Running total, updated by subclasses. */
  protected long sum = 0;

  /**
   * Throws if the calling thread has been interrupted. The interrupt status is
   * cleared by the check.
   *
   * @throws InterruptedException if the current thread's interrupt flag was set
   */
  public static void checkForInterrupt() throws InterruptedException {
    final boolean wasInterrupted = Thread.interrupted();
    if ( wasInterrupted ) {
      throw new InterruptedException();
    }
  }

  /** Supplies the first operand. */
  public abstract void setA1( long a1 ) throws Exception;

  /** Supplies the second operand. */
  public abstract void setA2( long a2 ) throws Exception;

  /** Returns the current running total. */
  public long getSum() {
    return this.sum;
  }

  /** Returns the simple name of the concrete class. */
  @Override public String toString() {
    final Class<?> type = getClass();
    return type.getSimpleName();
  }
}


/**
 * Implementation built on a two-party {@link CyclicBarrier}: each setter stores
 * its operand and waits at the barrier, whose barrier action adds the pair to
 * the running total.
 */
final class CyclicBarrierSum extends Sum {
  private long a1;
  private long a2;

  // Two parties (one per setter); the action runs once per barrier trip.
  private final CyclicBarrier sumBarrier =
    new CyclicBarrier( 2, new Runnable() {
      @Override public void run() {
        final long pair = a1 + a2;
        sum += pair;
      }
    } );

  /** Stores the first operand, then waits at the barrier. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    this.a1 = a1Arg;
    sumBarrier.await();
  }

  /** Stores the second operand, then waits at the barrier. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    this.a2 = a2Arg;
    sumBarrier.await();
  }
}


/**
 * Implementation that spins on volatile operand slots. A {@code null} slot
 * means "empty"; a setter waits for its slot to be empty, fills it, and then
 * asks the synchronized {@code checkFinished} to consume a complete pair.
 */
final class Volatile_____Sum extends Sum {
  private volatile Long a1 = null;
  private volatile Long a2 = null;

  /** Spins until the first slot is empty, then fills it. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    while ( a1 != null ) {
      checkForInterrupt();
    }
    a1 = a1Arg;
    checkFinished();
  }

  /** Spins until the second slot is empty, then fills it. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    while ( a2 != null ) {
      checkForInterrupt();
    }
    a2 = a2Arg;
    checkFinished();
  }

  /** When both slots are filled, adds them to the total and empties both. */
  private synchronized void checkFinished() {
    if ( ( a1 != null ) && ( a2 != null ) ) {
      sum += a1 + a2;
      // Same order as the chained assignment it replaces: a2 first, then a1.
      a2 = null;
      a1 = null;
    }
  }
}


final class _____________Sum extends Sum { // Could fail because a set
could be missed, but works on OSX
  private Long a1 = null;
  private Long a2 = null;

  @Override public void setA1( final long a1Arg ) throws Exception {
    for ( ;; ) {
      if ( a1 == null ) {
        a1 = a1Arg;
        checkFinished();
        return;
      }
      checkForInterrupt();
    }
  }

  @Override public void setA2( final long a2Arg ) throws Exception {
    for ( ;; ) {
      if ( a2 == null ) {
        a2 = a2Arg;
        checkFinished();
        return;
      }
      checkForInterrupt();
    }
  }

  private synchronized void checkFinished() {
    if ( ( a1 == null ) || ( a2 == null ) ) { return; }
    sum += a1 + a2;
    a1 = a2 = null;
  }
}


/**
 * Shared base for the {@code synchronized}-based variants. A {@code null} slot
 * means "empty"; the {@code maybeSet} methods fill a slot only if it is empty
 * and consume a complete pair while holding this object's monitor.
 *
 * <p>The slots are {@code volatile} because some subclasses read them without
 * holding the monitor before calling {@code maybeSetA1}/{@code maybeSetA2};
 * with plain fields those reads would be a data race and a spinning reader
 * would not be guaranteed to ever see the slot being emptied.
 */
abstract class AbstractSynchronized_Sum extends Sum {
  protected volatile Long a1 = null;
  protected volatile Long a2 = null;

  /**
   * Fills the first slot if it is empty.
   *
   * @param a1Arg value for the first operand
   * @return {@code true} if the value was stored, {@code false} if the slot was occupied
   */
  public synchronized boolean maybeSetA1( final long a1Arg ) {
    if ( a1 == null ) {
      a1 = a1Arg;
      checkFinished();
      return true;
    }
    return false;
  }

  /**
   * Fills the second slot if it is empty.
   *
   * @param a2Arg value for the second operand
   * @return {@code true} if the value was stored, {@code false} if the slot was occupied
   */
  public synchronized boolean maybeSetA2( final long a2Arg ) {
    if ( a2 == null ) {
      a2 = a2Arg;
      checkFinished();
      return true;
    }
    return false;
  }

  /**
   * When both slots are filled, adds them to the total and empties both.
   * Only called from the synchronized {@code maybeSet} methods.
   */
  private void checkFinished() {
    // Read each volatile slot once.
    final Long first = a1;
    final Long second = a2;
    if ( ( first == null ) || ( second == null ) ) { return; }
    sum += first + second;
    a2 = null;
    a1 = null;
  }
}


/**
 * Variant that retries the synchronized {@code maybeSet} call in a tight loop,
 * checking for interruption after every failed attempt.
 */
final class Synchronized_Sum extends AbstractSynchronized_Sum {
  /** Retries until the first operand has been stored. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    while ( !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
    }
  }

  /** Retries until the second operand has been stored. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    while ( !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
    }
  }
}


/**
 * Variant that retries the synchronized {@code maybeSet} call, checking for
 * interruption and then yielding after every failed attempt.
 */
final class SynchYield___Sum extends AbstractSynchronized_Sum {
  /** Retries until the first operand has been stored, yielding between attempts. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    while ( !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
      Thread.yield();
    }
  }

  /** Retries until the second operand has been stored, yielding between attempts. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    while ( !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
      Thread.yield();
    }
  }
}


/**
 * Variant that first looks at the inherited slot without holding the monitor
 * and only attempts the synchronized {@code maybeSet} call when the slot looks
 * empty; interruption is checked after every unsuccessful pass.
 */
final class DoubleCheck__Sum extends AbstractSynchronized_Sum {
  /** Spins until the first operand has been stored. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a1 != null ) || !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
    }
  }

  /** Spins until the second operand has been stored. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a2 != null ) || !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
    }
  }
}

/**
 * Variant that looks at the inherited slot without holding the monitor, tries
 * the synchronized {@code maybeSet} call only when the slot looks empty, and
 * checks for interruption and yields after every unsuccessful pass.
 */
final class DbleChYield__Sum extends AbstractSynchronized_Sum {
  /** Spins (yielding between passes) until the first operand has been stored. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a1 != null ) || !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
      Thread.yield();
    }
  }

  /** Spins (yielding between passes) until the second operand has been stored. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a2 != null ) || !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
      Thread.yield();
    }
  }
}


/**
 * Shared base for the {@link ReentrantLock}-based variants. A {@code null} slot
 * means "empty"; the {@code maybeSet} methods fill a slot only if it is empty
 * and consume a complete pair while holding the lock.
 *
 * <p>The slots are {@code volatile} because a subclass reads them without
 * holding the lock before calling {@code maybeSetA1}/{@code maybeSetA2}; with
 * plain fields those reads would be a data race and a spinning reader would
 * not be guaranteed to ever see the slot being emptied.
 */
abstract class AbstractLock_________Sum extends Sum {
  protected volatile Long a1 = null;
  protected volatile Long a2 = null;
  private final Lock lock = new ReentrantLock();

  /**
   * Fills the first slot if it is empty.
   *
   * @param a1Arg value for the first operand
   * @return {@code true} if the value was stored, {@code false} if the slot was occupied
   */
  public boolean maybeSetA1( final long a1Arg ) {
    lock.lock();
    try {
      if ( a1 == null ) {
        a1 = a1Arg;
        checkFinished();
        return true;
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Fills the second slot if it is empty.
   *
   * @param a2Arg value for the second operand
   * @return {@code true} if the value was stored, {@code false} if the slot was occupied
   */
  public boolean maybeSetA2( final long a2Arg ) {
    lock.lock();
    try {
      if ( a2 == null ) {
        a2 = a2Arg;
        checkFinished();
        return true;
      }
      return false;
    } finally {
      lock.unlock();
    }
  }

  /**
   * When both slots are filled, adds them to the total and empties both.
   * Only called from the {@code maybeSet} methods while the lock is held.
   */
  private void checkFinished() {
    // Read each volatile slot once.
    final Long first = a1;
    final Long second = a2;
    if ( ( first == null ) || ( second == null ) ) { return; }
    sum += first + second;
    a2 = null;
    a1 = null;
  }
}


/**
 * Variant that retries the lock-guarded {@code maybeSet} call in a tight loop,
 * checking for interruption after every failed attempt.
 */
final class Lock_________Sum extends AbstractLock_________Sum {
  /** Retries until the first operand has been stored. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    while ( !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
    }
  }

  /** Retries until the second operand has been stored. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    while ( !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
    }
  }
}


/**
 * Variant that first looks at the inherited slot without holding the lock and
 * only attempts the lock-guarded {@code maybeSet} call when the slot looks
 * empty; interruption is checked after every unsuccessful pass.
 */
final class LockDoubleCh_Sum extends AbstractLock_________Sum {
  /** Spins until the first operand has been stored. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a1 != null ) || !maybeSetA1( a1Arg ) ) {
      checkForInterrupt();
    }
  }

  /** Spins until the second operand has been stored. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    // Short-circuit: the locked attempt is made only when the slot reads as empty.
    while ( ( a2 != null ) || !maybeSetA2( a2Arg ) ) {
      checkForInterrupt();
    }
  }
}


/**
 * Variant whose operand slots are {@link AtomicReference}s. A setter spins on
 * a compare-and-set from {@code null} to its value, then asks the synchronized
 * {@code checkFinished} to consume a complete pair.
 */
final class AtomicRefLongSum extends Sum {
  private final AtomicReference<Long> a1 = new AtomicReference<>( null );
  private final AtomicReference<Long> a2 = new AtomicReference<>( null );

  /** Spins until the first slot has been claimed with the given value. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    for ( ;; ) {
      if ( a1.compareAndSet( null, a1Arg ) ) { break; }
      checkForInterrupt();
    }
    checkFinished();
  }

  /** Spins until the second slot has been claimed with the given value. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    for ( ;; ) {
      if ( a2.compareAndSet( null, a2Arg ) ) { break; }
      checkForInterrupt();
    }
    checkFinished();
  }

  /** When both slots are filled, empties them and adds the pair to the total. */
  private synchronized void checkFinished() {
    if ( ( a1.get() != null ) && ( a2.get() != null ) ) {
      final long first = a1.getAndSet( null );
      final long second = a2.getAndSet( null );
      sum += first + second;
    }
  }
}


/**
 * Variant whose operand slots are {@link AtomicReference}s and whose pair
 * consumption is guarded by a {@link ReentrantLock}. A setter spins on a
 * compare-and-set from {@code null} to its value, then calls
 * {@code checkFinished}.
 */
final class LockAtomicRefSum extends Sum {
  private final AtomicReference<Long> a1 = new AtomicReference<>( null );
  private final AtomicReference<Long> a2 = new AtomicReference<>( null );
  private final Lock lock = new ReentrantLock();

  /** Spins until the first slot has been claimed with the given value. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    for ( ;; ) {
      if ( a1.compareAndSet( null, a1Arg ) ) { break; }
      checkForInterrupt();
    }
    checkFinished();
  }

  /** Spins until the second slot has been claimed with the given value. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    for ( ;; ) {
      if ( a2.compareAndSet( null, a2Arg ) ) { break; }
      checkForInterrupt();
    }
    checkFinished();
  }

  /** Under the lock: when both slots are filled, empties them and adds the pair to the total. */
  private void checkFinished() {
    lock.lock();
    try {
      if ( ( a1.get() != null ) && ( a2.get() != null ) ) {
        final long first = a1.getAndSet( null );
        final long second = a2.getAndSet( null );
        sum += first + second;
      }
    } finally {
      lock.unlock();
    }
  }
}


/**
 * Shared base for the queue-backed variants. Each setter puts its operand on
 * its own {@link BlockingQueue} and then calls the synchronized
 * {@code checkFinished}, which removes one element from each queue and adds
 * the pair to the total when both queues have a head element.
 */
abstract class AbstractBlockingQueueSum extends Sum {
  protected final BlockingQueue<Long> a1;
  protected final BlockingQueue<Long> a2;

  /**
   * @param a1Arg queue that receives the first operands
   * @param a2Arg queue that receives the second operands
   */
  protected AbstractBlockingQueueSum( final BlockingQueue<Long> a1Arg,
                                      final BlockingQueue<Long> a2Arg ) {
    this.a1 = a1Arg;
    this.a2 = a2Arg;
  }

  /** Enqueues a first operand, then tries to consume a pair. */
  @Override public void setA1( final long a1Arg ) throws Exception {
    a1.put( a1Arg );
    checkFinished();
  }

  /** Enqueues a second operand, then tries to consume a pair. */
  @Override public void setA2( final long a2Arg ) throws Exception {
    a2.put( a2Arg );
    checkFinished();
  }

  /** Consumes at most one pair per call, and only when both queues have a head. */
  private synchronized void checkFinished() throws Exception {
    final boolean pairReady = ( a1.peek() != null ) && ( a2.peek() != null );
    if ( !pairReady ) { return; }
    final long first = a1.take();
    final long second = a2.take();
    sum += first + second;
  }
}


/** Queue-backed variant using two {@link LinkedBlockingQueue}s. */
final class BlockingQueueSum extends AbstractBlockingQueueSum {
  public BlockingQueueSum() {
    super(
      new LinkedBlockingQueue<Long>(),
      new LinkedBlockingQueue<Long>()
    );
  }
}


/** Queue-backed variant using two {@link LinkedBlockingDeque}s. */
final class BlockingDequeSum extends AbstractBlockingQueueSum {
  public BlockingDequeSum() {
    super(
      new LinkedBlockingDeque<Long>(),
      new LinkedBlockingDeque<Long>()
    );
  }
}


/** Queue-backed variant using two bounded {@link ArrayBlockingQueue}s. */
final class ArrayBlockingSum extends AbstractBlockingQueueSum {
  /** Capacity given to each of the two queues. */
  private static final int CAPACITY = 10;

  public ArrayBlockingSum() {
    super(
      new ArrayBlockingQueue<Long>( CAPACITY ),
      new ArrayBlockingQueue<Long>( CAPACITY )
    );
  }
}


On 29 August 2013 07:36, Charles Oliver Nutter <headius at headius.com> wrote:

> I've been struggling for years to find the right (i.e. fastest) way to
> do a coroutine-like control transfer between two threads, and I
> figured I'd try asking here again...
>
> What I need to emulate is a true coroutine, where a thread transfers
> execution (ip and stack) to a different piece of code, that code runs
> for a while, and then execution transfers back. There's always exactly
> one piece of code executing at a given time, since there's literally
> only one thread bouncing between stacks.
>
> On the JVM, where we don't have real coroutines, the best we can do is
> to emulate this with threads (ignoring bytecode-weaving stack tricks
> like Kilim for the moment). I do not believe any of the structures in
> j.u.concurrent currently have this exact pattern in mind.
>
> The patterns I've used to emulate this:
>
> 1. Explicit park/unpark.
>
> The parent thread starts up the coroutine, which immediately parks
> itself. Parent thread wakes up coroutine by unparking it and giving it
> an initial value, at which point parent parks itself. Child runs for a
> bit, then unparks parent, gives it a value, and parks itself.
>
> This is logically closest to what I want, but the park/unpark
> operations are too expensive for fast transfers. This mechanism ended
> up being the slowest way when measuring raw transfer rate (i.e. very
> little work being done between transfers)
>
> 2. SynchronousQueue
>
> Instead of using explicit parking and unparking, the parent pushes a
> value on child's sync queue, and then waits on its own sync queue.
> Child signals parent by pushing a value on parent's sync queue and
> then waits on its own.
>
> This was about 3x faster than explicit park/unpark.
>
> 3. Exchanger
>
> Substitute Exchanger for SynchronousQueue, where the "take" operation
> just exchanges null and the "put" operation ignores the result. This
> was the fastest...around 15x faster than explicit park/unpark and 5x
> faster than SynchronousQueue
>
> 4. Just spin
>
> Do nothing but spin on a null volatile field waiting for it to become
> non-null. Another 3x faster than Exchanger.
>
> ...
>
> It seems like I'm building a conceptually simple pattern on top of
> structures not designed for it (or rather, designed for more complex
> patterns). In SynchronousQueue's case, I have to do two put/take
> operations for every transfer. With Exchanger, two exchanges. What I
> really want is something like LockSupport.park/unpark that's more like
> LockSupport.parkMeAndUnparkHimPassingValue(value), explicitly passing
> control and a value to the other thread.
>
> What am I missing? Do I need to hand-roll this?
>
> - Charlie
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>



-- 
  -- Howard.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20130829/c7267c85/attachment-0001.html>


More information about the Concurrency-interest mailing list