[concurrency-interest] ConcurrentLinkedBlockingQueue ?
Hanson Char
hanson.char at gmail.com
Sun Sep 10 02:21:55 EDT 2006
There is a bug in poll(long, TimeUnit). See below for the fix.
H
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
if (timeout < 0)
return take(); // treat -ve timeout same as to wait forever
long t0=0;
for (;;) {
E e = q.poll();
if (e != null)
return e;
if (t0 > 0 && System.nanoTime() >= (t0 + unit.toNanos(timeout)))
return null; // time out
ThreadMarker m = new ThreadMarker(Thread.currentThread());
parkq.offer(m);
e = q.poll();
if (e != null) {
m.parked = false;
return e;
}
t0 = System.nanoTime();
LockSupport.parkNanos(unit.toNanos(timeout));
}
}
On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> One benefit of such ConcurrentLinkedBlockingQueue is that it is unbounded,
> where as even LinkedBlockingQueue has a max capacity of Integer.MAX_VALUE.
>
> Hanson
>
>
> On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> >
> > Hi Doug,
> >
> >
> > >This way works, but reduces concurrency by using a single semaphore,
> > >so is a bit less scalable than current LinkedBlockingQueue.
> >
> > I think I found a better way that seems promising enough to provide
> > better scalability than LinkedBlockingQueue. Details below.
> >
> > This time the constructs I use include
> > * an extra ConcurrentLinkedQueue for the parking threads
> > * volatile for marking if a thread is potentially parked or not
> > * LockSupport.[un]park[Nano])
> >
> > Does this work ?
> >
> >
> > Hanson
> >
> > public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
> > implements java.io.Serializable
> > {
> > private static class ThreadMarker {
> > final Thread thread;
> > // assumed parked until found otherwise.
> > volatile boolean parked = true;
> >
> > ThreadMarker(Thread thread)
> > {
> > this.thread = thread;
> > }
> > }
> >
> > private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
> > ConcurrentLinkedQueue<ThreadMarker>();
> >
> >
> > private final ConcurrentLinkedQueue<E> q;
> >
> > public ConcurrentLinkedBlockingQueue() {
> > q = new ConcurrentLinkedQueue<E>();
> > }
> >
> > public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
> > q = new ConcurrentLinkedQueue<E>(c);
> > }
> >
> > @Override
> > public Iterator<E> iterator() {
> > return q.iterator();
> > }
> >
> > @Override
> > public int size() {
> > return q.size();
> > }
> >
> > public boolean offer(E e) {
> > boolean b = q.offer(e);
> >
> > for (;;)
> > {
> > ThreadMarker marker = parkq.poll();
> >
> > if (marker == null)
> > break;
> > if ( marker.parked)
> > {
> > LockSupport.unpark(marker.thread);
> > break;
> >
> > }
> > }
> > return b;
> > }
> >
> > public E peek() {
> > return q.peek ();
> > }
> >
> > public E poll() {
> > return q.poll();
> > }
> >
> > /**
> > * Retrieves and removes the head of this queue, waiting if
> > necessary until
> > * an element becomes available.
> > *
> > * @return the head of this queue
> > * @throws InterruptedException
> > * if interrupted while waiting
> > */
> > public E take() throws InterruptedException {
> > for (;;) {
> > E e = q.poll();
> >
> > if (e != null)
> > return e;
> > ThreadMarker m = new ThreadMarker(Thread.currentThread());
> > parkq.offer(m);
> > // check again in case there is data race
> >
> > e = q.poll();
> >
> > if (e != null)
> > {
> > // data race indeed
> > m.parked = false;
> > return e;
> > }
> > LockSupport.park ();
> >
> > }
> > }
> >
> > /**
> > * Retrieves and removes the head of this queue, waiting up to the
> > specified
> > * wait time if necessary for an element to become available.
> > *
> > * @param timeout
> > * how long to wait before giving up, in units of
> > <tt>unit</tt>
> > * @param unit
> > * a <tt>TimeUnit</tt> determining how to interpret the
> > * <tt>timeout</tt> parameter
> > * @return the head of this queue, or <tt>null</tt> if the specified
> > * waiting time elapses before an element is available
> > * @throws InterruptedException
> > * if interrupted while waiting
> > */
> > public E poll(long timeout, TimeUnit unit) {
> >
> > for (;;) {
> > E e = q.poll();
> >
> > if (e != null)
> > return e;
> > ThreadMarker m = new ThreadMarker( Thread.currentThread());
> > parkq.offer(m);
> > e = q.poll();
> >
> > if (e != null) {
> > m.parked = false;
> > return e;
> > }
> > LockSupport.parkNanos (unit.toNanos(timeout));
> > }
> > }
> > }
> >
> >
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/fede6011/attachment-0001.html
More information about the Concurrency-interest
mailing list