[concurrency-interest] ForkJoin refresh

Peter Levart peter.levart at gmail.com
Wed Feb 5 11:00:07 EST 2020


Ooops,

There's also a race at the beginning of task execution. The task might
already be executing when another thread calls cancel(true), before the
'runner' field has been set. In that case the task's code will not get
interrupted. So we should check the cancellation status after setting
the 'runner' field and bail out if the task was cancelled.

This might do:

     static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
         implements RunnableFuture<T> {
         @SuppressWarnings("serial") // Conditionally serializable
         final Callable<? extends T> callable;
         transient volatile Thread runner;
         transient volatile boolean cancelInterrupted;
         @SuppressWarnings("serial") // Conditionally serializable
         T result;

         AdaptedInterruptibleCallable(Callable<? extends T> callable) {
             if (callable == null) throw new NullPointerException();
             this.callable = callable;
         }

         public final T getRawResult() { return result; }

         public final void setRawResult(T v) { result = v; }

         public final boolean exec() {
             Thread t = Thread.currentThread();
             // set runner field
             runner = t;
             try {
                 // check for a concurrent cancel() that just missed the
                 // runner field
                 if (isCancelled()) {
                     throw new CancellationException();
                 } else {
                     result = callable.call();
                 }
                 return true;
             } catch (RuntimeException rex) {
                 throw rex;
             } catch (Exception ex) {
                 throw new RuntimeException(ex);
             } finally {
                 if (!RUNNER.compareAndSet(this, t, (Thread) null)) {
                     // someone called cancel(true) and successfully CASed
                     // runner to null - we should wait for it to set the
                     // cancelInterrupted flag...
                     while (!cancelInterrupted) {
                         Thread.onSpinWait();
                     }
                     // ... and then clear the interrupt it set just
                     // before that
                     Thread.interrupted();
                 }
             }
         }

         public final void run() { invoke(); }

         public final boolean cancel(boolean mayInterruptIfRunning) {
             Thread t;
             boolean stat = super.cancel(false);
             if (mayInterruptIfRunning && (t = runner) != null &&
                 RUNNER.compareAndSet(this, t, (Thread) null)) {
                 try {
                     t.interrupt();
                 } catch (Throwable ignore) {
                 }
                 cancelInterrupted = true;
             }
             return stat;
         }

         public String toString() {
             return super.toString() + "[Wrapped task = " + callable + "]";
         }

         private static final VarHandle RUNNER;
         static {
             try {
                 RUNNER = MethodHandles
                     .lookup()
                     .findVarHandle(AdaptedInterruptibleCallable.class,
                                    "runner", Thread.class);
             } catch (NoSuchFieldException | IllegalAccessException e) {
                 throw new InternalError(e);
             }
         }

         private static final long serialVersionUID = 2838392045355241008L;
     }
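
The ordering argument behind the new check in exec() can be shown in
isolation. Below is a minimal sketch, not part of the proposed patch: the
class CancelHandshake and its begin()/cancel() methods are hypothetical
names, an AtomicReference stands in for the VarHandle, and cancel() here
returns whether it interrupted a runner (unlike ForkJoinTask.cancel). Each
side does a volatile write followed by a volatile read of the other side's
variable, so at least one of them must observe the other: either cancel()
sees the runner and interrupts it, or begin() sees the cancellation and
bails out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone model of the runner/cancel handshake.
final class CancelHandshake {
    private final AtomicReference<Thread> runner = new AtomicReference<>();
    private volatile boolean cancelled;

    /** Worker side: publish the runner first, then look for a cancel. */
    boolean begin() {
        runner.set(Thread.currentThread());   // step 1: volatile write
        return !cancelled;                    // step 2: volatile read
    }

    /** Canceller side: mark cancelled first, then look for a runner. */
    boolean cancel() {
        cancelled = true;                     // step 1: volatile write
        Thread t = runner.get();              // step 2: volatile read
        if (t != null && runner.compareAndSet(t, null)) {
            t.interrupt();
            return true;                      // a runner was interrupted
        }
        return false;                         // no runner was visible yet
    }

    public static void main(String[] args) {
        // cancel() runs before any runner is set: nothing to interrupt,
        // so begin() has to notice the cancellation itself.
        CancelHandshake early = new CancelHandshake();
        boolean interrupted = early.cancel();
        boolean started = early.begin();
        System.out.println("cancel first: interrupted=" + interrupted
                           + ", started=" + started);

        // begin() runs first: cancel() finds the runner and interrupts it
        // (here the runner is the calling thread itself).
        CancelHandshake late = new CancelHandshake();
        started = late.begin();
        interrupted = late.cancel();
        Thread.interrupted();                 // clear the self-interrupt
        System.out.println("begin first: interrupted=" + interrupted
                           + ", started=" + started);
    }
}
```

The sketch leaves out what the finally block in exec() adds on top: clearing
the runner field on completion and waiting for the canceller to finish its
interrupt before clearing it.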


Regards, Peter


