[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Thu Apr 12 04:53:10 EDT 2007


> I plugged this into some of our test programs, and noticed that you
> have a liveness failure (i.e., freeze) surrounding park/unpark
> logic.

Thanks for pointing this out.  The liveness failure is now fixed:

http://svn.sourceforge.net/viewvc/beanlib/trunk/beanlib/src/net/sf/beanlib/util/concurrent/ConcurrentLinkedBlockingQueue.java?view=markup

I plugged CLBQ to every test programs (that contains LBQ) I can find
in http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/loops/
and they all run successfully.

Hanson Char

On 4/8/07, Doug Lea <dl at cs.oswego.edu> wrote:
> Hanson Char wrote:
> > Hi Doug,
> >
> >
> > I've been asked recently (again) if the proposed CLBQ in it's current
> > form is ready to be included into Java 7.
> >
>
> (Sorry for the delays on this!)
>
> I plugged this into some of our test programs, and noticed that you
> have a liveness failure (i.e., freeze) surrounding park/unpark
> logic. You can find the performance-oriented  tests in
> http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/loops/
> Try the ones that embed or are scripted to run with LinkedBlockingQueue:
> (ProducerConsumerLoops,  SingleProducerMultipleConsumerLoops,
> MultipleProducersSingleConsumerLoops, ConcurrentQueueLoops,
> CancelledProducerConsumerLoops, and so on. There are also
> further tests you can adapt from tests/tck and tests/jtreg.
> As indicated by Brian et al's JavaOne slides, you can never
> have too many tests for these kinds of classes ...
>
> I haven't tried to fully diagnose cause, but I think it might be due
> to failing to clean out parkq node if recheck succeeds.
>
> Even with a workaround for this, this version of CLBQ hits the
> same limitation as some others I've explored: On machines
> with more than a few CPUs, CAS contention/failures start dominating
> performance, especially when run with more threads that CPUs
> (in which case CAS failures tend to propagate).
> So, under some test loads that are not uncommon for blocking queues,
> it becomes increasingly much slower than LBQ. There are good ways
> out of this. Investigating them (in my usual way of letting the
> problem sit in background mode for a few months :-) has led
> preliminary release to be delayed.
>
> Also, when contemplating releasing yet another queue implementation,
> it occurred to us that it would be worthwhile to also add support
> for some functionality that people have said they needed but could
> not get with LBQ. So the upcoming LinkedTransferQueue implements
> the following interface that also provides a synchronous-style mode.
> Comments and suggestions on this would be welcome!
>
>
> /*
>   * Written by Doug Lea with assistance from members of JCP JSR-166
>   * Expert Group and released to the public domain, as explained at
>   * http://creativecommons.org/licenses/publicdomain
>   */
>
> //package jsr166y;
> import java.util.concurrent.*;
>
> /**
>   * A {@link BlockingQueue} in which producers may wait for consumers
>   * to receive elements.  A <tt>TransferQueue</tt> may be useful for
>   * example in message passing applications in which producers
>   * sometimes (using method <tt>transfer</tt>) await receipt of
>   * elements by consumers invoking <tt>take</tt> or <tt>poll<tt>, while
>   * at other times enqueue elements (via method <tt>put</tt>) without
>   * waiting for receipt . Non-blocking and time-out versions of
>   * <tt>tryTransfer</tt> are also available.
>   *
>   * <p>Like any <tt>BlockingQueue</tt>, a <tt>TransferQueue</tt> may be
>   * capacity bounded. If so, an attempted <tt>transfer</tt> operation
>   * may initially block waiting for available space, and/or
>   * subsequently block waiting for reception by a consumer.  Note that
>   * in a queue with zero capacity, such as {@link SynchronousQueue},
>   * <tt>put</tt> and <tt>transfer</tt> are effectively synonymous.
>   *
>   * <p>This interface is a member of the
>   * <a href="{@docRoot}/../technotes/guides/collections/index.html">
>   * Java Collections Framework</a>.
>   *
>   * @since 1.7
>   * @author Doug Lea
>   * @param <E> the type of elements held in this collection
>   */
> interface TransferQueue<E> extends BlockingQueue<E> {
>      /**
>       * Transfers the specified element if there exists a consumer
>       * already waiting to receive it, otherwise returning <tt>false</tt>
>       * without enqueuing the element.
>       *
>       * @param e the element to transfer
>       * @return <tt>true</tt> if the element was transferred, else
>       *         <tt>false</tt>
>       * @throws ClassCastException if the class of the specified element
>       *         prevents it from being added to this queue
>       * @throws NullPointerException if the specified element is null
>       * @throws IllegalArgumentException if some property of the specified
>       *         element prevents it from being added to this queue
>       */
>      boolean tryTransfer(E e);
>
>      /**
>       * Inserts the specified element into this queue, waiting if necessary
>       * for space to become available, and subsequently waiting until
>       * the element is dequeued by a consumer invoking <tt>take</tt> or
>       * <tt>poll</tt>.
>       *
>       * @param e the element to transfer
>       * @throws InterruptedException if interrupted while waiting
>       * @throws ClassCastException if the class of the specified element
>       *         prevents it from being added to this queue
>       * @throws NullPointerException if the specified element is null
>       * @throws IllegalArgumentException if some property of the specified
>       *         element prevents it from being added to this queue
>       */
>      void transfer(E e) throws InterruptedException;
>
>      /**
>       * Inserts the specified element into this queue, waiting up to the
>       * specified wait time if necessary for space to become available
>       * and/or for the element to be dequeued.
>       *
>       * @param e the element to transfer
>       * @param timeout how long to wait before giving up, in units of
>       *        <tt>unit</tt>
>       * @param unit a <tt>TimeUnit</tt> determining how to interpret the
>       *        <tt>timeout</tt> parameter
>       * @return <tt>true</tt> if successful, or <tt>false</tt> if
>       *         the specified waiting time elapses before completion
>       * @throws InterruptedException if interrupted while waiting
>       * @throws ClassCastException if the class of the specified element
>       *         prevents it from being added to this queue
>       * @throws NullPointerException if the specified element is null
>       * @throws IllegalArgumentException if some property of the specified
>       *         element prevents it from being added to this queue
>       */
>      boolean tryTransfer(E e, long timeout, TimeUnit unit)
>          throws InterruptedException;
>
>      /**
>       * Returns true if there is at least one consumer waiting to
>       * dequeue an element via <tt>take</tt> or <tt>poll</tt>. The
>       * return value represents a momentary state of affairs, that
>       * may be useful for monitoring and heuristics, but not
>       * for synchronization control.
>       * @return true if there is at least one waiting consumer.
>       */
>      boolean hasWaitingConsumer();
>
>
>      /**
>       * Returns the number of consumers waiting to dequeue elements
>       * via <tt>take</tt> or <tt>poll</tt>. The return value represents
>       * a momentary state of affairs, that may be useful for monitoring
>       * and heuristics, but not for synchronization control. Implementations
>       * of this method are likely to be noticeably slower than
>       * those for <tt>hasWaitingConsumer</tt>.
>       * @return the number of consumers waiting to dequeue elements
>       */
>      int getWaitingConsumerCount();
> }
>
>
>
>
>
>
>


More information about the Concurrency-interest mailing list