[concurrency-interest] ConcurrentLinkedBlockingQueue ?
Hanson Char
hanson.char at gmail.com
Sun Sep 10 02:58:56 EDT 2006
Some initial load testings (with 100,000 inserts by 10 concurrent threads
into the same queue) shows the ConcurrentLinkedBlockingQueue is consistently
faster than the LinkedBlockingQueue. If interested, I can also post the
junit tests code.
Hanson
On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> There is a bug in poll(long, TimeUnit). See below for the fix.
>
> H
>
> public E poll(long timeout, TimeUnit unit) throws InterruptedException
> {
> if (timeout < 0)
> return take(); // treat -ve timeout same as to wait forever
>
> long t0=0;
>
>
> for (;;) {
> E e = q.poll();
>
> if (e != null)
> return e;
> if (t0 > 0 && System.nanoTime() >= (t0 + unit.toNanos
> (timeout)))
> return null; // time out
>
> ThreadMarker m = new ThreadMarker(Thread.currentThread());
> parkq.offer(m);
> e = q.poll();
>
> if (e != null) {
> m.parked = false;
> return e;
> }
> t0 = System.nanoTime();
>
> LockSupport.parkNanos(unit.toNanos(timeout));
> }
> }
>
>
> On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> >
> > One benefit of such ConcurrentLinkedBlockingQueue is that it is
> > unbounded, where as even LinkedBlockingQueue has a max capacity of
> > Integer.MAX_VALUE.
> >
> > Hanson
> >
> >
> > On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
> > >
> > > Hi Doug,
> > >
> > >
> > > >This way works, but reduces concurrency by using a single semaphore,
> > > >so is a bit less scalable than current LinkedBlockingQueue.
> > >
> > > I think I found a better way that seems promising enough to provide
> > > better scalability than LinkedBlockingQueue. Details below.
> > >
> > > This time the constructs I use include
> > > * an extra ConcurrentLinkedQueue for the parking threads
> > > * volatile for marking if a thread is potentially parked or not
> > > * LockSupport.[un]park[Nano])
> > >
> > > Does this work ?
> > >
> > >
> > > Hanson
> > >
> > > public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
> > > implements java.io.Serializable
> > > {
> > > private static class ThreadMarker {
> > > final Thread thread;
> > > // assumed parked until found otherwise.
> > > volatile boolean parked = true;
> > >
> > > ThreadMarker(Thread thread)
> > > {
> > > this.thread = thread;
> > > }
> > > }
> > >
> > > private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
> > > ConcurrentLinkedQueue<ThreadMarker>();
> > >
> > >
> > > private final ConcurrentLinkedQueue<E> q;
> > >
> > > public ConcurrentLinkedBlockingQueue() {
> > > q = new ConcurrentLinkedQueue<E>();
> > > }
> > >
> > > public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
> > > q = new ConcurrentLinkedQueue<E>(c);
> > > }
> > >
> > > @Override
> > > public Iterator<E> iterator() {
> > > return q.iterator();
> > > }
> > >
> > > @Override
> > > public int size() {
> > > return q.size();
> > > }
> > >
> > > public boolean offer(E e) {
> > > boolean b = q.offer(e);
> > >
> > > for (;;)
> > > {
> > > ThreadMarker marker = parkq.poll();
> > >
> > > if (marker == null)
> > > break;
> > > if ( marker.parked)
> > > {
> > > LockSupport.unpark(marker.thread);
> > > break;
> > >
> > > }
> > > }
> > > return b;
> > > }
> > >
> > > public E peek() {
> > > return q.peek ();
> > > }
> > >
> > > public E poll() {
> > > return q.poll();
> > > }
> > >
> > > /**
> > > * Retrieves and removes the head of this queue, waiting if
> > > necessary until
> > > * an element becomes available.
> > > *
> > > * @return the head of this queue
> > > * @throws InterruptedException
> > > * if interrupted while waiting
> > > */
> > > public E take() throws InterruptedException {
> > > for (;;) {
> > > E e = q.poll();
> > >
> > > if (e != null)
> > > return e;
> > > ThreadMarker m = new ThreadMarker(Thread.currentThread());
> > >
> > > parkq.offer(m);
> > > // check again in case there is data race
> > >
> > > e = q.poll();
> > >
> > > if (e != null)
> > > {
> > > // data race indeed
> > > m.parked = false;
> > > return e;
> > > }
> > > LockSupport.park ();
> > >
> > > }
> > > }
> > >
> > > /**
> > > * Retrieves and removes the head of this queue, waiting up to the
> > > specified
> > > * wait time if necessary for an element to become available.
> > > *
> > > * @param timeout
> > > * how long to wait before giving up, in units of
> > > <tt>unit</tt>
> > > * @param unit
> > > * a <tt>TimeUnit</tt> determining how to interpret the
> > > * <tt>timeout</tt> parameter
> > > * @return the head of this queue, or <tt>null</tt> if the
> > > specified
> > > * waiting time elapses before an element is available
> > > * @throws InterruptedException
> > > * if interrupted while waiting
> > > */
> > > public E poll(long timeout, TimeUnit unit) {
> > >
> > > for (;;) {
> > > E e = q.poll();
> > >
> > > if (e != null)
> > > return e;
> > > ThreadMarker m = new ThreadMarker( Thread.currentThread());
> > >
> > > parkq.offer(m);
> > > e = q.poll();
> > >
> > > if (e != null) {
> > > m.parked = false;
> > > return e;
> > > }
> > > LockSupport.parkNanos (unit.toNanos(timeout));
> > > }
> > > }
> > > }
> > >
> > >
> >
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/7b79fc43/attachment.html
More information about the Concurrency-interest
mailing list