[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Sat Sep 9 23:51:35 EDT 2006


One benefit of such a ConcurrentLinkedBlockingQueue is that it is unbounded,
whereas even LinkedBlockingQueue has a maximum capacity of Integer.MAX_VALUE.
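
To make the capacity point concrete, here is a minimal sketch. The class name
CapacityDemo and its method names are made up for illustration. It relies
only on the documented behaviour that a LinkedBlockingQueue built with the
no-arg constructor has capacity Integer.MAX_VALUE, and that offer() on a full
bounded queue returns false, while ConcurrentLinkedQueue has no capacity
limit to report.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CapacityDemo {
    // Remaining capacity of an empty LinkedBlockingQueue created with the
    // no-arg constructor; this is its fixed upper bound.
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    // A LinkedBlockingQueue with an explicit capacity of 1 accepts the first
    // element and rejects the second; returns true if that is what happens.
    static boolean boundedQueueRejectsWhenFull() {
        LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<String>(1);
        boolean first = lbq.offer("a");
        boolean second = lbq.offer("b");
        return first && !second;
    }

    // ConcurrentLinkedQueue has no capacity, so offer() keeps succeeding.
    static boolean unboundedQueueAccepts(int n) {
        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < n; i++) {
            if (!clq.offer(i))
                return false;
        }
        return clq.size() == n;
    }

    public static void main(String[] args) {
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
        System.out.println(boundedQueueRejectsWhenFull());
        System.out.println(unboundedQueueAccepts(1000));
    }
}
```

In practice memory runs out long before either limit matters, so the
difference is mostly one of contract rather than of behaviour.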

Hanson

On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> Hi Doug,
>
>
> >This way works, but reduces concurrency by using a single semaphore,
> >so is a bit less scalable than current LinkedBlockingQueue.
>
> I think I have found a better way, one that looks promising enough to scale
> better than LinkedBlockingQueue. Details below.
>
> This time the constructs I use include
> * an extra ConcurrentLinkedQueue for the parking threads
> * volatile for marking if a thread is potentially parked or not
> * LockSupport.park(), parkNanos() and unpark()
>
> Does this work?
>
>
> Hanson
>
> import java.util.AbstractQueue;
> import java.util.Collection;
> import java.util.Iterator;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.locks.LockSupport;
>
> public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
>         implements java.io.Serializable
> {
>     private static class ThreadMarker {
>         final Thread thread;
>         // assumed parked until found otherwise.
>         volatile boolean parked = true;
>
>         ThreadMarker(Thread thread)
>         {
>             this.thread = thread;
>         }
>     }
>
>     private final ConcurrentLinkedQueue<ThreadMarker> parkq =
>             new ConcurrentLinkedQueue<ThreadMarker>();
>
>
>     private final ConcurrentLinkedQueue<E> q;
>
>     public ConcurrentLinkedBlockingQueue() {
>         q = new ConcurrentLinkedQueue<E>();
>     }
>
>     public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
>         q = new ConcurrentLinkedQueue<E>(c);
>     }
>
>     @Override
>     public Iterator<E> iterator() {
>         return q.iterator();
>     }
>
>     @Override
>     public int size() {
>         return q.size();
>     }
>
>     public boolean offer(E e) {
>         boolean b = q.offer(e);
>
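>         // Wake at most one waiter; skip markers whose thread found an
>         // element on its re-check and therefore never parked.
>         for (;;)
>         {
>             ThreadMarker marker = parkq.poll();
>
>             if (marker == null)
>                 break;
>             if (marker.parked)
>             {
>                 LockSupport.unpark(marker.thread);
>                 break;
>             }
>         }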
>         return b;
>     }
>
>     public E peek() {
>         return q.peek();
>     }
>
>     public E poll() {
>         return q.poll();
>     }
>
>     /**
>      * Retrieves and removes the head of this queue, waiting if necessary
>      * until an element becomes available.
>      *
>      * @return the head of this queue
>      * @throws InterruptedException
>      *             if interrupted while waiting
>      */
>     public E take() throws InterruptedException {
>         for (;;) {
>             E e = q.poll();
>
>             if (e != null)
>                 return e;
>             ThreadMarker m = new ThreadMarker(Thread.currentThread());
>             parkq.offer(m);
>             // check again in case there is data race
>
>             e = q.poll();
>
>             if (e != null)
>             {
>                 // data race indeed
>                 m.parked = false;
>                 return e;
>             }
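>             LockSupport.park();
>             // awake again: a later offer() should skip this marker
>             m.parked = false;
>             // park() returns at once when interrupted, so report it
>             if (Thread.interrupted())
>                 throw new InterruptedException();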
>         }
>     }
>
>     /**
>      * Retrieves and removes the head of this queue, waiting up to the
>      * specified wait time if necessary for an element to become available.
>      *
>      * @param timeout
>      *            how long to wait before giving up, in units of
>      *            <tt>unit</tt>
>      * @param unit
>      *            a <tt>TimeUnit</tt> determining how to interpret the
>      *            <tt>timeout</tt> parameter
>      * @return the head of this queue, or <tt>null</tt> if the specified
>      *         waiting time elapses before an element is available
>      * @throws InterruptedException
>      *             if interrupted while waiting
>      */
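>     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
>         long nanos = unit.toNanos(timeout);
>         final long deadline = System.nanoTime() + nanos;
>
>         for (;;) {
>             E e = q.poll();
>
>             if (e != null)
>                 return e;
>             if (nanos <= 0)
>                 return null;    // waiting time elapsed
>             ThreadMarker m = new ThreadMarker(Thread.currentThread());
>             parkq.offer(m);
>             // check again in case an element arrived in the meantime
>             e = q.poll();
>
>             if (e != null) {
>                 m.parked = false;
>                 return e;
>             }
>             LockSupport.parkNanos(nanos);
>             // awake again: a later offer() should skip this marker
>             m.parked = false;
>             if (Thread.interrupted())
>                 throw new InterruptedException();
>             // wait only for whatever is left of the original timeout
>             nanos = deadline - System.nanoTime();
>         }
>     }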
> }
>
>
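
The re-check in take() after the marker is enqueued leans on the permit
semantics of LockSupport: an unpark() that arrives before the matching park()
is not lost, because the next park() returns without blocking. The sketch
below is only an illustration of that property; ParkPermitDemo and its method
name are invented for this example and are not part of the proposed class.

```java
import java.util.concurrent.locks.LockSupport;

public class ParkPermitDemo {
    // Grants the current thread its permit first, then parks with a long
    // timeout. Returns true if the park came back well before the timeout,
    // i.e. the earlier unpark() was remembered rather than lost.
    static boolean unparkBeforeParkIsNotLost() {
        Thread self = Thread.currentThread();

        LockSupport.unpark(self);               // permit now available

        long start = System.nanoTime();
        LockSupport.parkNanos(5000000000L);     // up to 5 s without a permit
        long elapsedMillis = (System.nanoTime() - start) / 1000000L;

        // Generous threshold so the check does not depend on exact timing.
        return elapsedMillis < 1000;
    }

    public static void main(String[] args) {
        System.out.println(unparkBeforeParkIsNotLost());
    }
}
```

Note that the permit does not accumulate: several unpark() calls before a
park() still release only one park(), which is why the waiter loops and polls
the queue again after waking up.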