[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Doug Lea dl at cs.oswego.edu
Sun Apr 8 06:49:01 EDT 2007


Hanson Char wrote:
> Hi Doug,
> 
> 
> I've been asked recently (again) if the proposed CLBQ in its current 
> form is ready to be included in Java 7.
>     

(Sorry for the delays on this!)

I plugged this into some of our test programs, and noticed that you
have a liveness failure (i.e., freeze) surrounding the park/unpark
logic. You can find the performance-oriented tests in
http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/loops/
Try the ones that embed or are scripted to run with LinkedBlockingQueue
(ProducerConsumerLoops, SingleProducerMultipleConsumerLoops,
MultipleProducersSingleConsumerLoops, ConcurrentQueueLoops,
CancelledProducerConsumerLoops, and so on). There are also
further tests you can adapt from tests/tck and tests/jtreg.
As indicated by Brian et al's JavaOne slides, you can never
have too many tests for these kinds of classes ...
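
For readers without the jsr166 sources at hand, here is a rough idea of
what such a loop test checks. This is only a sketch, not one of the test
programs named above: the class name ProducerConsumerSmokeTest and its
run method are made up for illustration, and the timeout is an arbitrary
stand-in for "the queue froze". Any BlockingQueue can be passed in.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical smoke test (not from the jsr166 test suite).
class ProducerConsumerSmokeTest {
    /**
     * Runs `pairs` producers and `pairs` consumers against q. Returns true
     * only if every thread finished within the timeout and the checksum of
     * consumed values equals the checksum of produced values.
     */
    static boolean run(BlockingQueue<Integer> q, int pairs, int items,
                       long timeoutSeconds) {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch done = new CountDownLatch(2 * pairs);
        AtomicLong produced = new AtomicLong();
        AtomicLong consumed = new AtomicLong();
        for (int p = 0; p < pairs; p++) {
            pool.execute(() -> {                 // producer
                try {
                    long sum = 0;
                    for (int i = 1; i <= items; i++) { q.put(i); sum += i; }
                    produced.addAndGet(sum);
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> {                 // consumer
                try {
                    long sum = 0;
                    for (int i = 1; i <= items; i++) sum += q.take();
                    consumed.addAndGet(sum);
                    done.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            // A timeout here suggests (but does not prove) a liveness failure.
            boolean finished = done.await(timeoutSeconds, TimeUnit.SECONDS);
            return finished && produced.get() == consumed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();                  // interrupt any stuck threads
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new LinkedBlockingQueue<Integer>(), 4, 100000, 30));
    }
}
```

A passing run says little on its own; failures of this kind often show up
only on some thread counts and machines, so it is worth varying both.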

I haven't tried to fully diagnose the cause, but I think it might be due
to failing to clean out the parkq node if the recheck succeeds.

Even with a workaround for this, this version of CLBQ hits the
same limitation as some others I've explored: On machines
with more than a few CPUs, CAS contention/failures start dominating
performance, especially when run with more threads than CPUs
(in which case CAS failures tend to propagate).
So, under some test loads that are not uncommon for blocking queues,
it becomes much slower than LBQ, and increasingly so. There are good
ways out of this. Investigating them (in my usual way of letting the
problem sit in background mode for a few months :-) has led
to the preliminary release being delayed.
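
To make the CAS-failure point concrete, here is a small sketch that
counts failed compareAndSet attempts on a single shared variable. It is
not taken from CLBQ or from any queue implementation; the class name
CasContentionSketch is made up, and a lone counter is a much simpler
target than queue head/tail pointers. The failure count varies from run
to run, so no particular figure should be expected.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration of CAS retry loops under contention.
class CasContentionSketch {
    /** Returns { final counter value, total number of failed CAS attempts }. */
    static long[] run(int threads, int itersPerThread) {
        AtomicLong value = new AtomicLong();
        AtomicLong failures = new AtomicLong();
        Thread[] ts = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            ts[t] = new Thread(() -> {
                long localFailures = 0;
                for (int k = 0; k < itersPerThread; k++) {
                    for (;;) {
                        long cur = value.get();
                        if (value.compareAndSet(cur, cur + 1)) break;
                        localFailures++;   // another thread won; retry
                    }
                }
                failures.addAndGet(localFailures);
            });
            ts[t].start();
        }
        for (Thread th : ts) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new long[] { value.get(), failures.get() };
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 16; n *= 2) {
            long[] r = run(n, 200000);
            System.out.println(n + " threads: value=" + r[0] + " failedCAS=" + r[1]);
        }
    }
}
```

Every failed attempt is work that has to be redone, which is why the cost
can grow quickly once many threads target the same location.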

Also, when contemplating releasing yet another queue implementation,
it occurred to us that it would be worthwhile to also add support
for some functionality that people have said they needed but could
not get with LBQ. So the upcoming LinkedTransferQueue implements
the following interface that also provides a synchronous-style mode.
Comments and suggestions on this would be welcome!


/*
  * Written by Doug Lea with assistance from members of JCP JSR-166
  * Expert Group and released to the public domain, as explained at
  * http://creativecommons.org/licenses/publicdomain
  */

//package jsr166y;
import java.util.concurrent.*;

/**
  * A {@link BlockingQueue} in which producers may wait for consumers
  * to receive elements.  A <tt>TransferQueue</tt> may be useful for
  * example in message passing applications in which producers
  * sometimes (using method <tt>transfer</tt>) await receipt of
  * elements by consumers invoking <tt>take</tt> or <tt>poll</tt>, while
  * at other times enqueue elements (via method <tt>put</tt>) without
  * waiting for receipt. Non-blocking and time-out versions of
  * <tt>tryTransfer</tt> are also available.
  *
  * <p>Like any <tt>BlockingQueue</tt>, a <tt>TransferQueue</tt> may be
  * capacity bounded. If so, an attempted <tt>transfer</tt> operation
  * may initially block waiting for available space, and/or
  * subsequently block waiting for reception by a consumer.  Note that
  * in a queue with zero capacity, such as {@link SynchronousQueue},
  * <tt>put</tt> and <tt>transfer</tt> are effectively synonymous.
  *
  * <p>This interface is a member of the
  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
  * Java Collections Framework</a>.
  *
  * @since 1.7
  * @author Doug Lea
  * @param <E> the type of elements held in this collection
  */
interface TransferQueue<E> extends BlockingQueue<E> {
     /**
      * Transfers the specified element if there exists a consumer
      * already waiting to receive it, otherwise returning <tt>false</tt>
      * without enqueuing the element.
      *
      * @param e the element to transfer
      * @return <tt>true</tt> if the element was transferred, else
      *         <tt>false</tt>
      * @throws ClassCastException if the class of the specified element
      *         prevents it from being added to this queue
      * @throws NullPointerException if the specified element is null
      * @throws IllegalArgumentException if some property of the specified
      *         element prevents it from being added to this queue
      */
     boolean tryTransfer(E e);

     /**
      * Inserts the specified element into this queue, waiting if necessary
      * for space to become available, and subsequently waiting until
      * the element is dequeued by a consumer invoking <tt>take</tt> or
      * <tt>poll</tt>.
      *
      * @param e the element to transfer
      * @throws InterruptedException if interrupted while waiting
      * @throws ClassCastException if the class of the specified element
      *         prevents it from being added to this queue
      * @throws NullPointerException if the specified element is null
      * @throws IllegalArgumentException if some property of the specified
      *         element prevents it from being added to this queue
      */
     void transfer(E e) throws InterruptedException;

     /**
      * Inserts the specified element into this queue, waiting up to the
      * specified wait time if necessary for space to become available
      * and/or for the element to be dequeued.
      *
      * @param e the element to transfer
      * @param timeout how long to wait before giving up, in units of
      *        <tt>unit</tt>
      * @param unit a <tt>TimeUnit</tt> determining how to interpret the
      *        <tt>timeout</tt> parameter
      * @return <tt>true</tt> if successful, or <tt>false</tt> if
      *         the specified waiting time elapses before completion
      * @throws InterruptedException if interrupted while waiting
      * @throws ClassCastException if the class of the specified element
      *         prevents it from being added to this queue
      * @throws NullPointerException if the specified element is null
      * @throws IllegalArgumentException if some property of the specified
      *         element prevents it from being added to this queue
      */
     boolean tryTransfer(E e, long timeout, TimeUnit unit)
         throws InterruptedException;

     /**
      * Returns true if there is at least one consumer waiting to
      * dequeue an element via <tt>take</tt> or <tt>poll</tt>. The
      * return value represents a momentary state of affairs, that
      * may be useful for monitoring and heuristics, but not
      * for synchronization control.
      * @return true if there is at least one waiting consumer.
      */
     boolean hasWaitingConsumer();


     /**
      * Returns the number of consumers waiting to dequeue elements
      * via <tt>take</tt> or <tt>poll</tt>. The return value represents
      * a momentary state of affairs, that may be useful for monitoring
      * and heuristics, but not for synchronization control. Implementations
      * of this method are likely to be noticeably slower than
      * those for <tt>hasWaitingConsumer</tt>.
      * @return the number of consumers waiting to dequeue elements
      */
     int getWaitingConsumerCount();
}
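
A short usage sketch of the interface above. It assumes the
java.util.concurrent.TransferQueue and LinkedTransferQueue classes as
they later shipped in JDK 7, which may differ in details from this
draft (for one, LinkedTransferQueue is unbounded, so the "waiting for
space" cases never arise). The class name TransferQueueDemo is made up
for illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo; assumes the JDK 7+ LinkedTransferQueue.
class TransferQueueDemo {
    /** Returns a comma-separated trace of what each call observed. */
    static String run() {
        TransferQueue<String> q = new LinkedTransferQueue<String>();
        StringBuilder trace = new StringBuilder();
        try {
            // No consumer is waiting: fails at once and does not enqueue.
            trace.append(q.tryTransfer("a")).append(',');
            trace.append(q.size()).append(',');

            // put enqueues without waiting for receipt.
            q.put("b");
            trace.append(q.size()).append(',');
            trace.append(q.poll()).append(',');

            // Timed version: gives up after the timeout if nobody takes it.
            trace.append(q.tryTransfer("d", 50, TimeUnit.MILLISECONDS)).append(',');

            // transfer blocks until a consumer actually receives the element.
            final AtomicReference<String> received = new AtomicReference<String>();
            Thread consumer = new Thread(() -> {
                try {
                    received.set(q.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            q.transfer("c");
            consumer.join();
            trace.append(received.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            trace.append("interrupted");
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints "false,0,1,b,false,c"
    }
}
```

The difference between put and transfer is visible in the trace: put
returns with the element still in the queue, while transfer returns only
after the consumer thread has taken it.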







