[concurrency-interest] MRULinkedBlockingQueue

Hanson Char hanson.char at gmail.com
Wed Apr 18 14:36:21 EDT 2007


Are you trying to build a thread-safe bounded buffer that keeps only
the latest information?  If so, consider using an
AtomicReferenceArray to implement the bounded buffer, together
with an AtomicInteger that holds the index of the next slot
to receive an element.  As long as the next-position operation
(nextpos() in the code below) returns an index modulo the length of
the array, you get the effect of a circular buffer.

To retrieve the latest info (as a whole), just iterate through
the entire array, ignoring the empty (i.e. null) slots.  Is that good
enough for your use case?
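
The retrieval step described above can be sketched as a small helper.
This is only an illustration: the class name Snapshots and the method
nonNull are hypothetical, and the result is a best-effort copy, since
other threads may keep writing while the array is being read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical helper, not part of CircularBuffer.java.
final class Snapshots {
    private Snapshots() {}

    /** Copies every non-null slot, in index order, into a new list. */
    static <T> List<T> nonNull(AtomicReferenceArray<T> slots) {
        List<T> out = new ArrayList<T>(slots.length());
        for (int i = 0; i < slots.length(); i++) {
            T e = slots.get(i);      // each slot is read exactly once
            if (e != null)           // null marks an empty slot
                out.add(e);
        }
        return out;
    }
}
```

Index order is not insertion order once the buffer has wrapped around,
so this suits the "latest info as a whole" case rather than ordered
iteration.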

It is thread-safe and concurrent (i.e. lock-free).

See CircularBuffer.java below.

Hanson Char

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class CircularBuffer<T> implements Iterable<T>
{
    private final AtomicReferenceArray<T> a;
    private final AtomicInteger pos = new AtomicInteger();

    public CircularBuffer(int size)
    {
        a = new AtomicReferenceArray<T>(size);
    }

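    // Stores ele in the next slot and returns the element it
    // displaced, or null if that slot was empty.  Do not add null:
    // null marks an empty slot.
    public T add(T ele) {
        return a.getAndSet(nextpos(), ele);
    }

    // Returns the capacity of the buffer, not the number of
    // elements currently stored.
    public int size() {
        return a.length();
    }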

    public Iterator<T> iterator()
    {
        return new Iterator<T>()
        {
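            private int cur; // index of the next slot to visit

            // Slots fill from index 0 and are never cleared, so the
            // first null slot marks the end of the stored elements.
            public boolean hasNext()
            {
                return cur < a.length()
                    && a.get(cur) != null;
            }

            // Throws rather than returning null when exhausted, as
            // the Iterator contract requires.
            public T next() {
                if (!hasNext())
                    throw new java.util.NoSuchElementException();
                return a.get(cur++);
            }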

            public void remove() {
                throw new UnsupportedOperationException("remove not supported");
            }
        };
    }

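    // Claims the current index for the caller and advances the
    // counter modulo the array length.  The CAS is retried until it
    // succeeds, so no two threads claim the same value of pos.
    private int nextpos()
    {
        for (;;) {
            final int current = pos.get();
            final int newValue = (current + 1) % a.length();

            if (pos.compareAndSet(current, newValue))
                return current;
        }
    }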
}
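
Iterating from index 0 does not give oldest-to-newest order once the
buffer has wrapped around.  If insertion order matters, one possible
variation (a sketch only; the OrderedRing class and its snapshot()
method are hypothetical and not part of the code above) keeps a
monotonically increasing sequence number and reads the last
`capacity` sequence numbers in order.  The snapshot is best effort:
adds that race with it may be missing or may replace older entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical variation on CircularBuffer, for illustration only.
final class OrderedRing<T> {
    private final AtomicReferenceArray<T> slots;
    // Total number of slots claimed so far; never wraps in practice.
    private final AtomicLong seq = new AtomicLong();

    OrderedRing(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException("capacity must be positive");
        slots = new AtomicReferenceArray<T>(capacity);
    }

    /** Stores ele in the next slot; returns the displaced element or null. */
    T add(T ele) {
        long s = seq.getAndIncrement();
        return slots.getAndSet((int) (s % slots.length()), ele);
    }

    /** Best-effort copy of the stored elements, oldest to newest. */
    List<T> snapshot() {
        long end = seq.get();
        long start = Math.max(0L, end - slots.length());
        List<T> out = new ArrayList<T>(slots.length());
        for (long s = start; s < end; s++) {
            T e = slots.get((int) (s % slots.length()));
            if (e != null)   // claimed but not yet written by a racing add
                out.add(e);
        }
        return out;
    }
}
```

With a capacity of 3, adding 1 through 5 from a single thread and then
calling snapshot() yields the three most recent elements in the order
they were added.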


On 4/18/07, Mike Quilleash <mike.quilleash at subexazure.com> wrote:
>
>
> Hi all,
>
> Has anyone seen or come across a collection implementation that has the
> following characteristics?
>
> 1) Concurrent
> 2) Maintains insertion order (for iteration)
> 3) Capacity bound
> 4) An add() that would cause the capacity to exceed the limit would remove
> the oldest element and add a new one.
>
> I thought of the following extension to LinkedBlockingQueue which simply
> loops in add() trying to offer() and removing an element from the queue if
> the offer() fails.
>
>
> public class MRULinkedBlockingQueue< T > extends LinkedBlockingQueue< T >
> {
>     public MRULinkedBlockingQueue( int capacity )
>     {
>         super( capacity );
>     }
>
>     @Override
>     public boolean add( T o )
>     {
>         for ( ;; )
>         {
>             // try and offer the element
>             if ( offer( o ) )
>                 return true;
>
>             // remove the head of the queue to make room if the offer
>             // failed.
>             poll();
>         }
>     }
> }
>
>
> Any comments appreciated.
>
>

