[concurrency-interest] MRULinkedBlockingQueue

Mike Quilleash mike.quilleash at subexazure.com
Wed Apr 18 16:25:23 EDT 2007


You're mostly correct: it's about keeping only the most recent x items
available.  This structure will be fairly heavily used over a long
period of time, so I don't want it to balloon, but retrieval order is
important because I need the current entries in the order they were
inserted.

Appreciate the suggestions though :)

Cheers.

Mike.

-----Original Message-----
From: Hanson Char [mailto:hanson.char at gmail.com] 
Sent: 18 April 2007 21:18
To: Mike Quilleash 
Cc: Concurrency-interest at cs.oswego.edu
Subject: Re: [concurrency-interest] MRULinkedBlockingQueue

>I don't think the iterator returns them in insertion order,

No, the iterator isn't designed to return the elements in insertion
order.  I was just speculating that your requirement is probably only
about getting a snapshot of the buffer containing the latest info of
some sort.  In that case you wouldn't care too much about the order
among the latest, as long as they are the latest.  But my speculation
is probably wrong :)

Cheers,
Hanson Char

On 4/18/07, Mike Quilleash <mike.quilleash at subexazure.com> wrote:
> Thanks for the replies.
>
> That's an interesting idea, I hadn't considered using the Atomic 
> stuff, not much experience with it beyond using AtomicInteger for
> counting.
>
> The only problem I see for my use case is that I don't think the 
> iterator returns them in insertion order, it will work for the first 
> "capacity" inserts and then once the array starts to wrap it will 
> return them in the array order starting from index 0.  Perhaps 
> changing the iterator to start at the current position and wrap around
> at most once until current position == start position?
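
One way the wrap-around idea above could look is sketched here.  This is
not code from the thread: the class name OrderedCircularBuffer and the
snapshot() method are made up for illustration, and it swaps the wrapped
AtomicInteger index for an ever-increasing AtomicLong counter so that the
oldest slot can be computed directly.  It assumes a weakly consistent
snapshot is acceptable while writers are active.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical variant of the CircularBuffer quoted below; not code from the thread.
class OrderedCircularBuffer<T> {
    private final AtomicReferenceArray<T> slots;
    // Total number of add() calls so far. Once the buffer is full,
    // (count % capacity) is the slot holding the oldest element.
    private final AtomicLong count = new AtomicLong();

    OrderedCircularBuffer(int capacity) {
        slots = new AtomicReferenceArray<T>(capacity);
    }

    /** Stores ele in the next slot and returns the element it displaced, or null. */
    T add(T ele) {
        long seq = count.getAndIncrement();
        return slots.getAndSet((int) (seq % slots.length()), ele);
    }

    /** Copies the current contents, oldest first, wrapping around at most once. */
    List<T> snapshot() {
        int capacity = slots.length();
        long end = count.get();
        long start = Math.max(0L, end - capacity);
        List<T> out = new ArrayList<T>(capacity);
        for (long seq = start; seq < end; seq++) {
            T ele = slots.get((int) (seq % capacity));
            if (ele != null)   // a writer has claimed this slot but not stored yet
                out.add(ele);
        }
        return out;
    }
}
```

With a capacity of 3, adding 1 through 5 and then calling snapshot()
gives 3, 4, 5.  If adds race with the walk, a slot may already hold a
newer element than the one the walk expected, so the ordering is only
guaranteed when the buffer is quiescent.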
>
> @Szabolcs - I don't see any data race problems.  I think your scenario
> goes like this...
>
> T1 - Calls add() enters nextPos(), gets current = 0 and newValue = 1
> T2 - ditto
> T1 - compareAndSet( 0, 1 ), succeeds and returns 0.
> T3 - Calls add() enters nextPos(), gets current = 1 and newValue = 2
> T3 - compareAndSet( 1, 2 ), succeeds and returns 1.
> T2 - compareAndSet( 0, 1 ), fails and loops.
> T2 - current = 2 and newValue = 3
> T2 - compareAndSet( 2, 3 ), succeeds and returns 2.
>
> Unless I'm missing or misunderstanding something.
>
> So:
> T1 = 0
> T3 = 1
> T2 = 2
>
> All distinct values, not in order of calling add(), but this is an 
> unfair structure (fine for my use-case).  If any thread calls
> compareAndSet() all other threads that have stored a current value 
> will now fail their call to compareAndSet() and be forced to refetch 
> current and try again.
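
The trace above can be checked with a small harness.  The sketch below
is an illustration only: NextPosDemo and claimConcurrently() are made-up
names, and the retry loop is copied from the quoted nextpos().  It sizes
the index range to the number of threads so that every successful
compareAndSet() must hand back a different index.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical harness; only the loop in nextPos() comes from the thread.
class NextPosDemo {
    private final AtomicInteger pos = new AtomicInteger();
    private final int length;

    NextPosDemo(int length) {
        this.length = length;
    }

    // Retry until this thread's compareAndSet wins, then return the claimed index.
    int nextPos() {
        for (;;) {
            int current = pos.get();
            int newValue = (current + 1) % length;
            if (pos.compareAndSet(current, newValue))
                return current;
        }
    }

    /** Starts `threads` threads that each claim one index; returns the distinct indices. */
    static Set<Integer> claimConcurrently(int threads) {
        NextPosDemo demo = new NextPosDemo(threads);
        Set<Integer> claimed = ConcurrentHashMap.newKeySet();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();   // hold all threads, then release them together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                claimed.add(demo.nextPos());
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return claimed;
    }
}
```

Calling NextPosDemo.claimConcurrently(8) should return a set of size 8:
the order in which threads win is arbitrary, but no index is handed out
twice within one lap of the array.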
>
> Cheers.
>
> Mike.
>
> -----Original Message-----
> From: Hanson Char [mailto:hanson.char at gmail.com]
> Sent: 18 April 2007 19:36
> To: Mike Quilleash
> Cc: Concurrency-interest at cs.oswego.edu
> Subject: Re: [concurrency-interest] MRULinkedBlockingQueue
>
> Are you trying to have a thread-safe bounded buffer that keeps only 
> the latest information?  If so, you can consider using an 
> AtomicReferenceArray to implement the bounded buffer, together 
> with an AtomicInteger for the current index pointing to the next slot 
> in which to put the latest element.  As long as the nextPosition 
> operation returns an index modulo the length of the array, you get 
> the effect of a circular buffer.
>
> For retrieval of the latest info (as a whole), just iterate through 
> the entire array ignoring the empty (ie null) slots.  Is that good 
> enough for your use case ?
>
> Thread-safe, concurrent (ie lock free).
>
> See CircularBuffer.java below.
>
> Hanson Char
>
> import java.util.Iterator;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.concurrent.atomic.AtomicReferenceArray;
>
> public class CircularBuffer<T> implements Iterable<T> {
>     private final AtomicReferenceArray<T> a;
>     private final AtomicInteger pos = new AtomicInteger();
>
>     public CircularBuffer(int size)
>     {
>         a = new AtomicReferenceArray<T>(size);
>     }
>
>     public T add(T ele) {
>         T ret = a.getAndSet(
>                     nextpos(), ele);
>         return ret;
>     }
>
>     public int size() {
>         return a.length();
>     }
>
>     public Iterator<T> iterator()
>     {
>         return new Iterator<T>()
>         {
>             private int cur;
>
>             public boolean hasNext()
>             {
>                 return cur < a.length()
>                     && a.get(cur) != null;
>             }
>
>             public T next() {
>                 // Iterator contract: signal exhaustion instead of returning null
>                 if (!hasNext())
>                     throw new java.util.NoSuchElementException();
>                 return a.get(cur++);
>             }
>
>             public void remove() {
>                 throw new UnsupportedOperationException("remove not supported");
>             }
>         };
>     }
>
>     private int nextpos()
>     {
>         for (;;) {
>             final int current = pos.get();
>             final int newValue = (current + 1) % a.length();
>
>             if (pos.compareAndSet(current, newValue))
>                 return current;
>         }
>     }
> }
>
>
> On 4/18/07, Mike Quilleash <mike.quilleash at subexazure.com> wrote:
> >
> >
> > Hi all,
> >
> > Has anyone seen or come across a collection implementation that 
> > has the following characteristics?
> >
> > 1) Concurrent
> > 2) Maintains insertion order (for iteration)
> > 3) Capacity bound
> > 4) An add() that would cause the capacity to exceed the limit would 
> > remove the oldest element and add a new one.
> >
> > I thought of the following extension to LinkedBlockingQueue which 
> > simply loops in add() trying to offer() and removing an element from
> > the queue if the offer() fails.
> >
> >
> > public class MRULinkedBlockingQueue< T > extends LinkedBlockingQueue< T > {
> >     public MRULinkedBlockingQueue( int capacity )
> >     {
> >         super( capacity );
> >     }
> >
> >     @Override
> >     public boolean add( T o )
> >     {
> >         for ( ;; )
> >         {
> >             // try and offer the element
> >             if ( offer( o ) )
> >                 return true;
> >
> >             // remove the head of the queue to make room if the offer failed.
> >             poll();
> >         }
> >     }
> > }
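
A short single-threaded usage sketch of the class above follows, to show
the eviction behaviour.  The class body is repeated only so the example
compiles on its own; MruQueueDemo and lastThreeOf() are made-up names.
Only add() is overridden, so this assumes callers never use put() or
offer() directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Same shape as the class above, repeated so the example is self-contained.
class MRULinkedBlockingQueue<T> extends LinkedBlockingQueue<T> {
    MRULinkedBlockingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean add(T o) {
        for (;;) {
            // try to append; succeeds whenever the queue is below capacity
            if (offer(o))
                return true;
            // queue was full: drop the oldest element and retry
            poll();
        }
    }
}

// Hypothetical demo class, not part of the original post.
class MruQueueDemo {
    /** Adds all items to a queue of capacity 3 and returns what is left, oldest first. */
    static List<String> lastThreeOf(String... items) {
        MRULinkedBlockingQueue<String> q = new MRULinkedBlockingQueue<String>(3);
        for (String s : items)
            q.add(s);
        return new ArrayList<String>(q);
    }

    public static void main(String[] args) {
        System.out.println(lastThreeOf("a", "b", "c", "d", "e")); // prints [c, d, e]
    }
}
```

Iteration over a LinkedBlockingQueue runs from head to tail, so the
surviving elements come back in insertion order.  With several threads
calling add() at once the capacity bound still holds, although which
elements get evicted depends on how the offer() and poll() calls
interleave.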
> >
> >
> > Any comments appreciated.
> >
> >
> >   This e-mail is bound by the terms and conditions described at 
> > http://www.subexazure.com/mail-disclaimer.html
> >
> >
> >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at altair.cs.oswego.edu
> > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
> >
>
>
>
>





