[concurrency-interest] thoughts on ordered, concurrent locking

Peter Veentjer alarmnummer at gmail.com
Sun Mar 8 17:18:24 EDT 2009


Hi Taylor,

Although it not an answer to your question, this issue could be solved
with software transactional memory: it makes composing atomic
operations a lot easier.

I'm currently working on an implementation and it is nice to see how
some problems are so much easier to solve. I use multiversion
concurrency control with automatic transaction level read consistency
(the oracle version of serialised isolation level) and violations of
isolation are not allowed (although the oracle serialised isolation
level can result in different results than allowed by serialised
execution of transactions).

This is one of the tests. There is a lot of clutter that can be solved
with instrumentation and annotations, but that is besides the point.
Below this test there is another one with waiting on multiple queues
(also a hard problem with classic concurrency control).

/**
 * This test makes sure that all transaction execute isolated from
other transactions
 * <p/>
 * The Test: a shared integer value that is increased by a
modification thread. When the transaction begins,
 * it is increased (so the value can't be divided by 2) there will be
a delay, another increase (so that
 * the value can be divided by 2) and the transaction commits. Another
observing thread that looks at this
 * value should never see a value that can't be divided by 2.
 *
 * @author Peter Veentjer.
 */
public class IsolatedBehaviorTest {

    private MultiversionedStm stm;
    private DefaultMultiversionedHeap heap;
    private long handle;
    private int modifyCount = 300;

    private AtomicInteger modifyCountDown = new AtomicInteger();

    @Before
    public void setUp() {
        heap = new DefaultMultiversionedHeap();
        stm = new MultiversionedStm(heap);
        handle = TestUtils.commit(stm, new IntegerValue());
    }

    @After
    public void tearDown() {
        System.out.println(stm.getStatistics());
        System.out.println(heap.getStatistics());
    }

    @Test
    public void test() {
        modifyCountDown.set(modifyCount);

        ModifyThread modifyThread = new ModifyThread(1);
        ObserveThread observeThread = new ObserveThread(1);

        startAll(modifyThread, observeThread);
        joinAll(modifyThread, observeThread);
    }

    class ModifyThread extends TestThread {
        public ModifyThread(int threadId) {
            super("ModifyThread-" + threadId);
        }

        @Override
        public void run() {
            while (modifyCountDown.decrementAndGet() > 0) {
                new TransactionTemplate(stm) {
                    @Override
                    protected Object execute(Transaction t) throws Exception {
                        IntegerValue value = (IntegerValue) t.read(handle);
                        value.inc();

                        sleepRandomMs(50);

                        value.inc();
                        return null;
                    }
                }.execute();

                sleepRandomMs(1);
            }
        }
    }

    class ObserveThread extends TestThread {
        public ObserveThread(int threadId) {
            super("ObserveThread-" + threadId);
        }

        @Override
        public void run() {
            while (modifyCountDown.get() > 0) {
                new TransactionTemplate(stm) {
                    @Override
                    protected Object execute(Transaction t) throws Exception {
                        IntegerValue value = (IntegerValue) t.read(handle);
                        if (value.get() % 2 != 0)
                            fail();

                        return null;
                    }
                }.execute();

                sleepRandomMs(1);
            }
        }
    }
}

Other problems like listening to multiple queues for example also is very easy:

/**
 * The test checks if a wait can be done on multiple condition
variables. Normally with a BlockingQueue you can only
 * block on 1 queue, if you have multiple queues, waiting on an update
on one of them is not possible, because each
 * queue has his own waitset.
 * <p/>
 * With STM's this limitation doesn't exist. This tests checks if that
works. It does that be creating a
 * bunch of queues and a producer threads random places items in of
the queues, and a consumer thread that
 * waits on the availability of an item on all queues.
 *
 * @author Peter Veentjer.
 */
public class WaitOnMultiConditionVariableTest {

    private MultiversionedStm stm;
    private long[] queues;
    private DefaultMultiversionedHeap heap;

    private int queueCount = 10;
    private int produceCount = 2000;
    private int delayMs = 5;

    @Before
    public void setUp() {
        heap = new DefaultMultiversionedHeap();
        stm = new MultiversionedStm(heap);
    }

    @After
    public void teatDown() {
        System.out.println(stm.getStatistics());
        System.out.println(heap.getStatistics());
    }

