[concurrency-interest] Reducing the amount of locking

Peter Veentjer alarmnummer at gmail.com
Sat Jan 10 14:27:29 EST 2009


Hi All,

I'm currently playing with a heap implementation for an STM which in
essence is just a balanced tree (see Node object at the bottom of this
mail). One of the problems of building that tree is that the balancing
is quite expensive and if the number of changes is big, the system
can't process any other transactions.

So the goal is to make the period to process the transaction as short
as possible, and one of the ways to do that is to partition the tree
building, so that it can be processed in parallel and thereby reducing
the processing time:

My main question is: is there any way to reduce the amount of locking
needed in the BlockingPartitionBuilder? At the moment a single lock is
used for the placement of items and the taking of items. I have been
playing with some AtomicReferences to reduce the amount of locking,
but the system is going to be subject to deadlocks (missed signals).
So I tried this 'standard' solution first, but I'm wondering if there
is a better one.

=====================================================================

public final class ParallelSnapshotBuilder implements SnapshotBuilder {

    private final int partitionCount;
    private final PartitionBuilder[] builders;

    public ParallelSnapshotBuilder() {
        partitionCount = Runtime.getRuntime().availableProcessors();

        builders = new PartitionBuilder[partitionCount];
        for (int k = 0; k < partitionCount; k++)
            builders[k] = new BlockingPartitionBuilder();
    }

    public HeapSnapshot create(Content... changes) {
        forkBuildRoots(changes);
        Node[] roots = joinAndGetBuildRoots();
        return new PartitionedHeapSnapshot(roots);
    }

    private Node[] joinAndGetBuildRoots() {
        Node[] nodes = new Node[partitionCount];
        for (int k = 0; k < partitionCount; k++)
            nodes[k] = builders[k].joinAndGetBuildRoot();
        return nodes;
    }

    private void forkBuildRoots(Content... changes) {
        for (Content content : changes)
            getPartitionBuilder(content).forkBuildRoot(content);
    }

    private PartitionBuilder getPartitionBuilder(Content content) {
        int partition = getPartition(content);
        return builders[partition];
    }

    private int getPartition(Content content) {
        return (int) (content.getHandle() % partitionCount);
    }
}

=====================================================================

class BlockingPartitionBuilder implements PartitionBuilder {

    private final static Object TERMINATOR = new Object();

    private final Thread builderThread;

    private final Lock mainLock = new ReentrantLock();
    private final Condition contentAvailableCondition = mainLock.newCondition();
    private final Condition completedCondition = mainLock.newCondition();

    private TodoNode head;

    private volatile Node busyRoot;
    private volatile Node completedRoot;

    BlockingPartitionBuilder() {
        builderThread = new Thread(new BuildTask());
        builderThread.setDaemon(true);
        builderThread.start();
    }

    public void forkBuildRoot(Content content) {
        if (content == null) throw new NullPointerException();

        mainLock.lock();
        try {
            if (completedRoot != null) {
                busyRoot = completedRoot;
                completedRoot = null;
            }
            head = new TodoNode(head, content);
            contentAvailableCondition.signal();
        } finally {
            mainLock.unlock();
        }
    }

   public Node joinAndGetBuildRoot() {
        if (completedRoot == null) {
            mainLock.lock();
            try {
                head = new TodoNode(head, TERMINATOR);
                while (completedRoot == null)
                    completedCondition.awaitUninterruptibly();
            } finally {
                mainLock.unlock();
            }
        }
        return completedRoot;
    }

    private class BuildTask implements Runnable {

        public void run() {
            while (true)
                processItemsForSingleCommit();
        }

        private void process(Content content) {
            if (busyRoot == null)
                busyRoot = new Node(content);
            else
                busyRoot = busyRoot.createBalanced(content);
        }

        private TodoNode retrieveItems() {
            mainLock.lock();
            try {
                while (head == null)
                    contentAvailableCondition.awaitUninterruptibly();
                TodoNode result = head;
                head = null;
                return result;
            } finally {
                mainLock.unlock();
            }
        }

        private void processItemsForSingleCommit() {
            do {
                TodoNode head = retrieveItems();
                do {
                    if (head.content == TERMINATOR) {
                        mainLock.lock();
                        try {
                            completedRoot = busyRoot;
                            completedCondition.signal();
                        } finally {
                            mainLock.unlock();
                        }
                    } else {
                        process((Content) head.content);
                    }
                    head = head.next;
                } while (head != null);
            } while (completedRoot == null);
        }
    }

