[concurrency-interest] ConcurrentLinkedBlockingQueue ?

Hanson Char hanson.char at gmail.com
Thu Sep 7 02:13:46 EDT 2006


Hi,

I've been wondering why there is ConcurrentLinkedQueue in Java 5+, but not
something like a ConcurrentLinkedBlockingQueue, which would allow the client
to block on an empty queue via a "take" method, or block on an empty for a
limited time via a "poll" method.

Please find below an attempt to build such queue on other existing
structures.

Any concurrency problem with these classes ?  Is there a better way ?

Hanson

-------------------------------------------
public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
    implements java.io.Serializable
{
    private final ConcurrentLinkedQueue<E> q;
    // See below for details of RelaxedSemaphore
    private final RelaxedSemaphore sem = new RelaxedSemaphore(1);

    public ConcurrentLinkedBlockingQueue() {
        q = new ConcurrentLinkedQueue<E>();
    }

    public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
        q = new ConcurrentLinkedQueue<E>(c);
    }

    @Override
    public Iterator<E> iterator() {
        return q.iterator();
    }

    @Override
    public int size() {
        return q.size();
    }

    public boolean offer(E e) {
        boolean b = q.offer(e);
        sem.release();
        return b;
    }

    public E peek() {
        return q.peek();
    }

    public E poll() {
        return q.poll();
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException
    {
        for (;;)
        {
            E e = q.poll();

            if (e != null)
                return e;
            final int permits = sem.availablePermits();
            if (permits > 1)
                sem.reducePermits(permits-1);
            sem.acquire();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     *        <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        for (;;)
        {
            E e = q.poll();

            if (e != null)
                return e;
            final int permits = sem.availablePermits();
            if (permits > 1)
                sem.reducePermits(permits-1);

            if (!sem.tryAcquire(timeout, unit))
                return null;    // timeout
        }
    }
}

------------------------
// make reducePermits public rather than protected
public class RelaxedSemaphore extends Semaphore {
    private static final long serialVersionUID = -3533893015748638023L;

    public RelaxedSemaphore(int permits) {
        super(permits);
    }

    public RelaxedSemaphore(int permits, boolean fair) {
        super(permits, fair);
    }

    @Override
    public void reducePermits(int reduction) {
        super.reducePermits(reduction);
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060906/8a07a3a2/attachment.html 


More information about the Concurrency-interest mailing list