[concurrency-interest] Concurrent indexed queue

Nathan Reynolds nathan.reynolds at oracle.com
Wed Aug 24 12:33:36 EDT 2011

A CHM will work, but there might be a lighter-weight approach.  It 
requires, however, that all of the instruments are known ahead of time 
and that the producers only ever put() the latest quote.

If these constraints are acceptable, consider filling a HashMap with each 
instrument and a null value or a dummy quote.  As new quotes arrive, 
simply put(I, Q).  Since every key is already present, put() only 
replaces the value.  Consumers will then simply get(I).

Outside the HashMap, all producers have to set a volatile variable (e.g. 
an int) to some constant (e.g. 0) after the put() call.  All consumers 
have to read the same volatile variable before the get() call.  The 
volatile write and read ensure that the JVM will insert any necessary 
memory barriers so that consumers will always see the latest quote.  If 
consumers are allowed to see slightly older quotes even though a newer 
one is in the HashMap, then the volatile variable is not necessary.
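
As a rough sketch of the above in Java (not a definitive implementation): 
the names Quote, LatestQuoteBoard, publish() and get() are made up for 
illustration, Quote is assumed to be immutable, and the set of instruments 
is assumed to be fixed when the board is constructed.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable quote type (assumption: final fields only).
final class Quote {
    final String instrument;
    final double price;

    Quote(String instrument, double price) {
        this.instrument = instrument;
        this.price = price;
    }
}

// Hypothetical holder for the latest quote per instrument.
final class LatestQuoteBoard {
    private static final Quote DUMMY = new Quote("", Double.NaN);

    // Pre-filled in the constructor; afterwards values are only replaced.
    private final Map<String, Quote> quotes = new HashMap<>();

    // Written by producers after put(), read by consumers before get().
    private volatile int fence;

    LatestQuoteBoard(Collection<String> instruments) {
        for (String instrument : instruments) {
            quotes.put(instrument, DUMMY);
        }
    }

    // Producer side: replace the value, then do the volatile write.
    void publish(Quote q) {
        if (!quotes.containsKey(q.instrument)) {
            // Adding a key would be a structural modification of the map.
            throw new IllegalArgumentException("unknown instrument: " + q.instrument);
        }
        quotes.put(q.instrument, q);
        fence = 0;
    }

    // Consumer side: do the volatile read, then look up the value.
    // Returns null if no quote has been published for the instrument.
    Quote get(String instrument) {
        int ignored = fence;
        Quote q = quotes.get(instrument);
        return q == DUMMY ? null : q;
    }
}
```

The sketch relies on put() for an existing key replacing the value without 
changing the structure of the map, which is why publish() rejects unknown 
instruments.  The HashMap documentation does not promise anything for 
unsynchronized concurrent writers, so treat that part as an assumption to 
verify under load rather than as a guarantee.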

Nathan Reynolds 
<http://psr.us.oracle.com/wiki/index.php/User:Nathan_Reynolds> | 
Consulting Member of Technical Staff | 602.333.9091
Oracle PSR Engineering <http://psr.us.oracle.com/> | Server Technology

On 8/24/2011 9:10 AM, Rohit Reja wrote:
> Hi Aleksey,
> To clarify a bit further, I don't need to keep any history of quotes; I
> just need to keep the latest quote for each instrument.
> For example, one possible solution I thought of was to keep a
> CHM of instrument id (key) and quote (value),
> and an event queue of instrument ids. So let's say events come in as
> E1(I1, Q1)
> E2(I2, Q1)
> E3(I1, Q2)
> Then the CHM will have 2 entries, while the queue will have 3 entries {I1,
> I2, I1}.
> Now when a consumer consumes I1, it removes I1's entry from the CHM, so that
> the consumer can ignore the event
> the next time it retrieves I1 from the queue.
> This implementation would cause the event queue to grow indefinitely,
> as the consumer is slower.
> To avoid this, I could simply check whether I1 is already in the queue
> before enqueueing it, but that would definitely make things very slow.
> I hope I have made things clearer now.
> Thanks,
> rohit
> ------------------------------------------------------------------------
> *From: * Aleksey Shipilev <aleksey.shipilev at gmail.com>;
> *To: * Rohit Reja <rreja2000 at yahoo.com>;
> *Cc: * <concurrency-interest at cs.oswego.edu>;
> *Subject: * Re: [concurrency-interest] Concurrent indexed queue
> *Sent: * Wed, Aug 24, 2011 3:28:23 PM
> Hi Rohit,
> Are you looking for an efficient CircularBuffer implementation then?
> Given one of those, you can have a per-instrument CircularBuffer
> and poll each by "key", which will provide the essential indexing.
> -Aleksey.
> On Wed, Aug 24, 2011 at 6:58 PM, Rohit Reja <rreja2000 at yahoo.com> wrote:
>     Hi,
>     I am working on a project where I need advice on designing the
>     following functionality.
>     Producer threads read quotes on a set of instruments (5000
>     instruments) from a message bus.
>     These quotes need to be routed to a downstream system which consumes
>     messages at a slower rate
>     than the producers produce them. The catch is that only the
>     latest quotes need to be published
>     to the downstream system, so we can drop stale quotes. So
>     essentially we don't need to slow down the producers, as the event
>     queue is bounded by the number of instruments.
>     I want to keep the design flexible enough to allow multiple producers
>     and multiple consumers.
>     So in short I need a concurrent indexed queue where I can update
>     the entries in place. This is a latency-sensitive application, so
>     we want latency to be as low as possible.
>     Please advise me on suitable data structures that could be of use.
>     Regards,
>     Rohit
>     ------------------------------------------------------------------------
>     *From: * Jeff Hain <jeffhain at rocketmail.com>;
>     *To: * Doug Lea <dl at cs.oswego.edu>;
>     *Cc: * <concurrency-interest at cs.oswego.edu>;
>     *Subject: * [concurrency-interest] Re : Re : concurrent counter :
>     incrementAndGet
>     *Sent: * Sat, Aug 20, 2011 10:17:54 AM
>     I hand-emulated ">" replies style, which is handier :)
>     ------------------------------------------------------------------------
>     *>From:* Doug Lea <dl at cs.oswego.edu>
>     *>To:* concurrency-interest at cs.oswego.edu
>     *>Sent:* Thu, Aug 18, 2011 10:05:59 PM
>     *>Subject:* Re: [concurrency-interest] Re : concurrent counter :
>     incrementAndGet
>     >
>     >On 08/14/11 07:03, Jeff Hain wrote:
>     >>
>     >> That could work well for some cases indeed, but I'm working on a sort
>     >> of Disruptor, using the counter with modulo to pick up slots to write to in
>     >> a cyclic array, monotonically, or non-monotonically but for a short time
>     >> (for readers not to be blocked on a not-yet-written slot), and any writer
>     >> can stop to work anytime; in this case I don't see how that could apply
>     >> easily.
>     >
>     >I don't think "non-monotonically but for a short time" makes
>     >this easier, since any violations are likely to be unbounded.
>        Violations are indeed unbounded, which could for example make all
>     producers claim wrapping sequences (on entries which could not be
>     written for the time being), and leave consumers stuck on
>     not-yet-written entries (*).
>        These violations are dealt with, though, as follows: when a consumer
>     encounters a writable entry (for the "current round"), and the counter
>     has already provided a higher sequence to a producer (hence there will
>     be something to eat further on), it tries to CAS it up to being writable
>     but for the "next round" (i.e. sequence += buffer length), so that
>     consumers can pass over it and not be stuck (a producer can write it
>     just after, which doesn't hurt; consumers will still just pass over it).
>     In the process, the consumer also depletes the counter of any lower
>     sequence, to reduce violations, and so that no producer subsequently
>     tries to write to a CASed-up entry (if one does, i.e. the entry doesn't
>     have the expected sequence, it just picks another sequence).
>        At the time, on a Core i7 X 980, when doing "no work" benches (i.e.
>     consumers do nothing), using a concurrent counter instead of AtomicLong
>     is just a bit slower if consumers are going faster than a few producers
>     (due to the CASing-up going on, I guess), but if producers go faster,
>     i.e. if consumers have something to eat most of the time, I've seen up
>     to 30 percent improvement with 8 producers and 1 consumer, and about
>     20 percent with anywhere between 32 and 128 producers.
>        (My code is still a mess of comments and obsolete code, or dead
>     debugging code, so I can't attach it, but there's not much more in it
>     than what I said.)
>        (*) The "Disruptor" I use is "unicast", and producers and consumers
>     don't spy on each other (which doesn't scale well), but only look at
>     each entry's sequence and status (writable, being written, readable,
>     being read), both stored in the same long.
>     >You might be able to live with thread local random number generators,
>     >so that in the long run on average you are balanced?
>        I tried to use random (Marsaglia XorShift) in the concurrent counter,
>     but it was actually slower than just doing "++index" for the next cell
>     to CAS on (it's an array of sub-counters, each incremented by the number
>     of sub-counters each time). If the CAS fails, I try another cell.
>     Also, I actually don't do "++index", but
>     "index = index + 1 + nbrOfFailedCAS" (with mask), which seems to help
>     to get away from contention.
>        Also, as I said, balancing is dealt with, but the counter also has
>     a method that ensures monotonicity (for the thread-local, or
>     non-thread-safe-view-instance-local, data structure that holds the
>     previous index and highest returned value). It is a bit slower, but it
>     tends to reduce unbalancing, since when it gets a value lower than the
>     highest previously returned one, it tries to CAS again on the same cell,
>     making it catch up.
>     >Otherwise, you have a version of the Counting problem described
>     >in Herlihy & Shavit, for which all known scalable solutions
>     >are expensive, complicated, and difficult to package as
>     >j.u.c components. Although there are some special cases like
>     >SNZI for detecting when counts hit the particular value of zero.
>        Thanks, I didn't know about these. I've tried the flat-combining
>     method of Shavit and others for the counter, though, but it was too slow.
>     >
>     >(This seems to be the main bottleneck in Disruptor-like designs.)
>        Yes, that's why I started to focus on it. I think it's the only part
>     of this disruptor that is potentially proportional-or-worse to the
>     number of threads (one could think of ring buffer scanning by consumers
>     too, but it should be possible to use modulo so that each entry doesn't
>     get scanned by all consumers).
>     >
>     >-Doug
>     -Jeff
>     _______________________________________________
>     Concurrency-interest mailing list
>     Concurrency-interest at cs.oswego.edu
>     http://cs.oswego.edu/mailman/listinfo/concurrency-interest