[concurrency-interest] Low latency queue of length 1

Howard Lovatt howard.lovatt at gmail.com
Mon Dec 19 21:09:26 EST 2011


Hi,

I saw this post:

http://vanillajava.blogspot.com/2011/11/java-puzzle-low-latency-queue.html

and experimented myself with various implementations of a producer consumer
queue (which I have an interest in for passing arguments to methods run in
parallel). My test code is:

public final class HangingTest {
  private static final long loops = 1000 * 1000;

  private static final long poison = 0;

  public static void main( final String... notUsed ) throws
InterruptedException {
    test( new SynchronousPT(), true );
    test( new SynchronousPT(), false );
    test( new BlockingPT(), true );
    test( new BlockingPT(), false );
    test( new AtomicPT(), true );
    test( new AtomicPT(), false );
    test( new VolatilePT(), true );
    test( new VolatilePT(), false );
    test( new NothingPT(), true );
    test( new NothingPT(), false );
  }

  private static void test( final AbstractPT pT, final boolean noError )
      throws InterruptedException {
    System.gc();
    System.gc();
    final class Put implements Callable<Void> {
      final boolean isMax;
      Put( final boolean isMax ) { this.isMax = isMax; }
      @Override public Void call() throws InterruptedException {
        if ( !noError ) { pT.put( poison ); }
        for ( long l = poison + 1; l < loops; l++ ) {
          pT.put( isMax ? l : -l );
        }
        return null;
      }
    }

    final class Take implements Callable<Void> {
      @Override public Void call() throws InterruptedException,
IllegalStateException {
        for ( long max = poison, min = poison;; ) {
          final long value = pT.take();
          if ( value >= 0 ) {
            if ( value <= max ) { throw new IllegalStateException(); }
            max = value;
          } else {
            if ( value >= min ) { throw new IllegalStateException(); }
            min = value;
          }
        }
      }
    }
    final ExecutorService pool = Executors.newCachedThreadPool();
    final long start = System.nanoTime();
    try {
//      pool.invokeAny( Arrays.asList( ... ) ); // Does not work!
      myInvokeAny( pool, new Put( true ), new Put( false ), new Take(), new
Take() );
    } catch ( final ExecutionException e ) {
      System.err.println( e );
    }
    final long finish = System.nanoTime();
    System.out.println( pT.toString() + " (" + noError + ")" + " took on
average " +
                            ( ( finish - start ) / loops ) + " ns" );
    pool.shutdown();
  }

  // Not same semantics as ExecutorService invokeAny!
  @SafeVarargs private static void myInvokeAny( final ExecutorService pool,
                                                final Callable<Void>...
callables )
      throws InterruptedException, ExecutionException {
    final List<Future<Void>> futures = new ArrayList<Future<Void>>();
    for ( final Callable<Void> callable : callables ) {
      futures.add( pool.submit( callable ) );
    }
    for ( ;; ) {
      for ( final Future<Void> future : futures ) {
        if ( future.isDone() ) {
          for ( final Future<Void> toCancel : futures ) {
            toCancel.cancel( true );
          }
          for ( final Future<Void> toTest : futures ) {
            if ( !toTest.isCancelled() ) {
              toTest.get(); // Test for any exceptions
            }
          }
          return;
        }
      }
      Thread.sleep( 0 ); // Wait for other threads
    }
  }

  private abstract static class AbstractPT {
    abstract void put( final long value ) throws InterruptedException;

    abstract long take() throws InterruptedException;

    @Override public final String toString() { return
getClass().getSimpleName(); }
  }

  private static final class SynchronousPT extends AbstractPT {
    final BlockingQueue<Long> queue = new SynchronousQueue<Long>();

    @Override void put( final long value ) throws InterruptedException {
queue.put( value ); }

    @Override long take() throws InterruptedException { return
queue.take(); }
  }

  private static final class BlockingPT extends AbstractPT {
    final BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>( 1 );

    @Override void put( final long value ) throws InterruptedException {
queue.put( value ); }

    @Override long take() throws InterruptedException { return
queue.take(); }
  }

  private static final class AtomicPT extends AbstractPT {
    final AtomicReference<Long> queue = new AtomicReference<Long>();

    @Override void put( final long value ) throws InterruptedException {
      while ( !queue.compareAndSet( null, value ) ) {
        Thread.sleep( 0 ); // Wait for other thread
      }
    }

    @Override long take() throws InterruptedException {
      Long t;
      do {
        Thread.sleep( 0 ); // Wait for other thread
        t = queue.getAndSet( null );
      } while ( t == null );
      return t;
    }
  }

  private static final class VolatilePT extends AbstractPT {
    volatile Long queue = null;

    @Override void put( final long value ) throws InterruptedException {
      while ( queue != null ) {
        Thread.sleep( 0 ); // Wait for other thread
      }
      queue = value;
    }

    @Override long take() throws InterruptedException {
      Long t;
      do {
        Thread.sleep( 0 ); // Wait for other thread
        t = queue;
      } while ( t == null );
      queue = null;
      return t;
    }
  }

  private static final class NothingPT extends AbstractPT {
    Long queue = null;

    @Override void put( final long value ) throws InterruptedException {
      while ( queue != null ) {
        Thread.sleep( 0 ); // Wait for other thread
      }
      queue = value;
    }

    @Override long take() throws InterruptedException {
      Long t;
      do {
        Thread.sleep( 0 ); // Wait for other thread
        t = queue;
      } while ( t == null );
      queue = null;
      return t;
    }
  }
}

The above gives:

SynchronousPT (true) took on average 2759 ns
SynchronousPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
BlockingPT (true) took on average 16203 ns
BlockingPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
AtomicPT (true) took on average 562 ns
AtomicPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
VolatilePT (true) took on average 492 ns
VolatilePT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
NothingPT (true) took on average 549 ns
NothingPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
BUILD SUCCESSFUL (total time: 21 seconds)

when run from Netbeans via Ant on my quad core MacBookPro running a 64 bit
server JVM (JDK7).

I noticed that:

   1. Atomic, Volatile, and Nothing were all about the same time. I was
   surprised that Nothing wasn't quicker than the others (also see next point)?
   2. I don't no why Volatile and Nothing work! There is
   no synchronisation or similar?
   3. I don't no why sleep is needed to prevent hanging? If I remove any of
   the calls to sleep in the above code the program hangs.
   4. I don't no why ExecutorService.invokeAny didn't work? It hung.
   5. Why is Synchronous so slow?
   6. Why is Blocking even slower?

Can anyone shed light on the above notes? Do you get the same results on
your machine?

Thanks in advance for any words of wisdom,

  -- Howard.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20111220/3f42edaf/attachment-0001.html>


More information about the Concurrency-interest mailing list