[concurrency-interest] ConcurrentLinkedBlockingQueue ?
Hanson Char
hanson.char at gmail.com
Sat Sep 9 23:51:35 EDT 2006
One benefit of such ConcurrentLinkedBlockingQueue is that it is unbounded,
where as even LinkedBlockingQueue has a max capacity of Integer.MAX_VALUE.
Hanson
On 9/9/06, Hanson Char <hanson.char at gmail.com> wrote:
>
> Hi Doug,
>
>
> >This way works, but reduces concurrency by using a single semaphore,
> >so is a bit less scalable than current LinkedBlockingQueue.
>
> I think I found a better way that seems promising enough to provide better
> scalability than LinkedBlockingQueue. Details below.
>
> This time the constructs I use include
> * an extra ConcurrentLinkedQueue for the parking threads
> * volatile for marking if a thread is potentially parked or not
> * LockSupport.[un]park[Nano])
>
> Does this work ?
>
>
> Hanson
>
> public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
> implements java.io.Serializable
> {
> private static class ThreadMarker {
> final Thread thread;
> // assumed parked until found otherwise.
> volatile boolean parked = true;
>
> ThreadMarker(Thread thread)
> {
> this.thread = thread;
> }
> }
>
> private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
> ConcurrentLinkedQueue<ThreadMarker>();
>
>
> private final ConcurrentLinkedQueue<E> q;
>
> public ConcurrentLinkedBlockingQueue() {
> q = new ConcurrentLinkedQueue<E>();
> }
>
> public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
> q = new ConcurrentLinkedQueue<E>(c);
> }
>
> @Override
> public Iterator<E> iterator() {
> return q.iterator();
> }
>
> @Override
> public int size() {
> return q.size();
> }
>
> public boolean offer(E e) {
> boolean b = q.offer(e);
>
> for (;;)
> {
> ThreadMarker marker = parkq.poll();
>
> if (marker == null)
> break;
> if ( marker.parked)
> {
> LockSupport.unpark(marker.thread);
> break;
>
> }
> }
> return b;
> }
>
> public E peek() {
> return q.peek ();
> }
>
> public E poll() {
> return q.poll();
> }
>
> /**
> * Retrieves and removes the head of this queue, waiting if necessary
> until
> * an element becomes available.
> *
> * @return the head of this queue
> * @throws InterruptedException
> * if interrupted while waiting
> */
> public E take() throws InterruptedException {
> for (;;) {
> E e = q.poll();
>
> if (e != null)
> return e;
> ThreadMarker m = new ThreadMarker(Thread.currentThread());
> parkq.offer(m);
> // check again in case there is data race
>
> e = q.poll();
>
> if (e != null)
> {
> // data race indeed
> m.parked = false;
> return e;
> }
> LockSupport.park ();
>
> }
> }
>
> /**
> * Retrieves and removes the head of this queue, waiting up to the
> specified
> * wait time if necessary for an element to become available.
> *
> * @param timeout
> * how long to wait before giving up, in units of
> <tt>unit</tt>
> * @param unit
> * a <tt>TimeUnit</tt> determining how to interpret the
> * <tt>timeout</tt> parameter
> * @return the head of this queue, or <tt>null</tt> if the specified
> * waiting time elapses before an element is available
> * @throws InterruptedException
> * if interrupted while waiting
> */
> public E poll(long timeout, TimeUnit unit) {
>
> for (;;) {
> E e = q.poll();
>
> if (e != null)
> return e;
> ThreadMarker m = new ThreadMarker( Thread.currentThread());
> parkq.offer(m);
> e = q.poll();
>
> if (e != null) {
> m.parked = false;
> return e;
> }
> LockSupport.parkNanos (unit.toNanos(timeout));
> }
> }
> }
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/74e43ee0/attachment.html
More information about the Concurrency-interest
mailing list