[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Sun Sep 10 02:58:56 EDT 2006


Some initial load testing (100,000 inserts by 10 concurrent threads
into the same queue) shows that ConcurrentLinkedBlockingQueue is consistently
faster than LinkedBlockingQueue.  If anyone is interested, I can also post
the JUnit test code.
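
For readers who want to reproduce this kind of measurement, here is a minimal
sketch of such a load test. It is not the JUnit code mentioned above. It
assumes the 100,000 inserts are split evenly across the 10 threads, and the
names QueueLoadSketch and timeInserts are hypothetical. Because
ConcurrentLinkedBlockingQueue is not a JDK class, the sketch times the plain
ConcurrentLinkedQueue it wraps; pass an instance of the class quoted below to
timeInserts to measure it directly.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical harness, not the original JUnit test.
class QueueLoadSketch {

    // Inserts `total` elements into `q` from `threads` producer threads
    // (assumption: work is split evenly) and returns the elapsed nanoseconds.
    static long timeInserts(final Queue<Integer> q, int threads, int total) {
        final int perThread = total / threads;
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(threads);

        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                try {
                    start.await();                 // release all producers together
                    for (int i = 0; i < perThread; i++)
                        q.offer(i);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        long t0 = System.nanoTime();
        start.countDown();
        try {
            done.await();                          // wait for every producer to finish
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return System.nanoTime() - t0;
    }

    public static void main(String[] args) {
        Queue<Integer> lbq = new LinkedBlockingQueue<Integer>();
        Queue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
        System.out.println("LinkedBlockingQueue:   "
                + timeInserts(lbq, 10, 100000) + " ns, size " + lbq.size());
        System.out.println("ConcurrentLinkedQueue: "
                + timeInserts(clq, 10, 100000) + " ns, size " + clq.size());
    }
}
```

A single run like this is only a rough indication. Repeating the measurement
after a warm-up pass gives more trustworthy numbers.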

Hanson

On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> There is a bug in poll(long, TimeUnit).  See below for the fix.
>
> H
>
>     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
>         if (timeout < 0)
>             return take();    // treat a negative timeout as "wait forever"
>
>         // compute the deadline once so that repeated wake-ups share
>         // a single time budget instead of restarting the full timeout
>         final long deadline = System.nanoTime() + unit.toNanos(timeout);
>
>         for (;;) {
>             E e = q.poll();
>
>             if (e != null)
>                 return e;
>             long remaining = deadline - System.nanoTime();
>
>             if (remaining <= 0)
>                 return null;    // timed out
>
>             ThreadMarker m = new ThreadMarker(Thread.currentThread());
>             parkq.offer(m);
>             e = q.poll();
>
>             if (e != null) {
>                 m.parked = false;
>                 return e;
>             }
>             LockSupport.parkNanos(remaining);
>
>             // park returns at once when interrupted, so report it
>             if (Thread.interrupted())
>                 throw new InterruptedException();
>         }
>     }
>
>
> On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> >
> > One benefit of such a ConcurrentLinkedBlockingQueue is that it is
> > unbounded, whereas even LinkedBlockingQueue has a maximum capacity of
> > Integer.MAX_VALUE.
> >
> > Hanson
> >
> >
> > On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> > >
> > > Hi Doug,
> > >
> > >
> > > >This way works, but reduces concurrency by using a single semaphore,
> > > >so is a bit less scalable than current LinkedBlockingQueue.
> > >
> > > I think I found a better way that seems promising enough to provide
> > > better scalability than LinkedBlockingQueue.   Details below.
> > >
> > > This time the constructs I use include:
> > > * an extra ConcurrentLinkedQueue for the parking threads
> > > * a volatile flag marking whether a thread is potentially parked
> > > * LockSupport.[un]park[Nanos]
> > >
> > > Does this work?
> > >
> > >
> > > Hanson
> > >
> > > public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
> > >         implements java.io.Serializable
> > > {
> > >     private static class ThreadMarker {
> > >         final Thread thread;
> > >         // assumed parked until found otherwise.
> > >         volatile boolean parked = true;
> > >
> > >         ThreadMarker(Thread thread)
> > >         {
> > >             this.thread = thread;
> > >         }
> > >     }
> > >
> > >     private final ConcurrentLinkedQueue<ThreadMarker> parkq =
> > >         new ConcurrentLinkedQueue<ThreadMarker>();
> > >
> > >
> > >     private final ConcurrentLinkedQueue<E> q;
> > >
> > >     public ConcurrentLinkedBlockingQueue() {
> > >         q = new ConcurrentLinkedQueue<E>();
> > >     }
> > >
> > >     public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
> > >         q = new ConcurrentLinkedQueue<E>(c);
> > >     }
> > >
> > >     @Override
> > >     public Iterator<E> iterator() {
> > >         return q.iterator();
> > >     }
> > >
> > >     @Override
> > >     public int size() {
> > >         return q.size();
> > >     }
> > >
> > >     public boolean offer(E e) {
> > >         boolean b = q.offer(e);
> > >
> > >         // wake at most one waiter: skip markers whose thread already
> > >         // got an element, and unpark the first one still marked parked
> > >         for (;;)
> > >         {
> > >             ThreadMarker marker = parkq.poll();
> > >
> > >             if (marker == null)
> > >                 break;
> > >             if (marker.parked)
> > >             {
> > >                 LockSupport.unpark(marker.thread);
> > >                 break;
> > >             }
> > >         }
> > >         return b;
> > >     }
> > >
> > >     public E peek() {
> > >         return q.peek();
> > >     }
> > >
> > >     public E poll() {
> > >         return q.poll();
> > >     }
> > >
> > >     /**
> > >      * Retrieves and removes the head of this queue, waiting if
> > >      * necessary until an element becomes available.
> > >      *
> > >      * @return the head of this queue
> > >      * @throws InterruptedException
> > >      *             if interrupted while waiting
> > >      */
> > >     public E take() throws InterruptedException {
> > >         for (;;) {
> > >             E e = q.poll();
> > >
> > >             if (e != null)
> > >                 return e;
> > >             ThreadMarker m = new ThreadMarker(Thread.currentThread());
> > >
> > >             parkq.offer(m);
> > >             // check again in case there is data race
> > >
> > >             e = q.poll();
> > >
> > >             if (e != null)
> > >             {
> > >                 // data race indeed
> > >                 m.parked = false;
> > >                 return e;
> > >             }
> > >             LockSupport.park();
> > >
> > >         }
> > >     }
> > >
> > >     /**
> > >      * Retrieves and removes the head of this queue, waiting up to the
> > >      * specified wait time if necessary for an element to become
> > >      * available.
> > >      *
> > >      * @param timeout
> > >      *            how long to wait before giving up, in units of
> > >      *            <tt>unit</tt>
> > >      * @param unit
> > >      *            a <tt>TimeUnit</tt> determining how to interpret the
> > >      *            <tt>timeout</tt> parameter
> > >      * @return the head of this queue, or <tt>null</tt> if the
> > >      *         specified waiting time elapses before an element is
> > >      *         available
> > >      * @throws InterruptedException
> > >      *             if interrupted while waiting
> > >      */
> > >     public E poll(long timeout, TimeUnit unit) {
> > >
> > >         for (;;) {
> > >             E e = q.poll();
> > >
> > >             if (e != null)
> > >                 return e;
> > >             ThreadMarker m = new ThreadMarker(Thread.currentThread());
> > >
> > >             parkq.offer(m);
> > >             e = q.poll();
> > >
> > >             if (e != null) {
> > >                 m.parked = false;
> > >                 return e;
> > >             }
> > >             LockSupport.parkNanos(unit.toNanos(timeout));
> > >         }
> > >     }
> > > }
> > >
> > >
> >
>
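
A note on why the "check again, then park" sequence in the quoted take() can
tolerate an offer() that slips in between the second q.poll() and the call to
park: LockSupport works with a single permit per thread, so an unpark() issued
before park() is remembered and makes the next park() return immediately. The
following is a minimal sketch of just that property, not of the queue itself.
The names ParkPermitSketch and parkAfterUnparkNanos are hypothetical, and a
timed park is used only as a safety net so the demo cannot hang.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class illustrating LockSupport's permit semantics.
class ParkPermitSketch {

    // Grants the current thread its permit first, then parks.
    // Returns how long the park call took, in nanoseconds.
    static long parkAfterUnparkNanos() {
        LockSupport.unpark(Thread.currentThread());   // permit is now available

        long t0 = System.nanoTime();
        // The permit is consumed here, so this returns without waiting
        // for the 5 second limit.
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
        return System.nanoTime() - t0;
    }

    public static void main(String[] args) {
        long elapsed = parkAfterUnparkNanos();
        System.out.println("park returned after "
                + TimeUnit.NANOSECONDS.toMillis(elapsed) + " ms");
    }
}
```

Permits do not accumulate: several unpark() calls before a park() still
release only one park(). Callers therefore have to re-check their condition in
a loop, as the quoted take() does.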