[concurrency-interest] ConcurrentLinkedBlockingQueue ?
Hanson Char
hanson.char at gmail.com
Sat Sep 9 21:18:54 EDT 2006
Hi Doug,
>This way works, but reduces concurrency by using a single semaphore,
>so is a bit less scalable than current LinkedBlockingQueue.
I think I found a better way that seems promising enough to provide better
scalability than LinkedBlockingQueue. Details below.
This time the constructs I use include
* an extra ConcurrentLinkedQueue for the parking threads
* volatile for marking if a thread is potentially parked or not
* LockSupport.[un]park[Nano])
Does this work ?
Hanson
public class ConcurrentLinkedBlockingQueue<E> extends AbstractQueue<E>
implements java.io.Serializable
{
private static class ThreadMarker {
final Thread thread;
// assumed parked until found otherwise.
volatile boolean parked = true;
ThreadMarker(Thread thread)
{
this.thread = thread;
}
}
private final ConcurrentLinkedQueue<ThreadMarker> parkq = new
ConcurrentLinkedQueue<ThreadMarker>();
private final ConcurrentLinkedQueue<E> q;
public ConcurrentLinkedBlockingQueue() {
q = new ConcurrentLinkedQueue<E>();
}
public ConcurrentLinkedBlockingQueue(Collection<? extends E> c) {
q = new ConcurrentLinkedQueue<E>(c);
}
@Override
public Iterator<E> iterator() {
return q.iterator();
}
@Override
public int size() {
return q.size();
}
public boolean offer(E e) {
boolean b = q.offer(e);
for (;;)
{
ThreadMarker marker = parkq.poll();
if (marker == null)
break;
if (marker.parked)
{
LockSupport.unpark(marker.thread);
break;
}
}
return b;
}
public E peek() {
return q.peek();
}
public E poll() {
return q.poll();
}
/**
* Retrieves and removes the head of this queue, waiting if necessary
until
* an element becomes available.
*
* @return the head of this queue
* @throws InterruptedException
* if interrupted while waiting
*/
public E take() throws InterruptedException {
for (;;) {
E e = q.poll();
if (e != null)
return e;
ThreadMarker m = new ThreadMarker(Thread.currentThread());
parkq.offer(m);
// check again in case there is data race
e = q.poll();
if (e != null)
{
// data race indeed
m.parked = false;
return e;
}
LockSupport.park();
}
}
/**
* Retrieves and removes the head of this queue, waiting up to the
specified
* wait time if necessary for an element to become available.
*
* @param timeout
* how long to wait before giving up, in units of
<tt>unit</tt>
* @param unit
* a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return the head of this queue, or <tt>null</tt> if the specified
* waiting time elapses before an element is available
* @throws InterruptedException
* if interrupted while waiting
*/
public E poll(long timeout, TimeUnit unit) {
for (;;) {
E e = q.poll();
if (e != null)
return e;
ThreadMarker m = new ThreadMarker(Thread.currentThread());
parkq.offer(m);
e = q.poll();
if (e != null) {
m.parked = false;
return e;
}
LockSupport.parkNanos(unit.toNanos(timeout));
}
}
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060909/32a35ba3/attachment.html
More information about the Concurrency-interest
mailing list