[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Sat Sep 9 21:18:54 EDT 2006


Hi Doug,

>This way works, but reduces concurrency by using a single semaphore,
>so is a bit less scalable than current LinkedBlockingQueue.

I think I found a better way that seems promising enough to provide better
scalability than LinkedBlockingQueue.   Details below.

This time the constructs I use include
* an extra ConcurrentLinkedQueue for the parking threads
* volatile for marking if a thread is potentially parked or not
* LockSupport.[un]park[Nano])

Does this work ?

Hanson

public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements java.io.Serializable
{
    private static class ThreadMarker {
        final Thread thread;
        // assumed parked until found otherwise.
        volatile boolean parked = true;

        ThreadMarker(Thread thread)
        {
            this.thread = thread;
        }
    }

    private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
ConcurrentLinkedQueue<ThreadMarker>();

    private final ConcurrentLinkedQueue<E> q;

    public ConcurrentLinkedBlockingQueue() {
        q = new ConcurrentLinkedQueue<E>();
    }

    public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
        q = new ConcurrentLinkedQueue<E>(c);
    }

    @Override
    public Iterator<E> iterator() {
        return q.iterator();
    }

    @Override
    public int size() {
        return q.size();
    }

    public boolean offer(E e) {
        boolean b = q.offer(e);

        for (;;)
        {
            ThreadMarker marker = parkq.poll();

            if (marker == null)
                break;
            if (marker.parked)
            {
                LockSupport.unpark(marker.thread);
                break;
            }
        }
        return b;
    }

    public E peek() {
        return q.peek();
    }

    public E poll() {
        return q.poll();
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
until
     * an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException
     *             if interrupted while waiting
     */
    public E take() throws InterruptedException {
        for (;;) {
            E e = q.poll();

            if (e != null)
                return e;
            ThreadMarker m = new ThreadMarker(Thread.currentThread());
            parkq.offer(m);
            // check again in case there is data race
            e = q.poll();

            if (e != null)
            {
                // data race indeed
                m.parked = false;
                return e;
            }
            LockSupport.park();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
specified
     * wait time if necessary for an element to become available.
     *
     * @param timeout
     *            how long to wait before giving up, in units of
<tt>unit</tt>
     * @param unit
     *            a <tt>TimeUnit</tt> determining how to interpret the
     *            <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the specified
     *         waiting time elapses before an element is available
     * @throws InterruptedException
     *             if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) {
        for (;;) {
            E e = q.poll();

            if (e != null)
                return e;
            ThreadMarker m = new ThreadMarker(Thread.currentThread());
            parkq.offer(m);
            e = q.poll();

            if (e != null) {
                m.parked = false;
                return e;
            }
            LockSupport.parkNanos(unit.toNanos(timeout));
        }
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/32a35ba3/attachment.html 


More information about the Concurrency-interest mailing list