[concurrency-interest] ForkJoin refresh

Peter Levart peter.levart at gmail.com
Wed Feb 5 10:15:21 EST 2020


Hi,

On 2/4/20 8:52 PM, Doug Lea via Concurrency-interest wrote:
>   * Tasks constructed with the (@link #adaptInterruptible) adaptor
>   * track and interrupt the running thread upon {@code
>   * cancel(true)}. Reliable usage requires awareness of potential
>   * consequences: Method bodies should ignore stray interrupts to cope
>   * with the inherent possibility that a late interrupt issued by
>   * another thread after a given task has completed may (inadvertently)
>   * interrupt some future task. Further, interruptible tasks should not
>   * in general create subtasks, because an interrupt intended for a
>   * given task may be consumed by one of its subtasks, or vice versa.

Would something like the following be more "precise" in targeting the 
task and only the task and not any other code that executes after the 
task in the same thread? At least stray interrupts would not be possible 
as a consequence of cancel(true) for some unrelated task. Should anyone 
interrupt the pool thread outside of normal task execution, the 
interrupted status would not propagate to task code nor would it be lost 
after the task has finished.

The hot path cost of this would be a CAS instead of a volatile write to 
the runner field at the end of each task.

     static final class AdaptedInterruptibleCallable<T> extends 
ForkJoinTask<T>
         implements RunnableFuture<T> {
         @SuppressWarnings("serial") // Conditionally serializable
         final Callable<? extends T> callable;
         @SuppressWarnings("serial") // Conditionally serializable
         transient volatile Thread runner;
         @SuppressWarnings("serial") // Conditionally serializable
         transient volatile boolean cancelInterrupted;
         T result;

         AdaptedInterruptibleCallable(Callable<? extends T> callable) {
             if (callable == null) throw new NullPointerException();
             this.callable = callable;
         }

         public final T getRawResult() { return result; }

         public final void setRawResult(T v) { result = v; }

         public final boolean exec() {
             // save the pre-task interrupted status and clear it for task
             boolean wasInterrupted = Thread.interrupted();
             Thread t = Thread.currentThread();
             runner = t;
             try {
                 result = callable.call();
                 return true;
             } catch (RuntimeException rex) {
                 throw rex;
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             } finally {
                 if (!RUNNER.compareAndSet(this, t, (Thread) null)) {
                     // someone called cancel(true) and successfully CASed
                     // runner to null - we should wait for it to set the
                     // cancelInterrupted flag...
                     while (!cancelInterrupted) {
                         Thread.onSpinWait();
                     }
                     // ... and then clear the interrupt it set just 
before that
                     Thread.interrupted();
                 }
                 // restore pre-task interrupted status
                 if (wasInterrupted) {
                     Thread.currentThread().interrupt();
                 }
             }
         }

         public final void run() { invoke(); }

         public final boolean cancel(boolean mayInterruptIfRunning) {
             Thread t;
             boolean stat = super.cancel(false);
             if (mayInterruptIfRunning && (t = runner) != null &&
                 RUNNER.compareAndSet(this, t, (Thread) null)) {
                 try {
                     t.interrupt();
                 } catch (Throwable ignore) {
                 }
                 cancelInterrupted = true;
             }
             return stat;
         }

         public String toString() {
             return super.toString() + "[Wrapped task = " + callable + "]";
         }

         private static final VarHandle RUNNER;
         static {
             try {
                 RUNNER = MethodHandles
                     .lookup()
.findVarHandle(AdaptedInterruptibleCallable.class, "runner", Thread.class);
             } catch (NoSuchFieldException | IllegalAccessException e) {
                 throw new InternalError(e);
             }
         }

         private static final long serialVersionUID = 2838392045355241008L;
     }




Regards, Peter



More information about the Concurrency-interest mailing list