    @Test
    public void test() throws InterruptedException {
        queues = createQueues(queueCount);

        ProducerThread producerThread = new ProducerThread();
        producerThread.start();

        ConsumerThread consumerThread = new ConsumerThread();
        consumerThread.start();

        joinAll(producerThread, consumerThread);

        assertQueuesAreEmpty();
        assertEquals(new HashSet(producerThread.producedList), new
HashSet(consumerThread.consumedList));
    }

    public void assertQueuesAreEmpty() {
        Transaction t = stm.startTransaction();
        for (long handle : queues) {
            Queue queue = (Queue) t.read(handle);
            if (!queue.isEmpty())
                fail();
        }

        t.commit();
    }

    private long[] createQueues(int queueCount) {
        Transaction t = stm.startTransaction();
        long[] handles = new long[queueCount];
        for (int k = 0; k < queueCount; k++)
            handles[k] = t.attachAsRoot(new Queue());
        t.commit();
        return handles;
    }

    private class ProducerThread extends TestThread {
        private final List<Integer> producedList = new
ArrayList<Integer>(produceCount);

        public ProducerThread() {
            setName("ProducerThread");
        }

        public void produceOneItem(final int item) {
            new TransactionTemplate(stm) {
                @Override
                protected Object execute(Transaction t) throws Exception {
                    long queueHandle = queues[item % queues.length];
                    Queue<Integer> queue = (Queue<Integer>) t.read(queueHandle);
                    queue.push(item);
                    return null;
                }
            }.execute();

            producedList.add(item);
        }

        public void run() {
            for (int k = 1; k <= produceCount; k++) {
                produceOneItem(k);
                sleepRandomMs(delayMs);
            }
        }
    }

    private class ConsumerThread extends TestThread {
        private final List consumedList = new LinkedList();

        public ConsumerThread() {
            setName("ConsumerThread");
        }

        public int consumeOneItem() {

            return new TransactionTemplate<Integer>(stm) {
                @Override
                protected Integer execute(Transaction t) throws Exception {
                    for (int k = 0; k < queues.length; k++) {
                        Queue<Integer> queue = (Queue) t.read(queues[k]);
                        Integer item = queue.peek();
                        if (item != null)
                            return item;
                    }

                    retry();
                    return null;
                }
            }.execute();
        }

        public void run() {
            for (int k = 0; k < produceCount; k++) {
                int item = consumeOneItem();
                sleepRandomMs(delayMs);
                consumedList.add(item);
            }
        }
    }
}

Stm's could be a nice tool in the toolbox.. But it introduces its own
problems like performance or leaky abstractions from the concurrency
implementation. Certainly very interesting though :)

