[concurrency-interest] Quest for the optimal queue

Andriy Plokhotnyuk plokhotnyuk at gmail.com
Thu May 24 08:21:03 EDT 2012

Hi All,

Thank you, Michael, for sharing your idea of how to minimize access to volatile

Here is my attempt to use one of Dmitriy Vyukov's multi-producer /
single-consumer node-based queues in the implementation of the Scalaz actor
instead of ConcurrentLinkedQueue:
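
For readers unfamiliar with that design, below is a minimal Java sketch of a
Vyukov-style multi-producer / single-consumer node-based queue. It is not the
Scalaz actor code itself: the names MpscQueue, Node, offer and poll are made
up for illustration, and it assumes exactly one consumer thread at a time.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch of a Vyukov-style MPSC linked queue (names are hypothetical). */
final class MpscQueue<T> {
    // Each node doubles as its own "next" pointer by extending AtomicReference.
    private static final class Node<T> extends AtomicReference<Node<T>> {
        T value;
        Node(T value) { this.value = value; }
    }

    // Producers exchange themselves into tail; this is the only contended field.
    private final AtomicReference<Node<T>> tail;
    // Assumption: only the single consumer touches head, so a plain field is used.
    private Node<T> head;

    MpscQueue() {
        Node<T> stub = new Node<>(null); // empty queue: head and tail share a stub node
        head = stub;
        tail = new AtomicReference<>(stub);
    }

    /** May be called from any number of producer threads. */
    void offer(T value) {
        Node<T> node = new Node<>(value);
        Node<T> prev = tail.getAndSet(node); // one atomic exchange, no CAS retry loop
        prev.lazySet(node);                  // link the previous node with an ordered store
    }

    /** Consumer only. Returns null when no linked message is visible yet. */
    T poll() {
        Node<T> next = head.get();
        if (next == null) {
            return null;
        }
        T value = next.value;
        next.value = null; // the consumed node becomes the new stub; drop its payload
        head = next;
        return value;
    }
}
```

Note that a producer that has swapped tail but not yet linked its node makes
the queue look empty to the consumer for a moment; an actor built on such a
queue has to tolerate that, for example by rechecking after it reschedules.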

Your hints improved throughput by ~3x and decreased latency by ~20%
compared to the Scalaz actor that uses CLQ. Here are simple benchmarks
and their results:

I also found that the same approach cannot be easily applied to custom
implementations of Akka mailboxes:

EDIT: This and other ideas, like reusing the envelope as the node of the
queue's linked list, or hacking the fork/join pool to automatically discard
actor rescheduling requests while the task is still running (which would
avoid tracking the running state when adding a message to the queue), can
not only improve throughput but also greatly decrease latency.

Best regards,

Michael Barker-4 wrote:
>> The problem with batch dequeues is that if one of the messages fails we
>> need to be at a stable state, and doing that would mean storing away the
>> remaining batch, which would bump the processor's size by at least a
>> reference, which can be expensive if you have millions of them.
> If you are willing to stay away from the standard Java collections API,
> then I was thinking of a call with a more functional style.  E.g.
> (pseudo code)
>     public void foreach(Callback callback) {
>         Node current = head;
>         if (current.next == null) {
>             return;
>         }
>         try {
>             do {
>                 Object value = current.value;
>                 callback.onMessage(value);
>                 current = current.next;
>             } while (current.next != null);
>         } finally {
>             updateHead(current);  // Do the appropriate thread-safe
>                                             // update to head
>         }
>     }
>     private static interface Callback
>     {
>         void onMessage(Object o);
>     }
> In the above case, if an exception is thrown when handling the message,
> then the message that caused the exception is redelivered.  The
> collection is then responsible for maintaining a stable state in the
> case of an exception and the definition of that behaviour is part of
> the contract for the collection.
> I think there is also an interesting optimisation here.  The reason
> I've added the short circuit check at the top of the method is that I
> think it removes the only possible case where you could have write
> contention with producers and consumers.  The only time a consumer and
> a producer would contend on a write would be if the queue was empty.
> I.e. head == tail.  If we remove that case from the consumer then
> producers and consumers should never have a write conflict.  The
> updateHead() method used by the consumer may not need a CAS, it is
> possible that you could get away with a lazySet, which would certainly
> improve performance.  Someone should check my reasoning though.
>> It's an interesting problem though. I've been thinking about how to
>> handle producer conflicts as cheaply as possible, as there are no
>> consumer conflicts.
> That's a tough one.  The most complicated code in the Disruptor is
> dealing with this case and we've ended up with 2 strategies based on
> the ratio of producer threads to available cores. With an array-backed
> queue this is easier, but I think for your use case you need something
> list-backed.
> Mike.
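
The batch-dequeue idea quoted above can be sketched in runnable Java roughly
as follows. This is only a sketch under assumptions: BatchQueue and its Node
class are hypothetical names, and unlike the pseudo code it keeps a stub node
at the head (the last node already handled), so every linked message is
delivered. As in the quoted text, a message whose handler throws is
redelivered on the next call, and the head is updated with lazySet instead of
a CAS because only the consumer ever writes it.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative sketch of a batch-draining MPSC queue (names are hypothetical). */
final class BatchQueue<T> {
    interface Callback<T> {
        void onMessage(T message);
    }

    private static final class Node<T> extends AtomicReference<Node<T>> {
        T value;
        Node(T value) { this.value = value; }
    }

    private final AtomicReference<Node<T>> tail; // written by producers only
    private final AtomicReference<Node<T>> head; // written by the consumer only

    BatchQueue() {
        Node<T> stub = new Node<>(null);
        head = new AtomicReference<>(stub);
        tail = new AtomicReference<>(stub);
    }

    /** May be called from any number of producer threads. */
    void offer(T value) {
        Node<T> node = new Node<>(value);
        tail.getAndSet(node).lazySet(node);
    }

    /** Consumer only. Returns the number of messages handled successfully. */
    int foreach(Callback<T> callback) {
        Node<T> consumed = head.get(); // last node whose message was already handled
        int handled = 0;
        try {
            for (Node<T> next = consumed.get(); next != null; next = consumed.get()) {
                callback.onMessage(next.value); // may throw; 'consumed' is then not advanced
                next.value = null;              // handled: drop the payload
                consumed = next;
                handled++;
            }
        } finally {
            // Single writer, so an ordered store is enough here; no CAS.
            head.lazySet(consumed);
        }
        return handled;
    }
}
```

Because the finally block always records the last successfully handled node,
no separate "remaining batch" has to be stored when a handler fails; the
unprocessed messages simply stay linked in the queue.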
View this message in context: http://old.nabble.com/Quest-for-the-optimal-queue-tp33833783p33896981.html