[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Sun Sep 10 02:21:55 EDT 2006


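There is a bug in poll(long, TimeUnit): the original version (quoted below) never times out, because after every wake-up it simply loops and parks again.  See below for the fix.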

H

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * <p>The wait is measured against a single start time taken on entry, so
     * early or spurious wake-ups do not extend the total time spent waiting.
     *
     * @param timeout
     *            how long to wait before giving up, in units of <tt>unit</tt>;
     *            a negative value means wait indefinitely (delegates to
     *            <tt>take()</tt>)
     * @param unit
     *            a <tt>TimeUnit</tt> determining how to interpret the
     *            <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException
     *             if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        if (timeout < 0)
            return take();    // a negative timeout is treated as "wait forever"

        final long totalNanos = unit.toNanos(timeout);
        // nanoTime() has an arbitrary origin and may be negative, so only
        // differences between two readings are meaningful.
        final long startNanos = System.nanoTime();

        for (;;) {
            E e = q.poll();

            if (e != null)
                return e;
            // parkNanos returns immediately while the interrupt status is set;
            // without this check an interrupted caller would spin in this loop.
            if (Thread.interrupted())
                throw new InterruptedException();

            long remainingNanos = totalNanos - (System.nanoTime() - startNanos);

            if (remainingNanos <= 0)
                return null;    // time out

            ThreadMarker m = new ThreadMarker(Thread.currentThread());
            parkq.offer(m);
            // check again in case an element arrived before the marker was published
            e = q.poll();

            if (e != null) {
                m.parked = false;
                return e;
            }
            // park only for what is left of the budget, not the full timeout again
            LockSupport.parkNanos(remainingNanos);
            // Whatever woke us (unpark, timeout, interrupt, spurious return), this
            // marker no longer denotes a parked thread. Clearing the flag lets
            // offer() skip it if it is still queued, so the wake-up goes to a
            // thread that is actually waiting.
            m.parked = false;
        }
    }


On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> One benefit of such a ConcurrentLinkedBlockingQueue is that it is unbounded,
> whereas even LinkedBlockingQueue has a maximum capacity of Integer.MAX_VALUE.
>
> Hanson
>
>
> On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> >
> > Hi Doug,
> >
> >
> > >This way works, but reduces concurrency by using a single semaphore,
> > >so is a bit less scalable than current LinkedBlockingQueue.
> >
> > I think I found a better way that seems promising enough to provide
> > better scalability than LinkedBlockingQueue.   Details below.
> >
> > This time the constructs I use include
> > * an extra ConcurrentLinkedQueue for the parking threads
> > * volatile for marking if a thread is potentially parked or not
> > * LockSupport.[un]park[Nanos]
> >
> > Does this work ?
> >
> >
> > Hanson
> >
> > public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
> >         implements java.io.Serializable
> > {
> >     private static class ThreadMarker {
> >         final Thread thread;
> >         // assumed parked until found otherwise.
> >         volatile boolean parked = true;
> >
> >         ThreadMarker(Thread thread)
> >         {
> >             this.thread = thread;
> >         }
> >     }
> >
> >     private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
> > ConcurrentLinkedQueue<ThreadMarker>();
> >
> >
> >     private final ConcurrentLinkedQueue<E> q;
> >
> >     public ConcurrentLinkedBlockingQueue() {
> >         q = new ConcurrentLinkedQueue<E>();
> >     }
> >
> >     public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
> >         q = new ConcurrentLinkedQueue<E>(c);
> >     }
> >
> >     @Override
> >     public Iterator<E> iterator() {
> >         return q.iterator();
> >     }
> >
> >     @Override
> >     public int size() {
> >         return q.size();
> >     }
> >
> >     public boolean offer(E e) {
> >         boolean b = q.offer(e);
> >
> >         for (;;)
> >         {
> >             ThreadMarker marker = parkq.poll();
> >
> >             if (marker == null)
> >                 break;
> >             if ( marker.parked)
> >             {
> >                 LockSupport.unpark(marker.thread);
> >                 break;
> >
> >             }
> >         }
> >         return b;
> >     }
> >
> >     public E peek() {
> >         return q.peek ();
> >     }
> >
> >     public E poll() {
> >         return q.poll();
> >     }
> >
> >     /**
> >      * Retrieves and removes the head of this queue, waiting if
> > necessary until
> >      * an element becomes available.
> >      *
> >      * @return the head of this queue
> >      * @throws InterruptedException
> >      *             if interrupted while waiting
> >      */
> >     public E take() throws InterruptedException {
> >         for (;;) {
> >             E e = q.poll();
> >
> >             if (e != null)
> >                 return e;
> >             ThreadMarker m = new ThreadMarker(Thread.currentThread());
> >             parkq.offer(m);
> >             // check again in case there is data race
> >
> >             e = q.poll();
> >
> >             if (e != null)
> >             {
> >                 // data race indeed
> >                 m.parked = false;
> >                 return e;
> >             }
> >             LockSupport.park ();
> >
> >         }
> >     }
> >
> >     /**
> >      * Retrieves and removes the head of this queue, waiting up to the
> > specified
> >      * wait time if necessary for an element to become available.
> >      *
> >      * @param timeout
> >      *            how long to wait before giving up, in units of
> > <tt>unit</tt>
> >      * @param unit
> >      *            a <tt>TimeUnit</tt> determining how to interpret the
> >      *            <tt>timeout</tt> parameter
> >      * @return the head of this queue, or <tt>null</tt> if the specified
> >      *         waiting time elapses before an element is available
> >      * @throws InterruptedException
> >      *             if interrupted while waiting
> >      */
> >     public E poll(long timeout, TimeUnit unit) {
> >
> >         for (;;) {
> >             E e = q.poll();
> >
> >             if (e != null)
> >                 return e;
> >             ThreadMarker m = new ThreadMarker( Thread.currentThread());
> >             parkq.offer(m);
> >             e = q.poll();
> >
> >             if (e != null) {
> >                 m.parked = false;
> >                 return e;
> >             }
> >             LockSupport.parkNanos (unit.toNanos(timeout));
> >         }
> >     }
> > }
> >
> >
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/fede6011/attachment-0001.html 


More information about the Concurrency-interest mailing list