[concurrency-interest] Reducing the amount of locking

Tim Peierls tim at peierls.net
Sun Jan 11 12:45:24 EST 2009


I don't fully understand the code, so I can't answer your question directly,
but it sure seems like a perfect application for map-reduce using
extra166y's ParallelArray:

http://gee.cs.oswego.edu/dl/jsr166/dist/extra166ydocs/extra166y/ParallelArray.html

With a few modifications to your Node class (providing an EMPTY Node
instance and adding a mergeBalanced method that merges two balanced Nodes),
your code could be as simple as this, with no locking whatsoever:

import static extra166y.Ops.Op;
import static extra166y.Ops.Reducer;
import static extra166y.ParallelArray.createUsingHandoff;
import static extra166y.ParallelArray.defaultExecutor;

public final class ParallelArraySnapshotBuilder implements SnapshotBuilder {

    public HeapSnapshot create(Content... changes) {
        Node root = createUsingHandoff(changes, defaultExecutor())
            .withMapping(contentToNode)
            .reduce(mergeBalanced, Node.EMPTY);

        return new PartitionedHeapSnapshot(new Node[] { root }); // or
something more direct
    }

    private static Op<Content, Node> contentToNode = new Op<Content, Node>()
{
        public Node op(Content content) { return new Node(content); }
    };

    private static Reducer<Node> mergeBalanced = new Reducer<Node>() {
        public Node op(Node a, Node b) { return a.mergeBalanced(b); }
    };
}

That aside, are you sure your use of volatile for busyRoot and completedRoot
is correct? I'd worry about the places where they are tested for nullity
without the lock being held. Even if it is correct, it's not easy to see
that it is, which for me counts as a bug.

--tim


On Sat, Jan 10, 2009 at 2:27 PM, Peter Veentjer <alarmnummer at gmail.com> wrote:

