[concurrency-interest] How to implement task-local variables (like thread local variables but for Fork/Join tasks)

Per Mildner perm at sics.se
Tue Aug 9 09:01:38 EDT 2016


> On 08 Aug 2016, at 15:19, Dr Heinz M. Kabutz <heinz at javaspecialists.eu> wrote:
> 
> Could you perhaps post some code demonstrating what you are trying to achieve?


The following is a simplified version of the real code.

1. Each (RecursiveAction) task knows its parent;
2. a static method can access the “current task”, implemented by letting each compute() method set a thread-local variable; and
3. once you have the “current task” you can follow the parent chain and find the ancestor KeyValuePair nodes that hold key/value pairs for the current task subtree.

This works (at least the real implementation in our code does), but I am a little worried about the overhead of try + getCurrentTask() + setCurrentTask() in each compute() method, especially since it must be done by all our compute() methods, including those that never (even indirectly) call getTaskLocalValue().
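One way the per-compute() cost might be reduced, sketched here under assumptions rather than taken from the real code: let the thread-local hold only the innermost key/value binding instead of the current task, have each task capture that binding when it is constructed, and write the thread-local in compute() only when the executing thread has different bindings installed (typically because the task was stolen by another worker). `EnvCaptureSketch`, `Binding`, `EnvAction`, `LeafCountTask`, `withBinding` and `lookup` are hypothetical names. A `ThreadLocal.get()` per compute() remains, and tasks still need a common base class, so this does not help with parallel streams.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class EnvCaptureSketch {
    /** Hypothetical immutable chain of key/value bindings; null means "no bindings". */
    static final class Binding {
        final Object key, value;
        final Binding parent;
        Binding(Object key, Object value, Binding parent) {
            this.key = key; this.value = value; this.parent = parent;
        }
    }

    /** The innermost binding visible to code running in this thread. */
    static final ThreadLocal<Binding> ENV = new ThreadLocal<>();

    static Object lookup(Object key, Object defaultValue) {
        for (Binding b = ENV.get(); b != null; b = b.parent) {
            if (b.key.equals(key)) return b.value;
        }
        return defaultValue;
    }

    static void withBinding(Object key, Object value, Runnable body) {
        Binding outer = ENV.get();
        ENV.set(new Binding(key, value, outer));
        try { body.run(); } finally { ENV.set(outer); }
    }

    /**
     * Hypothetical base class: remembers the bindings in effect when the task was
     * constructed and writes the thread-local only if the running thread has other ones.
     */
    abstract static class EnvAction extends RecursiveAction {
        private final Binding env = ENV.get(); // evaluated in the constructing thread

        @Override
        protected final void compute() {
            Binding outer = ENV.get();
            if (outer == env) {      // same bindings already installed: no write, no try/finally
                computeWithEnv();
                return;
            }
            ENV.set(env);            // e.g. the task was stolen by another worker
            try { computeWithEnv(); } finally { ENV.set(outer); }
        }

        protected abstract void computeWithEnv();
    }

    /** Example task: counts the leaves that can see the binding for "flag". */
    static final class LeafCountTask extends EnvAction {
        final int lo, hi;
        final AtomicInteger hits;
        LeafCountTask(int lo, int hi, AtomicInteger hits) { this.lo = lo; this.hi = hi; this.hits = hits; }

        @Override
        protected void computeWithEnv() {
            if (hi - lo <= 1) {
                if ("yes".equals(lookup("flag", "no"))) hits.incrementAndGet();
                return;
            }
            int mid = (lo + hi) >>> 1;
            invokeAll(new LeafCountTask(lo, mid, hits), new LeafCountTask(mid, hi, hits));
        }
    }

    public static void main(String[] args) {
        AtomicInteger hits = new AtomicInteger();
        withBinding("flag", "yes",
                () -> ForkJoinPool.commonPool().invoke(new LeafCountTask(0, 1000, hits)));
        System.out.println(hits.get()); // prints 1000
    }
}
```

When invokeAll() or join() ends up running a subtask in the thread that forked it, `outer == env` holds and the thread-local is neither written nor restored; the write only happens after a steal.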

However, what I would really like is to be able to access these ancestor KeyValuePair nodes also when I have no control over the ForkJoinTask subclass, e.g. when using the new Java 8 Collection.parallelStream() et al.

I suspect that ForkJoinPool already has the necessary information, but I see no way to access it.
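For parallel streams, one workaround that needs no access to ForkJoinPool internals is to capture the bindings where the stream pipeline is built (the lambda object is created in the calling thread) and re-install them around each call on whichever worker runs it. A minimal sketch, assuming an immutable binding chain held in a thread-local; `CapturingStreamSketch`, `Binding`, `capturing`, `withBinding` and `lookup` are hypothetical names, not an existing API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;

public class CapturingStreamSketch {
    /** Hypothetical immutable chain of key/value bindings; null means "no bindings". */
    static final class Binding {
        final Object key, value;
        final Binding parent;
        Binding(Object key, Object value, Binding parent) {
            this.key = key; this.value = value; this.parent = parent;
        }
    }

    static final ThreadLocal<Binding> ENV = new ThreadLocal<>();

    static Object lookup(Object key, Object defaultValue) {
        for (Binding b = ENV.get(); b != null; b = b.parent) {
            if (b.key.equals(key)) return b.value;
        }
        return defaultValue;
    }

    static void withBinding(Object key, Object value, Runnable body) {
        Binding outer = ENV.get();
        ENV.set(new Binding(key, value, outer));
        try { body.run(); } finally { ENV.set(outer); }
    }

    /**
     * Hypothetical helper: snapshots the bindings visible where the lambda is wrapped
     * and re-installs them around each invocation, on whichever thread runs it.
     */
    static IntConsumer capturing(IntConsumer action) {
        final Binding captured = ENV.get();
        return i -> {
            Binding outer = ENV.get();
            if (outer == captured) { action.accept(i); return; }
            ENV.set(captured);
            try { action.accept(i); } finally { ENV.set(outer); }
        };
    }

    public static void main(String[] args) {
        Map<Integer, Object> seen = new ConcurrentHashMap<>();
        withBinding("user", "per", () ->
                IntStream.range(0, 100).parallel()
                         .forEach(capturing(i -> seen.put(i, lookup("user", "none")))));
        boolean all = seen.size() == 100 && seen.values().stream().allMatch("per"::equals);
        System.out.println(all); // prints true
    }
}
```

Without capturing(), elements processed by the calling thread would see the binding while those handled by common-pool workers would not. The limitation is that every lambda handed to a parallel stream must be wrapped, which is only practical where the stream is created in code you control.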

Note that the callers of getTaskLocalValue() are typically far away from the code that creates and invokes the ForkJoinTask subclasses, and are often reached via code we cannot modify, so we cannot easily pass the key/value pairs as extra arguments.

The example Main sets task-local variables while processing some of the arguments, but does not demonstrate that a key/value pair is visible from all tasks in a task subtree.

package tlv;

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.function.IntConsumer;

@SuppressWarnings("WeakerAccess")
public class Main
{
    private static final int THRESHOLD = 2;
    /**
     * The {@link IntRangeTask} currently running its {@link IntRangeTask#compute()} in this thread (or its ancestor {@link TaskNode}). May be null.
     */
    private static final ThreadLocal<TaskNode> cCurrentTask = new ThreadLocal<>();
    private static final TaskNode NO_PARENT = null;


    /**
     * The task currently running in the thread, or null if none or unknown.
     */
    static TaskNode getCurrentTask()
    {
        return cCurrentTask.get();
    }

    static void setCurrentTask(TaskNode task)
    {
        cCurrentTask.set(task);
    }

    /**
     * Return the value associated with key in the task tree, closest ancestor first, or defaultValue if none.
     *
     * @param key          the key whose associated value is to be returned
     * @param defaultValue returned if key is not present
     * @return the value (possibly null) associated with key, or defaultValue if none
     * @see #withTaskLocal(Object, Object, Runnable)
     */
    // This is called fairly often.
    static Object getTaskLocalValue(Object key, Object defaultValue)
    {
        TaskNode task = getCurrentTask();

        // Go towards the task-tree root, looking for a value for key.
        while (task != null)
        {
            if (task instanceof KeyValuePair)
            {
                if (((KeyValuePair) task).getKey().equals(key))
                {
                    return ((KeyValuePair) task).getValue();
                }
            }
            task = task.getParent();
        }
        return defaultValue;
    }


    /**
     * @param key      the (non-null) key to bind.
     * @param value    the value associated with key.
     * @param runnable the runnable to run with the binding in effect.
     */
    // This is rarely called so it is not speed-critical.
    static void withTaskLocal(Object key, Object value, Runnable runnable)
    {
        // Read the thread-local once; the outer task is both the new pair's parent and what we restore.
        TaskNode outerTask = getCurrentTask();
        KeyValuePair task = new KeyValuePair(key, value, outerTask);
        try
        {
            setCurrentTask(task);
            runnable.run();
        }
        finally
        {
            setCurrentTask(outerTask);
        }
    }

    /**
     * Represents a node in the task-tree. Some nodes have associated key/value pairs.
     */
    interface TaskNode
    {
        /**
         * The parent of this task-tree node, or null if none.
         */
        TaskNode getParent();
    }

    // Similar to IncrementTask in RecursiveAction JavaDocs.
    static class IntRangeTask extends RecursiveAction implements TaskNode
    {
        private final TaskNode iParent;
        private final IntConsumer iOperation;
        private final int iLowInclusive;
        private final int iHighExclusive;

        IntRangeTask(IntConsumer operation, int lowInclusive, int highExclusive, TaskNode parent)
        {
            super();
            iParent = parent;
            iOperation = operation;
            iLowInclusive = lowInclusive;
            iHighExclusive = highExclusive;
        }

        @Override
        public TaskNode getParent()
        {
            return iParent;
        }

        // Called often, so I worry about the try+getCurrentTask()+setCurrentTask() overhead here.
        @Override
        protected void compute()
        {
            if (iHighExclusive - iLowInclusive < THRESHOLD)
            {
                TaskNode outerTask = getCurrentTask();

                try
                {
                    setCurrentTask(this);
                    for (int i = iLowInclusive; i < iHighExclusive; ++i)
                    {
                        iOperation.accept(i);
                    }
                }
                finally
                {
                    setCurrentTask(outerTask);
                }
            }
            else
            {
                int mid = iLowInclusive + ((iHighExclusive - iLowInclusive) >>> 1);
                TaskNode parent = getParent(); // could use 'this' here, instead.

                invokeAll(new IntRangeTask(iOperation, iLowInclusive, mid, parent),
                          new IntRangeTask(iOperation, mid, iHighExclusive, parent));
            }
        }
    }

    static class KeyValuePair implements TaskNode
    {
        private final Object iKey;
        private final Object iValue;
        private final TaskNode iParent;

        KeyValuePair(Object key, Object value, TaskNode parent)
        {
            assert key != null;
            iKey = key;
            iValue = value;
            iParent = parent;
        }

        @Override
        public TaskNode getParent()
        {
            return iParent;
        }

        Object getKey()
        {
            return iKey;
        }

        Object getValue()
        {
            return iValue;
        }
    }

    /* java tlv.Main a b c d e f g h i j k l m n o p q r s t u v a
    Round 0
    ...
    Round 1
    a-->a@Thread[main,5,main] Task local value for a
    b-->b@Thread[main,5,main] DefaultResult
    c-->c@Thread[main,5,main] DefaultResult
    d-->d@Thread[main,5,main] DefaultResult
    e-->e@Thread[main,5,main] DefaultResult
    f-->f@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    g-->g@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    h-->h@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    i-->i@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    j-->j@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    k-->k@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    l-->l@Thread[ForkJoinPool.commonPool-worker-1,5,main] Task local value for l
    m-->m@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    n-->n@Thread[ForkJoinPool.commonPool-worker-1,5,main] DefaultResult
    o-->o@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    p-->p@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    q-->q@Thread[ForkJoinPool.commonPool-worker-6,5,main] DefaultResult
    r-->r@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    s-->s@Thread[ForkJoinPool.commonPool-worker-2,5,main] DefaultResult
    t-->t@Thread[ForkJoinPool.commonPool-worker-2,5,main] Task local value for t
    u-->u@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    v-->v@Thread[ForkJoinPool.commonPool-worker-7,5,main] DefaultResult
    a-->a@Thread[ForkJoinPool.commonPool-worker-7,5,main] Task local value for a
    Round 2
    ...
    */
    public static void main(String[] args)
    {
        // These arguments will run with task local values.
        final String[] keys = {"a", "t", "l"};

        final String[] results = new String[args.length];

        final ForkJoinPool pool = ForkJoinPool.commonPool();

        for (int j = 0; j < 5; j++)
        {
            System.out.println("Round " + j);

            pool.invoke(new IntRangeTask((i) ->
                                         {
                                             final String arg = args[i];

                                             if (Arrays.asList(keys).contains(arg))
                                             {
                                                 withTaskLocal(arg, "Task local value for " + arg, () -> results[i] = doSomething(arg));
                                             }
                                             else
                                             {
                                                 results[i] = doSomething(arg);
                                             }
                                         },
                                         0, args.length, NO_PARENT));

            for (int i = 0; i < results.length; i++)
            {
                System.out.println(args[i] + "-->" + results[i]);
            }
        }
    }

    private static String doSomething(final String arg)
    {
        return arg + "@" + Thread.currentThread() + " " + getTaskLocalValue(arg, "DefaultResult");
    }
}
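Main does not show a binding being visible throughout a task subtree. A compact, self-contained variant that mirrors Main's TaskNode/KeyValuePair design could look like the following (the class names `SubtreeDemo` and `CountTask` are hypothetical). The binding is established around pool.invoke(), and the root task is given the current node as its parent; with NO_PARENT (null) as in Main, the leaves would not find the binding. Subtasks use `this` as their parent, the alternative mentioned in IntRangeTask.compute().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class SubtreeDemo {
    interface TaskNode { TaskNode getParent(); }

    static final class KeyValuePair implements TaskNode {
        final Object key, value;
        final TaskNode parent;
        KeyValuePair(Object key, Object value, TaskNode parent) {
            this.key = key; this.value = value; this.parent = parent;
        }
        @Override public TaskNode getParent() { return parent; }
    }

    static final ThreadLocal<TaskNode> CURRENT = new ThreadLocal<>();

    static Object getTaskLocalValue(Object key, Object defaultValue) {
        for (TaskNode t = CURRENT.get(); t != null; t = t.getParent()) {
            if (t instanceof KeyValuePair && ((KeyValuePair) t).key.equals(key)) {
                return ((KeyValuePair) t).value;
            }
        }
        return defaultValue;
    }

    static void withTaskLocal(Object key, Object value, Runnable runnable) {
        TaskNode outer = CURRENT.get();
        CURRENT.set(new KeyValuePair(key, value, outer));
        try { runnable.run(); } finally { CURRENT.set(outer); }
    }

    /** Splits [lo, hi) in halves; each leaf counts whether it can see key "k". */
    static final class CountTask extends RecursiveAction implements TaskNode {
        final TaskNode parent;
        final int lo, hi;
        final AtomicInteger hits;
        CountTask(TaskNode parent, int lo, int hi, AtomicInteger hits) {
            this.parent = parent; this.lo = lo; this.hi = hi; this.hits = hits;
        }
        @Override public TaskNode getParent() { return parent; }

        @Override
        protected void compute() {
            if (hi - lo <= 1) {
                TaskNode outer = CURRENT.get();
                CURRENT.set(this);
                try {
                    if ("v".equals(getTaskLocalValue("k", "none"))) hits.incrementAndGet();
                } finally {
                    CURRENT.set(outer);
                }
            } else {
                int mid = (lo + hi) >>> 1;
                invokeAll(new CountTask(this, lo, mid, hits), new CountTask(this, mid, hi, hits));
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger hits = new AtomicInteger();
        // Root parent = the KeyValuePair current inside withTaskLocal, rather than null.
        withTaskLocal("k", "v", () ->
                ForkJoinPool.commonPool().invoke(new CountTask(CURRENT.get(), 0, 64, hits)));
        System.out.println(hits.get() + " of 64 leaves saw k=v"); // prints "64 of 64 leaves saw k=v"
    }
}
```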


> 
> On Monday, 8 August 2016, Per Mildner <perm at sics.se> wrote:
> Ping?
> 
> > On 09 Jun 2016, at 22:56, Per Mildner <perm at sics.se> wrote:
> >
> > I need something that should behave like a thread-local variable but for a tree of RecursiveAction tasks.
> >
> > That is, a task inherits the task-local variables from the task that created it. Looking up a task-local variable is done by traversing the chain of parent tasks, looking for special task instances that hold task-local values.
> >
> > I currently implement this by letting all tasks know their parent (the “current task”) which is passed to their constructor. This way I can traverse the chain of parents towards the root of the task tree.
> >
> > Keeping track of the current task is done by making all tasks set and restore themselves as the “current task” in a thread-local variable in their RecursiveAction.compute(), but that seems wasteful since it is done even when the task is running in the same thread as its parent task.
> >
> > So, any ideas how to implement this efficiently?
> >
> > Even better, is there a way to make something like this work with the default pool?
> >
> > Regards,
...
> 
> 
> -- 
> Dr Heinz M. Kabutz (PhD CompSci)
> Author of "The Java(tm) Specialists' Newsletter"
> Sun/Oracle Java Champion
> JavaOne Rockstar Speaker
> http://www.javaspecialists.eu
> Tel: +30 69 75 595 262
> Skype: kabutz
> 



Per Mildner
Per.Mildner at sics.se


