[concurrency-interest] ConcurrentLinkedBlockingQueue ?
Hanson Char
hanson.char at gmail.com
Thu Sep 7 02:13:46 EDT 2006
Hi,
I've been wondering why Java 5+ has ConcurrentLinkedQueue but nothing
like a ConcurrentLinkedBlockingQueue, which would let the client block
on an empty queue via a "take" method, or block on an empty queue for a
limited time via a timed "poll" method.
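For reference, the blocking behaviour I have in mind is what the existing lock-based LinkedBlockingQueue already offers through the BlockingQueue interface. A minimal sketch of those semantics (the class BlockingSemanticsSketch and its helper methods are made up for illustration; only the java.util.concurrent types are real JDK API):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK or of the code in this post.
class BlockingSemanticsSketch {

    // A consumer calls take() while a producer thread offers one element.
    // take() blocks until the element has arrived, then returns it.
    static String takeAfterOffer(final String item) {
        final BlockingQueue<String> q = new LinkedBlockingQueue<String>();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                q.offer(item);
            }
        });
        producer.start();
        try {
            return q.take();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return null;
        }
    }

    // A timed poll on an empty queue waits up to the timeout and then
    // gives up by returning null.
    static String timedPollOnEmpty(long millis) {
        BlockingQueue<String> q = new LinkedBlockingQueue<String>();
        try {
            return q.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The goal would be the same two operations, but layered on the non-blocking ConcurrentLinkedQueue rather than on locks.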
Please find below an attempt to build such a queue on top of other
existing structures.
Are there any concurrency problems with these classes? Is there a better way?
Hanson
-------------------------------------------
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
        implements java.io.Serializable
{
    private final ConcurrentLinkedQueue<E> q;
    // See below for details of RelaxedSemaphore
    private final RelaxedSemaphore sem = new RelaxedSemaphore(1);

    public ConcurrentLinkedBlockingQueue() {
        q = new ConcurrentLinkedQueue<E>();
    }

    public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
        q = new ConcurrentLinkedQueue<E>(c);
    }

    @Override
    public Iterator<E> iterator() {
        return q.iterator();
    }

    @Override
    public int size() {
        return q.size();
    }

    public boolean offer(E e) {
        boolean b = q.offer(e);
        sem.release(); // signal a consumer that may be waiting in take/poll
        return b;
    }

    public E peek() {
        return q.peek();
    }

    public E poll() {
        return q.poll();
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException
    {
        for (;;)
        {
            E e = q.poll();
            if (e != null)
                return e;
            // Queue looked empty: surplus permits are meant to be trimmed
            // down to one before waiting for the next offer.
            final int permits = sem.availablePermits();
            if (permits > 1)
                sem.reducePermits(permits-1);
            sem.acquire();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException
    {
        for (;;)
        {
            E e = q.poll();
            if (e != null)
                return e;
            final int permits = sem.availablePermits();
            if (permits > 1)
                sem.reducePermits(permits-1);
            if (!sem.tryAcquire(timeout, unit))
                return null; // timeout
        }
    }
}
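One detail of the timed poll above: every pass through the loop hands the full timeout to tryAcquire again, so the total wait can exceed what the caller asked for. A rough sketch of one way to bound the total wait with a deadline computed from System.nanoTime(). The class TimedPollSketch, its fields and its demo() method are hypothetical stand-ins, it uses a plain Semaphore starting at zero permits, and it leaves out the permit-trimming step, so it is not a drop-in replacement for the class above:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in; not the class from this post.
class TimedPollSketch<E> {
    private final ConcurrentLinkedQueue<E> q = new ConcurrentLinkedQueue<E>();
    private final Semaphore sem = new Semaphore(0);

    public boolean offer(E e) {
        boolean b = q.offer(e);
        sem.release(); // signal a possibly waiting consumer
        return b;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // Fix the deadline once, so retries only wait for what is left.
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (;;) {
            E e = q.poll();
            if (e != null)
                return e;
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0
                    || !sem.tryAcquire(remaining, TimeUnit.NANOSECONDS))
                return null; // overall timeout elapsed
        }
    }

    // Single-threaded walk-through: a timed poll on an empty queue gives
    // null, and after an offer the next timed poll returns the element.
    static String demo() {
        TimedPollSketch<String> s = new TimedPollSketch<String>();
        try {
            String first = s.poll(20, TimeUnit.MILLISECONDS);
            s.offer("a");
            String second = s.poll(20, TimeUnit.MILLISECONDS);
            return first + "," + second;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Permits left over from elements that were polled without acquiring only cause extra passes through the loop here; they do not extend the deadline.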
------------------------
import java.util.concurrent.Semaphore;

// Makes reducePermits public rather than protected
public class RelaxedSemaphore extends Semaphore {
    private static final long serialVersionUID = -3533893015748638023L;

    public RelaxedSemaphore(int permits) {
        super(permits);
    }

    public RelaxedSemaphore(int permits, boolean fair) {
        super(permits, fair);
    }

    @Override
    public void reducePermits(int reduction) {
        super.reducePermits(reduction);
    }
}
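Since take() and the timed poll both lean on reducePermits, here is how that call behaves in isolation, as far as I understand it: it never blocks, it simply subtracts from the permit count, and the count can end up negative. A small sketch (ReducePermitsSketch and its nested PublicReduceSemaphore are made-up names; the nested class just mirrors RelaxedSemaphore so the example is self-contained):

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class for illustration only.
class ReducePermitsSketch {

    // Same idea as RelaxedSemaphore: widen reducePermits to public.
    static class PublicReduceSemaphore extends Semaphore {
        private static final long serialVersionUID = 1L;

        PublicReduceSemaphore(int permits) {
            super(permits);
        }

        @Override
        public void reducePermits(int reduction) {
            super.reducePermits(reduction);
        }
    }

    // Creates a semaphore with 'initial' permits, reduces it without
    // blocking, and reports what availablePermits() then returns.
    static int permitsAfterReduce(int initial, int reduction) {
        PublicReduceSemaphore sem = new PublicReduceSemaphore(initial);
        sem.reducePermits(reduction);
        return sem.availablePermits();
    }

    // With a negative count, tryAcquire() fails until enough releases
    // have brought the count back above zero.
    static boolean canAcquireAfterReduce(int initial, int reduction) {
        PublicReduceSemaphore sem = new PublicReduceSemaphore(initial);
        sem.reducePermits(reduction);
        return sem.tryAcquire();
    }
}
```

For example, permitsAfterReduce(3, 2) leaves 1 permit, while permitsAfterReduce(1, 3) leaves -2. That matters if two consumers read the same availablePermits() value and both apply their reduction.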