> Hi All,
>
> I'm currently playing with a heap implementation for an STM which in
> essence is just a balanced tree (see Node object at the bottom of this
> mail). One of the problems of building that tree is that the balancing
> is quite expensive and if the number of changes is big, the system
> can't process any other transactions.
>
> So the goal is to make the time needed to process a transaction as short
> as possible. One way to do that is to partition the tree building so that
> the partitions can be processed in parallel, thereby reducing the
> processing time.
>
> My main question is: is there any way to reduce the amount of locking
> needed in the BlockingPartitionBuilder? At the moment a single lock is
> used for the placement of items and the taking of items. I have been
> playing with some AtomicReferences to reduce the amount of locking,
> but the system is going to be subject to deadlocks (missed signals).
> So I tried this 'standard' solution first, but I'm wondering if there
> is a better one.
>
> =====================================================================
>
> /**
>  * SnapshotBuilder that spreads the changes over one BlockingPartitionBuilder
>  * per available processor and collects the root produced by each of them.
>  */
> public final class ParallelSnapshotBuilder implements SnapshotBuilder {
>
>    private final int partitionCount;
>    private final PartitionBuilder[] builders;
>
>    /** Creates one BlockingPartitionBuilder for every available processor. */
>    public ParallelSnapshotBuilder() {
>        partitionCount = Runtime.getRuntime().availableProcessors();
>
>        builders = new PartitionBuilder[partitionCount];
>        for (int k = 0; k < partitionCount; k++)
>            builders[k] = new BlockingPartitionBuilder();
>    }
>
>    /**
>     * Hands every change to the builder of its partition, then waits for all
>     * builders and wraps their roots in a PartitionedHeapSnapshot.
>     *
>     * @param changes the changed contents
>     * @return a snapshot holding one root per partition
>     */
>    public HeapSnapshot create(Content... changes) {
>        forkBuildRoots(changes);
>        Node[] roots = joinAndGetBuildRoots();
>        return new PartitionedHeapSnapshot(roots);
>    }
>
>    /** Asks every builder, in partition order, for its root. */
>    private Node[] joinAndGetBuildRoots() {
>        Node[] nodes = new Node[partitionCount];
>        for (int k = 0; k < partitionCount; k++)
>            nodes[k] = builders[k].joinAndGetBuildRoot();
>        return nodes;
>    }
>
>    /** Passes each change to the builder responsible for its handle. */
>    private void forkBuildRoots(Content... changes) {
>        for (Content content : changes)
>            getPartitionBuilder(content).forkBuildRoot(content);
>    }
>
>    private PartitionBuilder getPartitionBuilder(Content content) {
>        int partition = getPartition(content);
>        return builders[partition];
>    }
>
>    /**
>     * Maps the handle of the content to an index in [0, partitionCount).
>     */
>    private int getPartition(Content content) {
>        // '%' keeps the sign of the dividend, so a negative handle would give a
>        // negative remainder and an out-of-bounds index; shift it into range.
>        int partition = (int) (content.getHandle() % partitionCount);
>        return partition < 0 ? partition + partitionCount : partition;
>    }
> }
>
> =====================================================================
>
> /**
>  * PartitionBuilder that applies the changes of one partition on its own
>  * daemon thread. Callers hand work to that thread through a linked list of
>  * TodoNodes guarded by mainLock; a TERMINATOR entry marks the end of a commit.
>  */
> class BlockingPartitionBuilder implements PartitionBuilder {
>
>    private final static Object TERMINATOR = new Object();
>
>    private final Thread builderThread;
>
>    private final Lock mainLock = new ReentrantLock();
>    private final Condition contentAvailableCondition =
>            mainLock.newCondition();
>    private final Condition completedCondition = mainLock.newCondition();
>
>    // Pending work, most recently added entry first. Guarded by mainLock.
>    private TodoNode head;
>
>    // True once the builder thread has published completedRoot for the current
>    // commit. A separate flag is needed because the published root is null
>    // when the partition never received any content. Guarded by mainLock.
>    private boolean completed;
>
>    private volatile Node busyRoot;
>    private volatile Node completedRoot;
>
>    BlockingPartitionBuilder() {
>        builderThread = new Thread(new BuildTask());
>        builderThread.setDaemon(true);
>        builderThread.start();
>    }
>
>    /**
>     * Queues one change for the builder thread. The first change after a
>     * completed commit makes the previous result the starting point again.
>     *
>     * @param content the change to apply
>     * @throws NullPointerException if content is null
>     */
>    public void forkBuildRoot(Content content) {
>        if (content == null) throw new NullPointerException();
>
>        mainLock.lock();
>        try {
>            if (completed) {
>                busyRoot = completedRoot;
>                completedRoot = null;
>                completed = false;
>            }
>            head = new TodoNode(head, content);
>            contentAvailableCondition.signal();
>        } finally {
>            mainLock.unlock();
>        }
>    }
>
>    /**
>     * Waits until all queued changes have been applied and returns the
>     * resulting root. If nothing was queued since the last completed commit
>     * the previous result is returned immediately.
>     *
>     * @return the root for this partition; null if it never received content
>     */
>    public Node joinAndGetBuildRoot() {
>        mainLock.lock();
>        try {
>            if (!completed) {
>                head = new TodoNode(head, TERMINATOR);
>                // The builder thread may be waiting for work; without this
>                // signal it would never see the terminator.
>                contentAvailableCondition.signal();
>                while (!completed)
>                    completedCondition.awaitUninterruptibly();
>            }
>            return completedRoot;
>        } finally {
>            mainLock.unlock();
>        }
>    }
>
>    /** Body of the builder thread: processes one commit after the other, forever. */
>    private class BuildTask implements Runnable {
>
>        public void run() {
>            while (true)
>                processItemsForSingleCommit();
>        }
>
>        /** Applies one change to the root that is under construction. */
>        private void process(Content content) {
>            if (busyRoot == null)
>                busyRoot = new Node(content);
>            else
>                busyRoot = busyRoot.createBalanced(content);
>        }
>
>        /** Blocks until work is available, then takes the whole pending list. */
>        private TodoNode retrieveItems() {
>            mainLock.lock();
>            try {
>                while (head == null)
>                    contentAvailableCondition.awaitUninterruptibly();
>                TodoNode result = head;
>                head = null;
>                return result;
>            } finally {
>                mainLock.unlock();
>            }
>        }
>
>        /**
>         * Applies batches of changes until a batch containing the TERMINATOR
>         * has been handled completely, then publishes the result.
>         */
>        private void processItemsForSingleCommit() {
>            boolean terminatorSeen = false;
>            do {
>                // Entries come newest first, so the terminator precedes the
>                // content queued before it. Only remember that it was seen and
>                // publish after the rest of the batch has been applied.
>                for (TodoNode item = retrieveItems(); item != null; item = item.next) {
>                    if (item.content == TERMINATOR)
>                        terminatorSeen = true;
>                    else
>                        process((Content) item.content);
>                }
>            } while (!terminatorSeen);
>
>            mainLock.lock();
>            try {
>                completedRoot = busyRoot;
>                completed = true;
>                completedCondition.signalAll();
>            } finally {
>                mainLock.unlock();
>            }
>        }
>    }
>
>    /** Immutable entry of the pending-work list; content is a Content or TERMINATOR. */
>    private static class TodoNode {
>        final TodoNode next;
>        final Object content;
>
>        TodoNode(TodoNode next, Object content) {
>            this.next = next;
>            this.content = content;
>        }
>    }
> }
>
> =====================================================================
>
> public class Node {
>
>    public static int height(Node node) {
>        return node == null ? 0 : node.height;
>    }
>
>    private static final int COMPARE_SPOT_ON = 0;
>    private static final int COMPARE_GO_RIGHT = 1;
>    private static final int COMPARE_GO_LEFT = -1;
>
>    private final Content content;
>    private final Node left;
>    private final Node right;
>    private final int height;
>
>    public Node(Content content) {
>        this(content, null, null);
>    }
>
>    public Node(Content content, Node left, Node right) {
>        this.content = content;
>        this.left = left;
>        this.right = right;
>        this.height = max(height(left), height(right)) + 1;
>    }
>
>    public Content getContent() {
>        return content;
>    }
>
>    public Node singleRotateRight() {
>        if (left == null)
>            throw new IllegalStateException("to do a right rotate, the
> left field can't be null");
>
>        Node q = this;
>        Node p = q.left;
>        Node a = p.left;
>        Node b = p.right;
>        Node c = q.right;
>
>        Node qNew = new Node(q.content, b, c);
>        return new Node(p.content, a, qNew);
>    }
>
>    public Node doubleRotateRight() {
>        Node newLeft = left.singleRotateLeft();
>        return new Node(content, newLeft, right).singleRotateRight();
>    }
>
>    public Node singleRotateLeft() {
>        if (right == null)
>            throw new IllegalStateException("to do a left rotate, the
> right field can't be null");
>
>        Node p = this;
>        Node q = p.right;
>        Node a = p.left;
>        Node b = q.left;
>        Node c = q.right;
>        Node pNew = new Node(p.content, a, b);
>        return new Node(q.content, pNew, c);
>    }
>
>    public Node doubleRotateLeft() {
>        Node newRight = right.singleRotateRight();
>        return new Node(content, left, newRight).singleRotateLeft();
>    }
>
>    public Node createBalanced(Content change) {
>        Node unbalanced = createUnbalanced(change);
>        return unbalanced.balance();
>    }
>
>    public Node balance() {
>        int balanceFactor = balanceFactor();
>        switch (balanceFactor) {
>            case 0:
>                return this;
>            case 1:
>                return this;
>            case -1:
>                return this;
>            case 2:
>                //het is een right/right of een right/left case
>                //is the right right heavy, or left heavy
>                int rightBalanceFactor = right.balanceFactor();
>                if (rightBalanceFactor == 1)
>                    return this.singleRotateLeft();
>                else
>                    return this.doubleRotateLeft();
>            case -2:
>                //is the left/left  heavy, or left/right heavy
>                int leftBalanceFactor = left.balanceFactor();
>                if (leftBalanceFactor == -1)
>                    return this.singleRotateRight();
>                else
>                    return this.doubleRotateRight();
>            default:
>                throw new RuntimeException("unhandeled balanceFactor:
> " + balanceFactor);
>        }
>    }
>
>    public Node createUnbalanced(Content change) {
>        int compare = compare(change.getHandle());
>        switch (compare) {
>            case COMPARE_SPOT_ON:
>                //since the left and right trees are balanced, the new
> node will be balanced.
>                return new Node(change, left, right);
>            case COMPARE_GO_RIGHT:
>                Node newRight;
>                if (right == null)
>                    newRight = new Node(change, null, null);
>                else
>                    newRight = right.createBalanced(change);
>                return new Node(content, left, newRight);
>            case COMPARE_GO_LEFT:
>                Node newLeft;
>                if (left == null)
>                    newLeft = new Node(change, null, null);
>                else
>                    newLeft = left.createBalanced(change);
>                return new Node(content, newLeft, right);
>            default:
>                throw new RuntimeException("unhandeled compare " + compare);
>        }
>    }
>
>    public int size() {
>        int size = 1;
>        if (right != null)
>            size += right.size();
>        if (left != null)
>            size += left.size();
>        return size;
>    }
>
>    public int balanceFactor() {
>        return height(right) - height(left);
>    }
>
>    public int compare(long otherHandle) {
>        if (content.getHandle() == otherHandle) {
>            return COMPARE_SPOT_ON;
>        } else if (content.getHandle() < otherHandle) {
>            return COMPARE_GO_RIGHT;
>        } else {
>            return COMPARE_GO_LEFT;
>        }
>    }
>
>    public Node find(long handle) {
>        Node node = this;
>        do {
>            switch (node.compare(handle)) {
>                case COMPARE_SPOT_ON:
>                    return node;
>                case COMPARE_GO_RIGHT:
>                    node = node.right;
>                    break;
>                case COMPARE_GO_LEFT:
>                    node = node.left;
>                    break;
>                default:
>                    throw new RuntimeException("unhandled case");
>            }
>        } while (node != null);
>
>        return null;
>    }
>
> }
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20090111/56a90783/attachment-0001.html>


More information about the Concurrency-interest mailing list