    private static class TodoNode {
        final TodoNode next;
        final Object content;

        TodoNode(TodoNode next, Object content) {
            this.next = next;
            this.content = content;
        }
    }
}

=====================================================================

public class Node {

    public static int height(Node node) {
        return node == null ? 0 : node.height;
    }

    private static final int COMPARE_SPOT_ON = 0;
    private static final int COMPARE_GO_RIGHT = 1;
    private static final int COMPARE_GO_LEFT = -1;

    private final Content content;
    private final Node left;
    private final Node right;
    private final int height;

    public Node(Content content) {
        this(content, null, null);
    }

    public Node(Content content, Node left, Node right) {
        this.content = content;
        this.left = left;
        this.right = right;
        this.height = max(height(left), height(right)) + 1;
    }

    public Content getContent() {
        return content;
    }

    public Node singleRotateRight() {
        if (left == null)
            throw new IllegalStateException("to do a right rotate, the
left field can't be null");

        Node q = this;
        Node p = q.left;
        Node a = p.left;
        Node b = p.right;
        Node c = q.right;

        Node qNew = new Node(q.content, b, c);
        return new Node(p.content, a, qNew);
    }

    public Node doubleRotateRight() {
        Node newLeft = left.singleRotateLeft();
        return new Node(content, newLeft, right).singleRotateRight();
    }

    public Node singleRotateLeft() {
        if (right == null)
            throw new IllegalStateException("to do a left rotate, the
right field can't be null");

        Node p = this;
        Node q = p.right;
        Node a = p.left;
        Node b = q.left;
        Node c = q.right;
        Node pNew = new Node(p.content, a, b);
        return new Node(q.content, pNew, c);
    }

    public Node doubleRotateLeft() {
        Node newRight = right.singleRotateRight();
        return new Node(content, left, newRight).singleRotateLeft();
    }

    public Node createBalanced(Content change) {
        Node unbalanced = createUnbalanced(change);
        return unbalanced.balance();
    }

    public Node balance() {
        int balanceFactor = balanceFactor();
        switch (balanceFactor) {
            case 0:
                return this;
            case 1:
                return this;
            case -1:
                return this;
            case 2:
                //het is een right/right of een right/left case
                //is the right right heavy, or left heavy
                int rightBalanceFactor = right.balanceFactor();
                if (rightBalanceFactor == 1)
                    return this.singleRotateLeft();
                else
                    return this.doubleRotateLeft();
            case -2:
                //is the left/left  heavy, or left/right heavy
                int leftBalanceFactor = left.balanceFactor();
                if (leftBalanceFactor == -1)
                    return this.singleRotateRight();
                else
                    return this.doubleRotateRight();
            default:
                throw new RuntimeException("unhandeled balanceFactor:
" + balanceFactor);
        }
    }

    public Node createUnbalanced(Content change) {
        int compare = compare(change.getHandle());
        switch (compare) {
            case COMPARE_SPOT_ON:
                //since the left and right trees are balanced, the new
node will be balanced.
                return new Node(change, left, right);
            case COMPARE_GO_RIGHT:
                Node newRight;
                if (right == null)
                    newRight = new Node(change, null, null);
                else
                    newRight = right.createBalanced(change);
                return new Node(content, left, newRight);
            case COMPARE_GO_LEFT:
                Node newLeft;
                if (left == null)
                    newLeft = new Node(change, null, null);
                else
                    newLeft = left.createBalanced(change);
                return new Node(content, newLeft, right);
            default:
                throw new RuntimeException("unhandeled compare " + compare);
        }
    }

    public int size() {
        int size = 1;
        if (right != null)
            size += right.size();
        if (left != null)
            size += left.size();
        return size;
    }

    public int balanceFactor() {
        return height(right) - height(left);
    }

    public int compare(long otherHandle) {
        if (content.getHandle() == otherHandle) {
            return COMPARE_SPOT_ON;
        } else if (content.getHandle() < otherHandle) {
            return COMPARE_GO_RIGHT;
        } else {
            return COMPARE_GO_LEFT;
        }
    }

    public Node find(long handle) {
        Node node = this;
        do {
            switch (node.compare(handle)) {
                case COMPARE_SPOT_ON:
                    return node;
                case COMPARE_GO_RIGHT:
                    node = node.right;
                    break;
                case COMPARE_GO_LEFT:
                    node = node.left;
                    break;
                default:
                    throw new RuntimeException("unhandled case");
            }
        } while (node != null);

        return null;
    }

}


More information about the Concurrency-interest mailing list