[concurrency-interest] MRULinkedBlockingQueue

Mike Quilleash mike.quilleash at subexazure.com
Wed Apr 18 16:06:05 EDT 2007


Thanks for the replies.

That's an interesting idea.  I hadn't considered using the atomic
classes; I don't have much experience with them beyond using
AtomicInteger for counting.

The only problem I see for my use case is that I don't think the
iterator returns elements in insertion order.  It works for the first
"capacity" inserts, but once the array starts to wrap it returns them in
array order starting from index 0.  Perhaps the iterator could start at
the current position and wrap around at most once, stopping when it gets
back to its start position?
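
A rough sketch of that iterator change, reusing the add()/nextpos()
logic from CircularBuffer.  The class name OrderedCircularBuffer is made
up for illustration, and the ordering is only a best effort: if other
threads are adding while the iteration runs, a slot may already be
claimed but not yet written, so the result is oldest-to-newest only when
the buffer is quiet.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical variant of CircularBuffer; only iterator() differs.
class OrderedCircularBuffer<T> implements Iterable<T> {
    private final AtomicReferenceArray<T> a;
    private final AtomicInteger pos = new AtomicInteger();

    OrderedCircularBuffer(int size) {
        a = new AtomicReferenceArray<T>(size);
    }

    public T add(T ele) {
        return a.getAndSet(nextpos(), ele);
    }

    private int nextpos() {
        for (;;) {
            final int current = pos.get();
            final int newValue = (current + 1) % a.length();
            if (pos.compareAndSet(current, newValue))
                return current;
        }
    }

    public Iterator<T> iterator() {
        // The next slot to be written holds the oldest element once the
        // array has wrapped, so start the walk there.
        final int start = pos.get();
        return new Iterator<T>() {
            private int visited;   // slots examined so far (at most one lap)
            private T nextEle;     // element located by hasNext(), if any

            public boolean hasNext() {
                // Skip empty (null) slots and stop after one full lap.
                while (nextEle == null && visited < a.length()) {
                    nextEle = a.get((start + visited) % a.length());
                    visited++;
                }
                return nextEle != null;
            }

            public T next() {
                if (!hasNext())
                    throw new NoSuchElementException();
                T ret = nextEle;
                nextEle = null;
                return ret;
            }

            public void remove() {
                throw new UnsupportedOperationException("remove not supported");
            }
        };
    }
}
```

With a capacity of 3 and single-threaded adds of 0, 1, 2, 3, 4, the walk
starts at index 2 and yields 2, 3, 4.  Before the array has wrapped, the
null slots are skipped, so the elements still come out in insertion
order.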

@Szabolcs - I don't see any data race problems.  I think your scenario
goes like this...

T1 - Calls add(), enters nextpos(), gets current = 0 and newValue = 1
T2 - ditto
T1 - compareAndSet( 0, 1 ) succeeds; nextpos() returns 0.
T3 - Calls add(), enters nextpos(), gets current = 1 and newValue = 2
T3 - compareAndSet( 1, 2 ) succeeds; nextpos() returns 1.
T2 - compareAndSet( 0, 1 ) fails and loops.
T2 - current = 2 and newValue = 3
T2 - compareAndSet( 2, 3 ) succeeds; nextpos() returns 2.

Unless I'm missing or misunderstanding something.

So:
T1 = 0
T3 = 1
T2 = 2

All distinct values, not in the order add() was called, but this is an
unfair structure (fine for my use case).  If any thread's
compareAndSet() succeeds, all other threads that have already read the
old current value will fail their own compareAndSet() and be forced to
re-read current and try again.
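
A small harness along these lines could be used to check the "all
distinct" claim.  It is only a sketch: the class NextPosCheck and its
claim() helper are invented for this example, and the nextpos() loop is
copied from CircularBuffer.  It assumes no more threads than slots, so
the counter never wraps during the run.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test harness; not part of the posted CircularBuffer.
class NextPosCheck {
    private final AtomicInteger pos = new AtomicInteger();
    private final int length;

    NextPosCheck(int length) {
        this.length = length;
    }

    // Same compare-and-set retry loop as CircularBuffer.nextpos().
    int nextpos() {
        for (;;) {
            final int current = pos.get();
            final int newValue = (current + 1) % length;
            if (pos.compareAndSet(current, newValue))
                return current;
        }
    }

    // Starts `threads` threads that each call nextpos() once, and
    // returns the set of indices they obtained.
    static Set<Integer> claim(int threads, int length) {
        final NextPosCheck check = new NextPosCheck(length);
        final Set<Integer> seen = ConcurrentHashMap.newKeySet();
        final CountDownLatch go = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    go.await();   // release all threads at once
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen.add(check.nextpos());
            });
            workers[i].start();
        }
        go.countDown();
        try {
            for (Thread t : workers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

With 8 threads and a length of 16, claim(8, 16) should come back with
exactly the indices 0 through 7, whichever thread wins each race.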

Cheers.

Mike.

-----Original Message-----
From: Hanson Char [mailto:hanson.char at gmail.com] 
Sent: 18 April 2007 19:36
To: Mike Quilleash 
Cc: Concurrency-interest at cs.oswego.edu
Subject: Re: [concurrency-interest] MRULinkedBlockingQueue

Are you trying to have a thread-safe bounded buffer that keeps only the
latest information?  If so, you can consider using an
AtomicReferenceArray to implement the bounded buffer, together with an
AtomicInteger for the current index pointing to the next slot in which
to put the latest element.  As long as the nextPosition operation
returns an index modulo the length of the array, you get the effect of a
circular buffer.

For retrieval of the latest info (as a whole), just iterate through the
entire array, ignoring the empty (i.e. null) slots.  Is that good enough
for your use case?

Thread-safe, concurrent (i.e. lock-free).

See CircularBuffer.java below.

Hanson Char

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class CircularBuffer<T> implements Iterable<T> {
    private final AtomicReferenceArray<T> a;
    private final AtomicInteger pos = new AtomicInteger();

    public CircularBuffer(int size)
    {
        a = new AtomicReferenceArray<T>(size);
    }

    public T add(T ele) {
        T ret = a.getAndSet(
                    nextpos(), ele);
        return ret;
    }

    public int size() {
        return a.length();
    }

    public Iterator<T> iterator()
    {
        return new Iterator<T>()
        {
            private int cur;

            public boolean hasNext()
            {
                return cur < a.length()
                    && a.get(cur) != null;
            }

            public T next() {
                if (cur < a.length())
                    return a.get(cur++);
                // Iterator contract: signal exhaustion instead of returning null
                throw new java.util.NoSuchElementException();
            }

            public void remove() {
                throw new UnsupportedOperationException("remove not supported");
            }
        };
    }

    private int nextpos()
    {
        for (;;) {
            final int current = pos.get();
            final int newValue = (current + 1) % a.length();

            if (pos.compareAndSet(current, newValue))
                return current;
        }
    }
}


On 4/18/07, Mike Quilleash <mike.quilleash at subexazure.com> wrote:
>
>
> Hi all,
>
> Has anyone seen or come across a collection implementation that has
> the following characteristics?
>
> 1) Concurrent
> 2) Maintains insertion order (for iteration)
> 3) Capacity bound
> 4) An add() that would exceed the capacity removes the oldest element
> and adds the new one.
>
> I thought of the following extension to LinkedBlockingQueue which 
> simply loops in add() trying to offer() and removing an element from 
> the queue if the offer() fails.
>
>
> public class MRULinkedBlockingQueue< T > extends LinkedBlockingQueue< T > {
>     public MRULinkedBlockingQueue( int capacity )
>     {
>         super( capacity );
>     }
>
>     @Override
>     public boolean add( T o )
>     {
>         for ( ;; )
>         {
>             // try and offer the element
>             if ( offer( o ) )
>                 return true;
>
>             // remove the head of the queue to make room if the offer failed.
>             poll();
>         }
>     }
> }
>
>
> Any comments appreciated.
>
>
>   This e-mail is bound by the terms and conditions described at 
> http://www.subexazure.com/mail-disclaimer.html
>
>
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>

