[concurrency-interest] Concurrent indexed queue

Jeff Schultz jws at csse.unimelb.edu.au
Wed Aug 24 20:45:32 EDT 2011

On Wed, Aug 24, 2011 at 09:10:02AM -0700, Rohit Reja wrote:
>  To clarify a bit further, I dont need to keep any history of
> quotes, I just need to keep the latest quote for an instrument. 

> So for e.g. One of the possible solution that i thought was to keep
> a CHM of instrumentid (key) and quote(value).

> And an event queue of "instrumentids". so lets say events comes as 
> E1( I1, Q1)
> E2(I2, q1)
> E3(I1, Q2)

> Then CHM will have 2 entries. While the Queue will have 3 entries
> {I1, I2, I1}.  Now when consumer consumes I1 it removes I1's entry
> from CHM, so that consumer can ignore the event I1 next time when it
> retrieves I1 from queue. 

This does mean that more than one consumer can be processing quotes
for the same instrument.  Does that matter?

> This implementation would cause the event queue to grow indefinetely
> as the consumer is slower.

> To avoid this , I can simply check if I1 is contained in the queue
> before putting in queue but that would make things definetely very
> slow.

It looks like what you want is a collection of "slots", one for each
ID, which either contain an unprocessed quote or are empty, along with
a queue of IDs where, for each ID with a non-empty corresponding slot,
either the queue contains the ID or a consumer is "about to" process
the ID.

You don't need to inspect the queue to see if the new quote will be
processed by a previously queued entry, just check if the slot is

This can be done with a collection of AtomicReference.  Make some form
of mapping from ID to AtomicReference<Quote> and a BlockingQueue<ID>.

To publish a new (id, quote):

    look up the AtomicReference ref for id
    old = ref.getAndSet(quote)
    if(old == null)
        put id in the queue

To consume a quote:

    take an id from the queue
    look up the AtomicReference ref for id
    quote = ref.getAndSet(null)
    if(quote != null)
        process the quote

(As there is a one-to-one relationship between IDs and the
AtomicReferences, one could just queue the latter and save the
consumer the lookup.)

This has the same potential problem of multiple consumers processing
different quotes for the one instrument, of course.

    Jeff Schultz

More information about the Concurrency-interest mailing list