[concurrency-interest] MRULinkedBlockingQueue

Szabolcs Ferenczi szabolcs.ferenczi at gmail.com
Wed Apr 18 15:47:18 EDT 2007


Hi Hanson,

I think it is an unpredictable construction. I will try to explain a
possible scenario.

Lets take a two elements buffer from this type. Thread T1 calls
add(item1) and at the same time, but shortly after T1, T2 calls
add(item2). Both starts nextpos() and obtains the current value 0 from
pos. Now either T1 or T2 can complete compareAndSet(0,1)
successfully. Let us say, T1 wins but it is delayed with the return. In the
meantime T3 comes with add(item3) and competes with T2 the same way
with compareAndSet(1,0). Lets say, T3 wins and T3 is also delayed with
the return. Now, T2 can update the counter with compareAndSet(0,1). At
this moment, T1 is about to return with 0, T3 with 1 and T2 with
0. These return values are array indexes. So the three threads are
competing for the atomic update a.getAndSet(x,y). The possible results
where the buffer can end up are:

[item1, item3] and subsequently [item2, item3] or
[item2, item3] and subsequently [item1, item3]

Remember, the calling sequence was item1, item2, item3. The final
content of the buffer is non deterministic, however.

Do you think this may happen with this buffer?

Just asking.

Best Regards,
Szabolcs

On 18/04/07, Hanson Char <hanson.char at gmail.com> wrote:
> Are you trying to have a thread-safe bounded buffer that keeps only
> the latest information ?  If so, you can consider the use of an
> AtomicReferenceArray for implementing the bounded buffer, together
> with an AtomicInteger for the current index pointing to the next slot
> to put the latest element.  As long as the nextPosition operation
> returns an index modulus the length of the array, you got the effect
> of a circular buffer.
>
> For retrieval of the latest info (as a whole), just iterate through
> the entire array ignoring the empty (ie null) slots.  Is that good
> enough for your use case ?
>
> Thread-safe, concurrent (ie lock free).
>
> See CircularBuffer.java below.
>
> Hanson Char
>
> import java.util.Iterator;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.concurrent.atomic.AtomicReferenceArray;
>
> public class CircularBuffer<T> implements Iterable<T>
> {
>     private final AtomicReferenceArray<T> a;
>     private final AtomicInteger pos = new AtomicInteger();
>
>     public CircularBuffer(int size)
>     {
>         a = new AtomicReferenceArray<T>(size);
>     }
>
>     public T add(T ele) {
>         T ret = a.getAndSet(
>                     nextpos(), ele);
>         return ret;
>     }
>
>     public int size() {
>         return a.length();
>     }
>
>     public Iterator<T> iterator()
>     {
>         return new Iterator<T>()
>         {
>             private int cur;
>
>             public boolean hasNext()
>             {
>                 return cur < a.length()
>                     && a.get(cur) != null;
>             }
>
>             public T next() {
>                 if (cur < a.length())
>                     return a.get(cur++);
>                 return null;
>             }
>
>             public void remove() {
>                 throw new UnsupportedOperationException("remove not supported");
>             }
>         };
>     }
>
>     private int nextpos()
>     {
>         for (;;) {
>             final int current = pos.get();
>             final int newValue = (current + 1) % a.length();
>
>             if (pos.compareAndSet(current, newValue))
>                 return current;
>         }
>     }
> }
>
>
> On 4/18/07, Mike Quilleash <mike.quilleash at subexazure.com> wrote:
> >
> >
> > Hi all,
> >
> > Has anyone seen or comes across an collection implementation that has the
> > following characteristics.
> >
> > 1) Concurrent
> > 2) Maintains insertion order (for iteration)
> > 3) Capacity bound
> > 4) An add() that would cause the capacity to exceed the limit would remove
> > the oldest element and add a new one.
> >
> > I thought of the following extension to LinkedBlockingQueue which simply
> > loops in add() trying to offer() and removing an element from the queue if
> > the offer() fails.
> >
> >
> > public class MRULinkedBlockingQueue< T > extends LinkedBlockingQueue< T >
> > {
> >     public MRULinkedBlockingQueue( int capacity )
> >     {
> >         super( capacity );
> >     }
> >
> >     @Override
> >     public boolean add( T o )
> >     {
> >         for ( ;; )
> >         {
> >             // try and offer the element
> >             if ( offer( o ) )
> >                 return true;
> >
> >             // remove the head of the queue to make room if the offer
> > failed.
> >             poll();
> >         }
> >     }
> > }
> >
> >
> > Any comments appreciated.
> >
> >
> >   This e-mail is bound by the terms and conditions described at
> > http://www.subexazure.com/mail-disclaimer.html
> >
> >
> >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at altair.cs.oswego.edu
> > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
> >
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>


More information about the Concurrency-interest mailing list