[concurrency-interest] Concurrent indexed queue

Ben Manes ben_manes at yahoo.com
Wed Aug 24 14:59:36 EDT 2011

If you do not need to maintain write-order processing of the queue, then only the map is required. The value could maintain a flag so that when a consumer scans the map it can skip previously processed entries. At a size of 5,000 entries, the map is probably small enough to make this reasonably fast despite being brute force.
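A minimal sketch of the map-only approach, assuming quotes and instrument ids are plain `String`s. The names `LatestQuoteMap`, `publish`, and `drainUnprocessed` are hypothetical. Each value carries an `AtomicBoolean` "processed" flag. A producer replaces the value with a fresh, unprocessed one. A consumer scans the whole map and claims unprocessed values with a CAS, so each quote is handed out at most once even when there are several consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: latest-quote map scanned by consumers (no write ordering). */
class LatestQuoteMap {
    /** Immutable quote plus a one-shot "processed" flag. */
    static final class Slot {
        final String quote;
        final AtomicBoolean processed = new AtomicBoolean(false);
        Slot(String quote) { this.quote = quote; }
    }

    private final ConcurrentHashMap<String, Slot> map = new ConcurrentHashMap<>();

    /** Producer: overwrite any older quote with a fresh, unprocessed slot. */
    void publish(String instrument, String quote) {
        map.put(instrument, new Slot(quote));
    }

    /** Consumer: brute-force scan, claiming each unprocessed slot exactly once. */
    List<String> drainUnprocessed() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Slot> e : map.entrySet()) {
            Slot s = e.getValue();
            // compareAndSet ensures two concurrent consumers never both take the same quote
            if (s.processed.compareAndSet(false, true)) {
                out.add(e.getKey() + "=" + s.quote);
            }
        }
        return out;
    }
}
```

A `ConcurrentHashMap` iterator is weakly consistent, so a scan may or may not see writes made while it runs. Anything it misses is picked up on the next scan.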

Another approach is to maintain a write-ordered queue that producers periodically prune. If the queue exceeds a soft bound, a producer can tryLock to prune exclusively, scanning the queue to discard stale events and perhaps bounding the work to avoid an excessive penalty. This would effectively amortize the clean-up across the producer threads.
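One way the pruning idea might look, as a sketch under several assumptions. Quotes are `String`s. A global version number identifies the latest write per instrument. An event is stale when the map holds a newer version for its instrument. `PrunedQuoteQueue`, `softBound`, and `maxScan` are hypothetical names. A per-event `claimed` flag ensures that a consumer's `poll` and a producer's prune never both account for the same event.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: write-ordered event queue pruned by producers under tryLock. */
class PrunedQuoteQueue {
    static final class Versioned {
        final String quote; final long version;
        Versioned(String quote, long version) { this.quote = quote; this.version = version; }
    }
    static final class Event {
        final String instrument; final long version;
        final AtomicBoolean claimed = new AtomicBoolean(); // set once, by a consumer or the pruner
        Event(String instrument, long version) { this.instrument = instrument; this.version = version; }
    }

    private final ConcurrentHashMap<String, Versioned> latest = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<Event> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong versions = new AtomicLong();
    private final AtomicInteger approxSize = new AtomicInteger();
    private final ReentrantLock pruneLock = new ReentrantLock();
    private final int softBound, maxScan;

    PrunedQuoteQueue(int softBound, int maxScan) { this.softBound = softBound; this.maxScan = maxScan; }

    private boolean isStale(Event e) {
        Versioned v = latest.get(e.instrument);
        return v == null || v.version != e.version;
    }

    /** Producer: record the quote, enqueue an event, and prune if over the soft bound. */
    void publish(String instrument, String quote) {
        long v = versions.incrementAndGet();
        latest.put(instrument, new Versioned(quote, v)); // map first, then the event
        queue.offer(new Event(instrument, v));
        if (approxSize.incrementAndGet() > softBound && pruneLock.tryLock()) {
            try { prune(); } finally { pruneLock.unlock(); }
        }
    }

    /** Discards stale events, scanning at most maxScan nodes to bound the producer's penalty. */
    private void prune() {
        int scanned = 0;
        for (Iterator<Event> it = queue.iterator(); it.hasNext() && scanned < maxScan; scanned++) {
            Event e = it.next();
            if (isStale(e) && e.claimed.compareAndSet(false, true)) {
                approxSize.decrementAndGet();
                it.remove();
            }
        }
    }

    /** Consumer: returns the next live "instrument=quote" in write order, or null if none. */
    String poll() {
        Event e;
        while ((e = queue.poll()) != null) {
            if (!e.claimed.compareAndSet(false, true)) continue; // already pruned
            approxSize.decrementAndGet();
            Versioned v = latest.get(e.instrument);
            if (v != null && v.version == e.version) return e.instrument + "=" + v.quote;
        }
        return null;
    }
}
```

The soft bound is checked against an approximate counter rather than `queue.size()`, because `ConcurrentLinkedQueue.size()` is not a constant-time operation.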

A more complicated variant would be to place the linkage pointers onto the values to allow O(1) removals or reorderings. The producers can add tasks to buffer(s) so that the queue can be caught up in a non-blocking fashion. When the tryLock is acquired by either a producer or a consumer, the buffers are drained to bring the queue up to date. The consumer would have the additional task of draining the queue when processing the entries. This would maintain write ordering, avoid unbounded growth, and maintain O(1) efficiency. However, it's probably more complicated than needed in this situation, as the design is styled after a concurrent LinkedHashMap.
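A rough sketch of the buffered variant. It substitutes `java.util.LinkedHashMap`, guarded by a `ReentrantLock`, for hand-maintained linkage pointers on the values. Producers append to a lock-free buffer and only drain it if `tryLock` succeeds. The consumer always drains before taking the eldest entry, so a write left behind by a failed `tryLock` is never lost. The names are hypothetical. This version keeps an updated instrument in its original position rather than moving it to the tail, since re-putting an existing key does not change a `LinkedHashMap`'s insertion order.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: LinkedHashMap-style queue caught up from producer buffers under tryLock. */
class BufferedLinkedQuoteQueue {
    private static final class Write {
        final String instrument, quote;
        Write(String instrument, String quote) { this.instrument = instrument; this.quote = quote; }
    }

    private final ConcurrentLinkedQueue<Write> buffer = new ConcurrentLinkedQueue<>();
    // Guarded by lock; insertion order = order in which instruments became pending.
    private final LinkedHashMap<String, String> pending = new LinkedHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();

    /** Producer: non-blocking record, then opportunistically catch the queue up. */
    void publish(String instrument, String quote) {
        buffer.offer(new Write(instrument, quote));
        if (lock.tryLock()) {
            try { drainBuffer(); } finally { lock.unlock(); }
        }
    }

    /** Caller must hold lock. Re-putting an existing key updates it in place without moving it. */
    private void drainBuffer() {
        Write w;
        while ((w = buffer.poll()) != null) {
            pending.put(w.instrument, w.quote);
        }
    }

    /** Consumer: catch up, then remove and return the oldest pending "instrument=quote", or null. */
    String poll() {
        lock.lock();
        try {
            drainBuffer();
            Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
            if (!it.hasNext()) return null;
            Map.Entry<String, String> eldest = it.next();
            String result = eldest.getKey() + "=" + eldest.getValue();
            it.remove();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

Keeping the original position means a frequently updated instrument cannot be pushed back indefinitely behind quieter ones. Moving it to the tail instead would be a `remove` followed by a `put`.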

From: Nathan Reynolds <nathan.reynolds at oracle.com>
To: Rohit Reja <rreja2000 at yahoo.com>
Cc: Concurrency <concurrency-interest at cs.oswego.edu>
Sent: Wednesday, August 24, 2011 9:33 AM
Subject: Re: [concurrency-interest] Concurrent indexed queue

A CHM will work, but there might be a lighter-weight approach. However, the lighter-weight approach requires that all of the instruments are known ahead of time and that the producers simply put() the latest quote.

If these constraints are acceptable, consider filling a HashMap with each instrument and a null value or a dummy quote. As new quotes arrive, simply put(I, Q). Consumers will then simply get(I).

Outside the HashMap, all producers have to set a volatile variable (e.g. an int) to some constant (e.g. 0) after the put() call. All consumers have to read the same volatile variable before calling get(). The volatile variable ensures that the JVM will insert any necessary memory barriers so that consumers will always see the latest quote. If consumers are allowed to see slightly older quotes even though a newer one is in the HashMap, then the volatile variable is not needed.
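A sketch of the pre-filled HashMap idea, with hypothetical names. It relies on the documented `HashMap` rule that changing the value for a key the map already contains is not a structural modification. If every instrument is inserted up front, producers only ever replace values. The volatile write after each `put()` and the volatile read before each `get()` supply the happens-before edge. Quote objects are assumed to be immutable (here, `String`).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the pre-populated HashMap + volatile "fence" idea. */
class PrefilledQuoteBoard {
    private final Map<String, String> quotes = new HashMap<>();
    // Written after every put() and read before every get(): the volatile write/read pair
    // gives a happens-before edge from a producer's put() to a consumer's later get().
    private volatile int fence;

    PrefilledQuoteBoard(List<String> instruments) {
        // All keys are inserted up front, so later put() calls only replace values
        // and never add mappings (no structural modification).
        for (String i : instruments) quotes.put(i, null);
    }

    void publish(String instrument, String quote) {
        if (!quotes.containsKey(instrument)) {
            throw new IllegalArgumentException("unknown instrument " + instrument);
        }
        quotes.put(instrument, quote);
        fence = 0; // volatile write publishes the put()
    }

    String latest(String instrument) {
        int ignored = fence; // volatile read before get()
        return quotes.get(instrument);
    }
}
```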

Nathan Reynolds | Consulting Member of Technical Staff | 602.333.9091
Oracle PSR Engineering | Server Technology 
On 8/24/2011 9:10 AM, Rohit Reja wrote: 
Hi Aleksey, 
>To clarify a bit further, I don't need to keep any history of quotes; I just need to keep the latest quote for an instrument.
>So, for example, one of the possible solutions that I thought of was to keep a CHM of instrumentid (key) to latest quote (value)
>and an event queue of "instrumentids". So let's say events come as
>E1(I1, Q1)
>E2(I2, Q1)
>E3(I1, Q2)
>Then the CHM will have 2 entries, while the queue will have 3 entries {I1, I2, I1}.
>Now when the consumer consumes I1, it removes I1's entry from the CHM, so that the consumer can ignore the event
>I1 the next time it retrieves I1 from the queue.
>This implementation would cause the event queue to grow indefinitely, as the consumer is slower.
>To avoid this, I can simply check if I1 is contained in the queue before putting it in the queue, but that would definitely make things very slow.
>I hope I have made things clearer now.
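The CHM-plus-id-queue design can be sketched as follows (names hypothetical, quotes as `String`s). One change is an assumption rather than something proposed in the thread: it replaces the slow "is I1 already queued?" check. A producer enqueues the id only when `put` returns `null`, i.e. when the instrument was not already pending, and the consumer's `remove` re-arms it. The queue then never holds more than one entry per instrument.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch: CHM of latest quotes plus a queue of pending instrument ids. */
class CoalescingQuoteQueue {
    private final ConcurrentHashMap<String, String> latest = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<String> pendingIds = new ConcurrentLinkedQueue<>();

    /** Producer: overwrite the quote; enqueue the id only if it was not already pending. */
    void publish(String instrument, String quote) {
        if (latest.put(instrument, quote) == null) {
            pendingIds.offer(instrument);
        }
    }

    /** Consumer: take the next pending id and atomically remove its latest quote. */
    String poll() {
        String id;
        while ((id = pendingIds.poll()) != null) {
            String quote = latest.remove(id); // removing re-arms enqueueing for this id
            if (quote != null) return id + "=" + quote;
        }
        return null;
    }
}
```

With 5,000 instruments, the queue is therefore bounded at 5,000 ids no matter how far the consumer falls behind. Because `put` and `remove` on a `ConcurrentHashMap` are atomic, a quote written between the consumer's `poll` and its `remove` is not lost: that `remove` returns it.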
> From:  Aleksey Shipilev <aleksey.shipilev at gmail.com>; 
>To:  Rohit Reja <rreja2000 at yahoo.com>; 
>Cc:  <concurrency-interest at cs.oswego.edu>; 
>Subject:  Re: [concurrency-interest] Concurrent indexed queue 
>Sent:  Wed, Aug 24, 2011 3:28:23 PM 
>Hi Rohit,  
>Are you looking for an efficient CircularBuffer implementation, then? Having one of those, you can then have a per-instrument CircularBuffer and poll each by "key", which will provide the essential indexing.
>On Wed, Aug 24, 2011 at 6:58 PM, Rohit Reja <rreja2000 at yahoo.com> wrote:
>>I am working on a project where I need advice on designing the following.
>>Producer threads read quotes on a set of instruments (5000 instruments) from a message bus,
>>which need to be routed to a downstream system that can consume messages at a slower rate
>>than the producers are producing them. The catch is that only the latest quotes need to be published
>>to the downstream system, so we can miss out on stale quotes. So essentially we don't need to slow down the producers, as the event queue is bounded by the number of instruments.
>>I want to keep the design flexible so as to allow multiple producers and multiple consumers.
>>So, in short, I need a concurrent indexed queue where I can update the entries in place. This is a latency-sensitive application, so we would want latency to be as low as possible.
>>Please advise me on suitable data structures that can be of use.
>> From:  Jeff Hain <jeffhain at rocketmail.com>; 
>>To:  Doug Lea <dl at cs.oswego.edu>; 
>>Cc:  <concurrency-interest at cs.oswego.edu>; 
>>Subject:  [concurrency-interest] Re : Re : concurrent counter : incrementAndGet 
>>Sent:  Sat, Aug 20, 2011 10:17:54 AM 
>>I hand-emulated ">" replies style, which is handier :)
>>>From: Doug Lea <dl at cs.oswego.edu>
>>>To: concurrency-interest at cs.oswego.edu
>>>Sent: Thu, 18 Aug 2011, 22:05:59
>>>Subject: Re: [concurrency-interest] Re : concurrent counter : incrementAndGet
>>>On 08/14/11 07:03, Jeff Hain wrote:
>>>> That could work well for some cases indeed, but I'm working on a sort
>>>> of Disruptor, using the counter with modulo to pick slots to write to in
>>>> a cyclic array, monotonically, or non-monotonically but only for a short time
>>>> (so that readers are not blocked on a not-yet-written slot), and any can
>>>> stop working at any time; in this case I don't see how that could apply
>>>> easily.
>>>I don't think "non-monotonically but for a short time" makes
>>>this easier, since any violations are likely to be unbounded.
>>   Violations are indeed unbounded, which could, for example, make all producers
>>claim wrapping sequences (on entries which could not be written for the time being),
>>and leave consumers stuck on not-yet-written entries (*).
>>   However, these violations are dealt with as follows: when a consumer encounters
>>a writable entry (for the "current round"), and the counter has already provided a higher
>>sequence to a producer (hence there will be something to eat further on), it tries to
>>CAS it up to being writable but for the "next round" (i.e. sequence += buffer length),
>>so that consumers can pass over it and not be stuck (a producer can write it just
>>it doesn't hurt, consumers will still just pass over it). In the process, the consumer also
>>the counter of any lower sequence, to reduce violations, and so that no producer
>>subsequently tries to write on a CASed-up entry (if one does, i.e. the entry doesn't
>>have the expected sequence, it just picks another sequence).
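A sketch of the per-entry state machine described above, under assumptions not in the message. The capacity is a power of two. Statuses are packed into the low two bits of the same `long` as the sequence. Values are `String`s. `offer` returns `-1` when the previous round's value is still unread; the abandoned sequence then becomes exactly the kind of hole that consumers CAS up. `SequencedRing`, `offer`, and `tryConsume` are hypothetical names, and how consumers choose which entry to examine is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical sketch: entry sequence and status share one long, so one CAS changes both. */
class SequencedRing {
    static final long WRITABLE = 0, BEING_WRITTEN = 1, READABLE = 2, BEING_READ = 3;

    static long pack(long seq, long status) { return (seq << 2) | status; }
    static long seqOf(long state) { return state >>> 2; }
    static long statusOf(long state) { return state & 3; }

    private final int capacity, mask;
    private final AtomicLongArray states;
    private final AtomicReferenceArray<String> data;
    private final AtomicLong counter = new AtomicLong(); // hands out sequences to producers

    SequencedRing(int capacityPowerOfTwo) {
        capacity = capacityPowerOfTwo;
        mask = capacity - 1; // valid only because capacity is a power of two
        states = new AtomicLongArray(capacity);
        data = new AtomicReferenceArray<>(capacity);
        for (int i = 0; i < capacity; i++) states.set(i, pack(i, WRITABLE)); // round 0
    }

    /** Producer: returns the sequence written, or -1 if the entry still holds an unread value. */
    long offer(String value) {
        while (true) {
            long seq = counter.getAndIncrement();
            int i = (int) (seq & mask);
            if (states.compareAndSet(i, pack(seq, WRITABLE), pack(seq, BEING_WRITTEN))) {
                data.set(i, value);
                states.set(i, pack(seq, READABLE)); // publish
                return seq;
            }
            if (seqOf(states.get(i)) < seq) return -1; // previous round unread: treat as full
            // Otherwise the entry was CASed up past our sequence: just pick another one.
        }
    }

    /** Consumer step on one entry: read it if readable, or CAS it up if it is a stuck hole. */
    String tryConsume(int i) {
        long s = states.get(i);
        long seq = seqOf(s);
        if (statusOf(s) == READABLE && states.compareAndSet(i, s, pack(seq, BEING_READ))) {
            String v = data.getAndSet(i, null);
            states.set(i, pack(seq + capacity, WRITABLE)); // free the entry for the next round
            return v;
        }
        if (statusOf(s) == WRITABLE && counter.get() > seq) {
            // A producer already got a higher sequence, so move this entry to "writable for
            // the next round" and let consumers pass over it instead of waiting.
            states.compareAndSet(i, s, pack(seq + capacity, WRITABLE));
        }
        return null;
    }
}
```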
>>   At the time, on a Core i7 X 980, when doing "no work" benches (i.e. consumers
>>do nothing), using a concurrent counter instead of AtomicLong is just a bit slower if
>>consumers are going faster than a few producers (due to the CASing-up going on I
>>but if producers go faster, i.e. if consumers have something to eat most of the
>>I've seen up to 30 percent improvement with 8 producers and 1 consumer, and about
>>20 percent with anywhere between 32 and 128 producers.
>>   (My code is still a mess of comments, obsolete code, and dead debugging code,
>>so I can't attach it, but there's not much more in it than what I said.)
>>   (*) The "Disruptor" I use is "unicast", and producers and consumers don't spy
>>on each other (which doesn't scale well), but only look at entries' sequence and status
>>written, readable, being read), both stored in the same long.
>>>You might be able to live with thread-local random number generators,
>>>so that in the long run, on average, you are balanced?
>>   I tried to use a random generator (Marsaglia XorShift) in the concurrent counter,
>>but it was actually slower than just doing "++index" for the next cell to
>>CAS on (it's an array of sub-counters, each incremented by the number
>>of sub-counters each time). If CAS fails, I try another cell. Also, I actually
>>don't do "++index", but "index = index + 1 + nbrOfFailedCAS" (with mask),
>>which seems to help get away from contention.
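A minimal sketch of the sub-counter scheme, with hypothetical names. Cell `i` starts at `i` and is advanced by the number of cells, so every value is unique, but values are only roughly ordered across threads. After each attempt the per-thread cursor moves by `1 + nbrOfFailedCAS` cells, as in the message.

```java
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical sketch of the striped counter described above (unique, not monotonic, values). */
class StripedSequenceCounter {
    private final int n;    // number of sub-counters (must be a power of two)
    private final int mask;
    private final AtomicLongArray cells;
    // Per-thread cursor into the cell array (a one-element array so it can be mutated).
    private final ThreadLocal<int[]> cursor = ThreadLocal.withInitial(() -> new int[1]);

    StripedSequenceCounter(int nPowerOfTwo) {
        n = nPowerOfTwo;
        mask = n - 1;
        cells = new AtomicLongArray(n);
        for (int i = 0; i < n; i++) cells.set(i, i); // cell i hands out i, i+n, i+2n, ...
    }

    /** Returns a value no other call returns; across threads values are only roughly ordered. */
    long next() {
        int[] idx = cursor.get();
        int failedCas = 0;
        while (true) {
            int i = idx[0] & mask;
            long v = cells.get(i);
            boolean won = cells.compareAndSet(i, v, v + n);
            if (!won) failedCas++;
            // Advance by 1 + failed CAS count instead of plain ++index.
            idx[0] = idx[0] + 1 + failedCas;
            if (won) return v;
        }
    }
}
```

In a single thread with 4 cells, successive calls return 0, 1, 2, 3, 4, and so on.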
>>   Also, as I said, balancing is dealt with, but the counter also has
>>a method that ensures monotonicity (for the thread-local, or data structure, that holds the
>>index and highest returned value). It is a bit slower, but it tends
>>to reduce imbalance, since when it gets a value lower than the
>>highest previously returned one, it tries to CAS again on the same cell,
>>making it catch up.
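The monotonic method might be sketched like this, with hypothetical names. The per-thread state is a `ThreadLocal` holding the cursor and the highest value returned. A value lower than the thread's highest is consumed but not returned, which leaves a gap in the sequence. The thread then retries on the same cell, pushing that lagging cell forward.

```java
import java.util.concurrent.atomic.AtomicLongArray;

/** Hypothetical sketch: striped counter whose values strictly increase per calling thread. */
class MonotonicStripedCounter {
    private static final class ThreadState {
        int index;                     // next cell to try
        long highest = Long.MIN_VALUE; // highest value returned to this thread so far
    }

    private final int n, mask;
    private final AtomicLongArray cells;
    private final ThreadLocal<ThreadState> state = ThreadLocal.withInitial(ThreadState::new);

    MonotonicStripedCounter(int nPowerOfTwo) {
        n = nPowerOfTwo;
        mask = n - 1;
        cells = new AtomicLongArray(n);
        for (int i = 0; i < n; i++) cells.set(i, i); // cell i hands out i, i+n, i+2n, ...
    }

    long nextMonotonic() {
        ThreadState ts = state.get();
        int failedCas = 0;
        while (true) {
            int i = ts.index & mask;
            long v = cells.get(i);
            if (!cells.compareAndSet(i, v, v + n)) {
                failedCas++;
                ts.index += 1 + failedCas; // contention: move to another cell
                continue;
            }
            if (v > ts.highest) {
                ts.highest = v;
                ts.index++;
                return v;
            }
            // v is below what this thread already returned: drop it (a gap) and
            // CAS again on the same, lagging cell so that it catches up.
        }
    }
}
```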
>>>Otherwise, you have a version of the Counting problem described
>>>in Herlihy & Shavit, for which all known scalable solutions
>>>are expensive, complicated, and difficult to package as
>>>j.u.c components. Although there are some special cases, like
>>>SNZI for detecting when counts hit the particular value of zero.
>>   Thanks, I didn't know about these. I've tried Shavit &
>>flat-combining method for the counter, though, but it was too slow.
>>>(This seems to be the main bottleneck in Disruptor-like
>>   Yes, that's why I started to focus on it. I think it's the only part of this
>>that is potentially proportional-or-worse to the number of threads (one could
>>of ring buffer scanning by consumers too, but it should be possible to use
>>modulo so that each entry doesn't get scanned by all
>>Concurrency-interest mailing list
>>Concurrency-interest at cs.oswego.edu
Concurrency-interest mailing list
Concurrency-interest at cs.oswego.edu
http://cs.oswego.edu/mailman/listinfo/concurrency-interest
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20110824/c69215d3/attachment-0001.html>
