[concurrency-interest] Concurrent indexed queue
Ben Manes
ben_manes at yahoo.com
Wed Aug 24 14:59:36 EDT 2011
If you do not need to maintain write-order processing of the queue, then only the map is required. The value could maintain a flag so that when a consumer scans the map it can skip previously processed entries. At a size of 5,000 entries, the map is probably small enough to make this reasonably fast despite being brute force.
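A minimal sketch of that map-only approach (class and method names are invented for illustration, not from the post): each value carries a flag that the producer clears when publishing a fresh quote and that a scanning consumer atomically sets once the entry is handled. A quote published concurrently with a scan may be delivered twice, which is harmless since only the latest value matters.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

// Hypothetical sketch of the map-only design: the value carries a flag so a
// consumer scanning the map can skip previously processed entries.
class QuoteBoard {
    static final class Entry {
        volatile Object quote;                                   // latest quote for the instrument
        final AtomicBoolean processed = new AtomicBoolean(true); // nothing to consume yet
    }

    private final ConcurrentHashMap<String, Entry> map = new ConcurrentHashMap<>();

    // Producer: overwrite the latest quote, then mark the entry unprocessed.
    void publish(String instrument, Object quote) {
        Entry e = map.computeIfAbsent(instrument, k -> new Entry());
        e.quote = quote;
        e.processed.set(false);     // make the fresh quote visible to scanners
    }

    // Consumer: brute-force scan of all ~5,000 entries, claiming unprocessed
    // ones via CAS so multiple consumers never handle the same event twice.
    void drain(BiConsumer<String, Object> handler) {
        map.forEach((instrument, e) -> {
            if (e.processed.compareAndSet(false, true)) {
                handler.accept(instrument, e.quote);    // at-least-once delivery
            }
        });
    }
}
```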
Another approach is to maintain a write-ordered queue that producers periodically prune. If the queue exceeds a soft bound, then a producer can tryLock to prune exclusively, scanning the queue to discard stale events, perhaps bounding the work to avoid an excessive penalty. This would effectively amortize the clean-up across the producer threads.
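One way that producer-amortized pruning might look (a hypothetical sketch; the class, the soft bound of half-queue scans, and the staleness predicate are all invented for illustration):

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical sketch: when the write-ordered queue grows past a soft bound,
// whichever producer notices tries to acquire the prune lock and discards
// stale events, bounding the scan so no single offer pays too large a penalty.
class PrunedEventQueue<E> {
    private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger size = new AtomicInteger();
    private final ReentrantLock pruneLock = new ReentrantLock();
    private final int softBound;
    private final Predicate<E> isStale;

    PrunedEventQueue(int softBound, Predicate<E> isStale) {
        this.softBound = softBound;
        this.isStale = isStale;
    }

    void offer(E event) {
        queue.offer(event);
        if (size.incrementAndGet() > softBound) {
            tryPrune(Math.max(1, softBound / 2));   // bound the work per pass
        }
    }

    E poll() {
        E e = queue.poll();
        if (e != null) size.decrementAndGet();
        return e;
    }

    private void tryPrune(int maxScanned) {
        if (!pruneLock.tryLock()) return;   // another producer is already pruning
        try {
            Iterator<E> it = queue.iterator();  // weakly consistent, supports remove()
            for (int scanned = 0; it.hasNext() && scanned < maxScanned; scanned++) {
                if (isStale.test(it.next())) {
                    it.remove();
                    size.decrementAndGet();
                }
            }
        } finally {
            pruneLock.unlock();
        }
    }
}
```

The tryLock means contending producers never block on clean-up; at most one pays the (bounded) pruning cost per pass.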
A more complicated variant would be to place the linkage pointers onto the values to allow O(1) removals or reorderings. The producers can add tasks to buffer(s) to allow the queue to be caught up in a non-blocking fashion. When the tryLock is acquired by either a producer or a consumer, the buffers are drained to bring the queue up to date. The consumer would have the additional task of draining the queue when processing the entries. This would maintain write ordering, avoid unbounded growth, and maintain O(1) efficiency. However, it's probably more complicated than needed in this situation, as the design is styled after a concurrent LinkedHashMap.
________________________________
From: Nathan Reynolds <nathan.reynolds at oracle.com>
To: Rohit Reja <rreja2000 at yahoo.com>
Cc: Concurrency <concurrency-interest at cs.oswego.edu>
Sent: Wednesday, August 24, 2011 9:33 AM
Subject: Re: [concurrency-interest] Concurrent indexed queue
A CHM will work, but there might be a lighter-weight approach. It requires that all of the instruments are known ahead of time and that the producers simply put() the latest quote.
If these constraints are acceptable, consider filling a HashMap with each instrument and a null value or a dummy quote. As new quotes arrive, simply put(I, Q). Consumers then simply get(I).
Outside the HashMap, all producers have to set a volatile variable (e.g. an int) to some constant (e.g. 0) after the put() call. All consumers have to read the same volatile variable. The volatile variable ensures that the JVM will insert any necessary memory barriers so that consumers always see the latest quote. If consumers are allowed to see slightly older quotes even though a newer one is in the HashMap, then the volatile variable is not necessary.
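A minimal sketch of that scheme (names are hypothetical). It leans on the keys being fixed at construction, so a put() on an existing key never restructures the table, and on the shared volatile field to create the happens-before edge between producers and consumers:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the pre-filled HashMap scheme: the map's structure is
// never modified after construction, so put()/get() only race on values, and a
// shared volatile field provides the memory barriers for visibility.
class QuoteTable {
    record Quote(String instrument, double price) {}

    private final Map<String, Quote> map = new HashMap<>();
    private volatile int fence;     // written after every put, read before every get

    QuoteTable(List<String> instruments) {
        for (String i : instruments) {
            map.put(i, new Quote(i, Double.NaN));   // dummy quote per instrument
        }
    }

    // Producer: overwrite the value for a pre-existing key, then publish.
    void put(String instrument, Quote q) {
        map.put(instrument, q);     // key already exists: no structural change
        fence = 0;                  // volatile write flushes the new value
    }

    // Consumer: the volatile read pairs with the producer's volatile write.
    Quote get(String instrument) {
        int ignored = fence;
        return map.get(instrument);
    }
}
```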
Nathan Reynolds | Consulting Member of Technical Staff | 602.333.9091
Oracle PSR Engineering | Server Technology
On 8/24/2011 9:10 AM, Rohit Reja wrote:
Hi Aleksey,
>
>To clarify a bit further, I don't need to keep any history of quotes; I just need to keep the latest quote for an instrument.
>
>So, for example, one possible solution I thought of was to keep a CHM of instrumentid (key) and quote (value), plus an event queue of "instrumentids". So let's say events arrive as:
>E1(I1, Q1)
>E2(I2, Q1)
>E3(I1, Q2)
>
>Then the CHM will have 2 entries, while the queue will have 3 entries {I1, I2, I1}. Now when the consumer consumes I1 it removes I1's entry from the CHM, so that the consumer can ignore the event the next time it retrieves I1 from the queue.
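The CHM-plus-event-queue scheme quoted above might be sketched as follows (a hypothetical illustration with invented names): removing the map entry on consumption is what makes later queue entries for the same instrument skippable.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the CHM + event-queue design described above: the map
// holds only the latest quote per instrument, the queue holds instrument ids,
// and a consumer that finds no map entry skips the event as stale.
class ChmEventQueue {
    private final ConcurrentHashMap<String, String> latest = new ConcurrentHashMap<>();
    private final ConcurrentLinkedQueue<String> events = new ConcurrentLinkedQueue<>();

    // Producer: overwrite the latest quote and record the event.
    void publish(String instrument, String quote) {
        latest.put(instrument, quote);
        events.offer(instrument);
    }

    // Consumer: returns the next fresh quote, or null if nothing is pending.
    String consumeNext() {
        String instrument;
        while ((instrument = events.poll()) != null) {
            String quote = latest.remove(instrument);   // claim and clear the entry
            if (quote != null) {
                return quote;   // remaining queued events for it are now stale
            }
            // else: this instrument was already consumed via an earlier event
        }
        return null;
    }
}
```

As the message goes on to note, the weakness is that the event queue itself still grows without bound when the consumer is slower.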
>
>This implementation would cause the event queue to grow indefinitely, as the consumer is slower. To avoid this, I could simply check whether I1 is already in the queue before enqueuing it, but that would definitely make things very slow.
>
>I hope I have made things clearer now.
>
>Thanks,
>rohit
>
>
>
>________________________________
> From: Aleksey Shipilev <aleksey.shipilev at gmail.com>;
>To: Rohit Reja <rreja2000 at yahoo.com>;
>Cc: <concurrency-interest at cs.oswego.edu>;
>Subject: Re: [concurrency-interest] Concurrent indexed queue
>Sent: Wed, Aug 24, 2011 3:28:23 PM
>
>
>Hi Rohit,
>
>
>Are you looking for an efficient CircularBuffer implementation, then? Having one of those, you could keep a per-instrument CircularBuffer and poll each by "key", which would provide the essential indexing.
>
>
>-Aleksey.
>
>
>On Wed, Aug 24, 2011 at 6:58 PM, Rohit Reja <rreja2000 at yahoo.com> wrote:
>
>Hi,
>>
>>I am working on a project where I need advice on designing the following functionality.
>>
>>Producer threads read quotes on a set of instruments (5000 instruments) from a message bus. These need to be routed to a downstream system which consumes messages at a slower rate than the producers are producing them. The catch is that only the latest quotes need to be published to the downstream system, so we can drop stale quotes. Essentially we don't need to slow down the producers, as the event queue is bounded by the number of instruments.
>>
>>I want to keep the design flexible so as to allow multiple producers and multiple consumers. So in short, I need a concurrent indexed queue where I can update entries in place. This is a latency-sensitive application, so we want latency to be as low as possible.
>>
>>Please advise on suitable data structures that could be of use.
>>Regards,
>>Rohit
>>
>>
>>
>>
>>
>>
>>
>>________________________________
>> From: Jeff Hain <jeffhain at rocketmail.com>;
>>To: Doug Lea <dl at cs.oswego.edu>;
>>Cc: <concurrency-interest at cs.oswego.edu>;
>>Subject: [concurrency-interest] Re : Re : concurrent counter : incrementAndGet
>>Sent: Sat, Aug 20, 2011 10:17:54 AM
>>
>>
>>I hand-emulated ">" replies style, which is handier :)
>>
>>
>>________________________________
>>>From: Doug Lea <dl at cs.oswego.edu>
>>>To: concurrency-interest at cs.oswego.edu
>>>Sent: Thu, Aug 18, 2011, 10:05:59 PM
>>>Subject: Re: [concurrency-interest] Re : concurrent counter : incrementAndGet
>>>
>>>On 08/14/11 07:03, Jeff Hain wrote:
>>>>
>>>> That could work well for some cases indeed, but I'm working on a sort of
>>>> Disruptor, using the counter with modulo to pick slots to write to in a
>>>> cyclic array, monotonically, or non-monotonically but for a short time
>>>> (for readers not to be blocked on a not-yet-written slot), and any writer
>>>> can stop working anytime; in this case I don't see how that could apply easily.
>>>
>>>I don't think "non-monotonically but for a short time" makes this easier,
>>>since any violations are likely to be unbounded.
>>
>> Violations are indeed unbounded, which could, for example, make all producers
>>claim wrapping sequences (on entries which could not be written in time),
>>and leave consumers stuck on not-yet-written entries (*).
>> These violations are dealt with, though, as follows: when a consumer encounters
>>a writable entry (for the "current round"), and the counter has already provided
>>a higher sequence to a producer (hence there will be something to eat further on),
>>it tries to CAS it up to being writable but for the "next round" (i.e.
>>sequence += buffer length), so that consumers can pass over it and not get stuck
>>(a producer can write it just after; it doesn't hurt, consumers will still just
>>pass over it). In the process, the consumer also depletes the counter of any
>>lower sequence, to lower violations, and so that no producer subsequently tries
>>to write on a CASed-up entry (if one does, i.e. the entry doesn't have the
>>expected sequence, it just picks another sequence).
>> At the time, on a Core i7 X 980, when doing "no work" benches (i.e. consumers
>>do nothing), using a concurrent counter instead of an AtomicLong is just a bit
>>slower if consumers are going faster than a few producers (due to the CASing-up
>>going on, I guess), but if producers go faster, i.e. if consumers have something
>>to eat most of the time, I've seen up to 30 percent improvement with 8 producers
>>and 1 consumer, and about 20 percent with anywhere between 32 and 128 producers.
>> (My code is still a mess of comments and obsolete or dead debugging code,
>>so I can't attach it, but there's not much more in it than what I said.)
>>
>> (*) The "Disruptor" I use is "unicast", and producers and consumers don't spy
>>on each other (which doesn't scale well), but only look at an entry's sequence
>>and status (writable, being written, readable, being read), both stored in the
>>same long.
>>
>>>You might be able to live with thread-local random number generators,
>>>so that in the long run, on average, you are balanced?
>>
>> I tried using randomness (Marsaglia XorShift) in the concurrent counter,
>>but it was actually slower than just doing "++index" for the next cell to CAS on
>>(it's an array of sub-counters, each incremented by the number of sub-counters
>>each time). If the CAS fails, I try another cell. Also, I actually don't do
>>"++index", but "index = index + 1 + nbrOfFailedCAS" (with mask), which seems
>>to help get away from contention.
>> Also, as I said, balancing is dealt with, but the counter also has a method
>>that ensures monotonicity (for the thread-local, or
>>non-thread-safe-view-instance-local, data structure that holds the previous
>>index and the highest returned value). It is a bit slower, but it tends to
>>lower unbalancing, since when it gets a value lower than the highest previously
>>returned one, it tries to CAS again on the same cell, making it catch up.
>>
>>>Otherwise, you have a version of the Counting problem described in
>>>Herlihy & Shavit, for which all known scalable solutions are expensive,
>>>complicated, and difficult to package as j.u.c components. Although there
>>>are some special cases, like SNZI, for detecting when counts hit the
>>>particular value of zero.
>>
>> Thanks, I didn't know about these. I did try Shavit et al.'s flat-combining
>>method for the counter, but it was too slow.
>>
>>>
>>>(This seems to be the main bottleneck in Disruptor-like designs.)
>>
>> Yes, that's why I started to focus on it. I think it's the only part of this
>>disruptor that is potentially proportional-or-worse to the number of threads
>>(one could think of ring-buffer scanning by consumers too, but it should be
>>possible to use modulo so that each entry doesn't get scanned by all consumers).
>>
>>>
>>>-Doug
>>
>>-Jeff
>>
>>_______________________________________________
>>Concurrency-interest mailing list
>>Concurrency-interest at cs.oswego.edu
>>http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>>
>>
>
>
>
_______________________________________________
Concurrency-interest mailing list
Concurrency-interest at cs.oswego.edu
http://cs.oswego.edu/mailman/listinfo/concurrency-interest