On Sat, Mar 7, 2009 at 8:35 PM, Taylor Gautier
<tgautier at terracottatech.com> wrote:
> Thanks that should do!  I guess my aversion to recursion blinded me here ;)
>
>
>
> On Mar 7, 2009, at 10:23 AM, Marcelo Fukushima <takeshi10 at gmail.com> wrote:
>
>> hi,
>> lemme try to help you
>> comments inlined
>>
>> On Sat, Mar 7, 2009 at 2:55 PM, Taylor Gautier
>> <tgautier at terracottatech.com> wrote:
>>>
>>> Suppose I have a Person object, which has a list of friends, like so:
>>> Person:
>>>  String name;
>>>  List<Person> friends = new ArrayList<Person>();
>>> My question is, let's assume that a business rule requires that
>>> relationships are commutative, i.e if Bob is friend of Alice, then Alice
>>> is
>>> also a friend of Bob.
>>> This means that if we want to establish that relationship, we need to
>>> update
>>> both records atomically.   So let's try that (naively):
>>> public static void makeFriends(Person p1, person p2)
>>> {
>>>    synchronized (p1) {
>>>        p1.friends.add(p2);
>>>    }
>>>    synchronized (p2) {
>>>        p2.friends.add(p1);
>>>    }
>>> }
>>> Uh oh.  This looks like a disaster waiting to happen - the operation is
>>> not
>>> atomic.  Let's fix that:
>>> public static void makeFriends(Person p1, person p2)
>>> {
>>>    synchronized (p1) {
>>>        synchronized (p2) {
>>>            p1.friends.add(p2);
>>>            p2.friends.add(p1);
>>>        }
>>>    }
>>> }
>>> Ok better, but there is still a possibility of deadlocking this, consider
>>> two threads:
>>> Thread 1: makeFriends(bob, alice);
>>> Thread 2: makeFriends(alice, bob);
>>> We have two choices now,
>>> 1) We can find a higher order lock
>>> 2) Order the locks
>>> Approach #1 - Find a higher order lock
>>> If we try to do 1, then later, we cannot do fine-grained locking on
>>> Person
>>> to read what friends they have without acquiring the higher order lock.
>>>  We
>>> also start getting into trouble, what if we decide we want to update the
>>> person's name, now what lock do we use, and so on.  While it seems like a
>>> good goal -- in practice it may not be achievable given the application.
>>> Approach #2 - Order the locks
>>> I like this approach better.  It lets me do whatever other fine-grained
>>> locking on the Person object I would like to do, so it is compatible with
>>> a
>>> fine-grained approach.  But now we get to the crux of the problem - how
>>> do
>>> we order the locks?
>>> I am imagining a helper class here:
>>> public static void makeFriends(final Person p1, final Person p2)
>>> {
>>>    orderedSynchronizeOn(new Runnable() {
>>>        public void run() {
>>>            p1.friends.add(p2);
>>>            p2.friends.add(p1);
>>>        }
>>>    }, p1, p2);
>>> }
>>> Does anyone know of an implementation of orderedSynchronizeOn?  I can
>>> imagine writing something (pseudo-code) like:
>>> public void orderedSynchronizeOn(Runnable cmd, Object... objects)
>>> {
>>>    List sortedList = sort objects;
>>>    for (Object o: sortedList) {
>>>        lock (o);
>>>    }
>>>    cmd.run();
>>>    for (Object o: sortedList) {
>>>        unlock (o);
>>>    }
>>> }
>>
>> you probably want to use a recursive function to synchronize on all
>> objects if you can / want to use simple synchronized blocks, which
>> might look something like this (assuming objects are already sorted)
>>
>> private void runLocked(Runnable run, int index, Object[] locks) {
>>  if(index == locks.length) { run.run(); }
>>  else {
>>   synchronized(locks[index]) {
>>     runLocked(run, index+1, locks);
>>   }
>>  }
>> }
>>
>> its kind of like what ConcurrentHashMap does (take a look at the
>> source code for that and more)
>>
>>> Of course my pseudo-code is insufficient and has several problems that
>>> will
>>> arise which are:
>>> 1) how to sort the objects.  Presumably hashcode would be good - but
>>> could
>>> be unpredictable if the hashcode is not stable
>>
>> you might consider using System.identityHashcode( object ) for sorting
>> maybe breaking ties with the class hashcode, but im not sure how safe
>> is that (and most likely not 100% safe)
>>
>>> 2) I cannot think of any way to programmatically acquire the monitor for
>>> an
>>> object that corresponds to synchronized (o);
>>
>> like i said before, try a recursive version of the method
>>
>>> 3) Of course exceptions have to be handled, and the proper unlocks have
>>> to
>>> be executed in a finally block, which is rather tricky in the generic
>>> case
>>
>> if you use plain synchronized blocks, exception are taken care of
>> automatically
>>
>> i hope that helps
>>
>>> So, I can't see how to solve 2 and 3 using programmatic constructs
>>> (iterative approach to locking all passed in objects).  I can see how to
>>> implement say:
>>> orderedSynchronizeOn1,
>>> orderedSynchronizeOn2,
>>> orderedSynchronizeOn3,
>>> orderedSynchronizeOn4,
>>> orderedSynchronizeOn5,
>>> and so on by unrolling the loop and writing the appropriate try/finally
>>> with
>>> synchronized.
>>> And then implementing:
>>> public void orderedSynchronizeOn(Runnable cmd, Object... objects)
>>> {
>>>   switch (objects.length) {
>>>        case 1:  orderedSynchronized1(cmd, objects[0]); break;
>>>        case 2:  orderedSynchronized1(cmd, objects[0], objects 1); break;
>>>        ...
>>>  }
>>> }
>>>
>>>  That seems pretty ugly, but solves the problem assuming we can presume
>>> stable hash codes.
>>> Any other thoughts?
>>>
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>
>>>
>>
>>
>>
>> --
>> []'s
>> Marcelo Takeshi Fukushima
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>



More information about the Concurrency-interest mailing list