[concurrency-interest] Multi-Reader One-Writer Queue

Brian Goetz brian at quiotix.com
Fri Sep 2 18:04:03 EDT 2005


 > However, I'm
 > wondering how to determine efficiently who is the last reader during
 > dequeue operation?

I think that's a task for the garbage collector.

 > To achieve that, I'm thinking to create an array-based custom queue
 > where each reader is in fact a queue proxy that keeps its "dequeue
 > pointer" in the array.

This will only work if you can ensure that the producer cannot outrun 
the slowest consumer.  If you can't, then you have to make the 
multi-insert operation block.
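
To make that constraint concrete, here is a minimal sketch of the bookkeeping behind the shared-array design. It is only an illustration: SharedRing, offer() and poll() are hypothetical names, not from this thread, and the class is deliberately not thread-safe. It shows where the writer has to stop, not how to synchronize it.

```java
// Hypothetical illustration of one shared array with per-reader positions.
// NOT thread-safe: it shows only the "writer must not lap the slowest
// reader" check, with no synchronization at all.
class SharedRing<T> {
    private final Object[] slots;
    private final long[] readPos;   // next index each reader will read
    private long writePos = 0;      // next index the writer will fill

    SharedRing(int capacity, int nReaders) {
        slots = new Object[capacity];
        readPos = new long[nReaders];
    }

    // Returns false rather than overwrite a slot the slowest reader still needs.
    boolean offer(T t) {
        long slowest = writePos;
        for (long p : readPos)
            slowest = Math.min(slowest, p);
        if (writePos - slowest == slots.length)
            return false;           // writer would lap the slowest reader
        slots[(int) (writePos % slots.length)] = t;
        writePos++;
        return true;
    }

    // Returns null when this reader has caught up with the writer.
    @SuppressWarnings("unchecked")
    T poll(int reader) {
        if (readPos[reader] == writePos)
            return null;
        T t = (T) slots[(int) (readPos[reader] % slots.length)];
        readPos[reader]++;
        return t;
    }
}
```

In a real version, the point where offer() returns false is where the writer would have to block, and finding the slowest reader on every insert is the cost the per-node counter approach avoids.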

To do this, you could drop the master queue and use N array-based 
queues, one per consumer, plus a semaphore that represents the put credit:

   Semaphore sem = new Semaphore(bound);

The put operation operates on all queues:

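   void put(T t) throws InterruptedException {
     sem.acquire();                  // take one put credit; blocks if none left
     Node<T> n = new Node<T>(t, nQueues);
     for (BlockingQueue<Node<T>> q : queues)
       q.put(n);                     // same node goes to every consumer's queue
   }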

The get() operation operates on one queue, and the last of the get() 
operations for a given node releases the semaphore permit:

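   T get() throws InterruptedException {
     Node<T> n = q.take();
     int count = n.count.decrementAndGet();
     if (count == 0)                 // last consumer to see this node
       sem.release();                // returns the put credit
     return n.item;
   }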

   class Node<T> {
     final T item;
     final AtomicInteger count;

     public Node(T item, int count) {
       this.item = item;
       this.count = new AtomicInteger(count);
     }
   }
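
Putting the pieces above together, a self-contained sketch might look like the following. The class name BroadcastQueue, the reader-index parameter on take(), and the availablePermits() helper are assumptions made for illustration, not part of the original suggestion. It also assumes a single writer and gives each per-reader queue a capacity equal to the semaphore bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the put()/get()/Node fragments in this post.
class BroadcastQueue<T> {
    private static final class Node<E> {
        final E item;
        final AtomicInteger count;   // readers that have not yet taken this node

        Node(E item, int count) {
            this.item = item;
            this.count = new AtomicInteger(count);
        }
    }

    private final Semaphore sem;     // the put credit
    private final List<BlockingQueue<Node<T>>> queues =
        new ArrayList<BlockingQueue<Node<T>>>();

    BroadcastQueue(int bound, int nReaders) {
        sem = new Semaphore(bound);
        for (int i = 0; i < nReaders; i++)
            queues.add(new ArrayBlockingQueue<Node<T>>(bound));
    }

    // Blocks while `bound` items are still waiting on at least one reader.
    void put(T t) throws InterruptedException {
        sem.acquire();
        Node<T> n = new Node<T>(t, queues.size());
        for (BlockingQueue<Node<T>> q : queues)
            q.put(n);   // should not block: at most `bound` nodes hold a permit
    }

    // Each reader takes from its own queue; the last reader of a node
    // releases the permit that put() acquired for it.
    T take(int reader) throws InterruptedException {
        Node<T> n = queues.get(reader).take();
        if (n.count.decrementAndGet() == 0)
            sem.release();
        return n.item;
    }

    // Exposed only so the behaviour can be observed in a test.
    int availablePermits() {
        return sem.availablePermits();
    }
}
```

With bound = 2 and two readers, two put() calls use up both permits, and a permit only comes back once both readers have taken the same item. Nobody has to work out who the "last reader" is beyond the atomic decrement, and a node becomes garbage once every queue has dropped its reference.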


