[concurrency-interest] proposal for Executor, Future, SharedExecutor

Mark D. Anderson mda@discerning.com
Mon, 4 Feb 2002 19:12:20 -0800


Below is a proposal, with code, for the following:

- a "FutureExecutor" interface that seems better to me than existing proposals

- a "SharedExecutor" that implements FutureExecutor and allows a set of Runnables
to be managed as a group, yet actually be executed by a separate FutureExecutor
that can be shared by multiple wrapping SharedExecutors.

- a subclass of dl's PooledExecutor that also implements FutureExecutor.
It is used by SharedExecutor as a demonstration.
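
A minimal sketch of the stacking arrangement, to make the idea concrete before
the real code. It uses simplified stand-in types written only for this
illustration (TaskExecutor, DirectExecutor, GroupExecutor and
StackedExecutorDemo are hypothetical names, not the classes further down), and
the shared executor here just runs tasks in the calling thread rather than in
a pool.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the FutureExecutor interface, reduced to one method.
interface TaskExecutor {
    void execute(Runnable task);
}

// Hypothetical shared executor: runs each task immediately in the calling
// thread and counts how many it has run. A real one would use a thread pool.
class DirectExecutor implements TaskExecutor {
    private int executed = 0;

    public void execute(Runnable task) {
        synchronized (this) { executed++; }
        task.run();
    }

    public synchronized int executedCount() { return executed; }
}

// Hypothetical stand-in for SharedExecutor: remembers its own group of tasks,
// but delegates the actual execution to the shared executor underneath.
class GroupExecutor implements TaskExecutor {
    private final TaskExecutor shared;
    private final List<Runnable> tasks = new ArrayList<Runnable>();

    GroupExecutor(TaskExecutor shared) { this.shared = shared; }

    public void execute(Runnable task) {
        tasks.add(task);
        shared.execute(task);
    }

    public int groupSize() { return tasks.size(); }
}

class StackedExecutorDemo {
    // Submits two tasks through one group and one through another, with both
    // groups sharing one underlying executor.
    // Returns {groupA size, groupB size, total run by the shared executor}.
    static int[] run() {
        DirectExecutor pool = new DirectExecutor();
        GroupExecutor groupA = new GroupExecutor(pool);
        GroupExecutor groupB = new GroupExecutor(pool);
        Runnable noop = new Runnable() { public void run() { } };
        groupA.execute(noop);
        groupA.execute(noop);
        groupB.execute(noop);
        return new int[] { groupA.groupSize(), groupB.groupSize(), pool.executedCount() };
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("groupA=" + r[0] + " groupB=" + r[1] + " shared=" + r[2]);
    }
}
```

Each group sees only its own tasks, while the shared executor sees all of
them; that split is what lets group-wise operations sit on top of one pool.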

Some comments:

- the code is written (and tested) using dl's existing EDU package. Wherever
a FutureResult appears, read it as a Future.

- there are calls to a Log package which is not included but which does the
obvious thing.

- the EDU package has execute() raising an InterruptedException, while it
does not in the java.util proposal. I'm following the newer proposal.

- the EDU package has a Callable with no args, while the java.util proposal 
has one arg. I'm following the older package. It is easy enough to bind
a closure, even in java. Carrying around the extra arg is inconvenient, 
while still not being compatible with any other class, say with
java.lang.reflect.Method.

- Except I withdrew all use of Callable in my (Future)Executor interface anyway.
It is just too messy dealing with all these task-like objects: Callable,
Runnable, Future, RunnableTask, TimerTask, etc.
So now I am just using Runnable and FutureResult (aka Future) in the interface.
In most cases I can imagine, a Callable will get wrapped by a Future
anyway. And Executors just want to know about Runnables.

- I have provided a FutureExecutorAbstract implementation class that provides
convenience execute() functions that handle the different task-like
objects through a subclass of Future(Result) called MyFuture.

- Note that there is no CompositeTask interface. Instead I'm just using my
(modified) FutureExecutor interface for that purpose. I consider the concept I
want for task grouping to be more akin to an Executor than a Runnable, and
anyway I think some interesting things (as shown here with a stacked executor)
can be done if the interfaces are reused.

- I have added interruption to the FutureExecutor interface. I could imagine
that it could be added as an expanded FutureResult interface, but really,
the Executor is the one that knows about threads, so I think it belongs there.

- I have also put cancellation into FutureExecutor, except that I have
called it "unqueue" :). In this terminology, an "unqueue" is the action an
Executor takes to prevent an unrun task from being run.
A particular Future/FutureResult/Callable/etc. might separately
implement a cancel() function to define its desired behavior
if it never gets run (because it was unqueued), or if it discovers
isCancelled while running, or because it wants cancel() to be called
prior to interrupt().
According to a policy class TBD, the Executor calls that cancel() function upon
unqueue().

- I have put a cancel() (in the non-unqueue sense) function in MyFuture.
I think that putting cancel() into yet a 3rd interface ("RunnableTask"
or whatever) just gets really ungainly, unless in practice the classes
actually used will implement multiple interfaces.
Otherwise there just get to be too many "execute()" methods, etc.
I think it should be added to Future, since that has other state methods
anyway.

- I have a waitAll() method in FutureExecutor which I find more
convenient and easier to decipher than the interface in DL's PooledExecutor,
which is a combination of shutdownAfterProcessingCurrentlyQueuedTasks(), 
awaitTerminationAfterShutdown(msecs) and shutdownNow().
It also takes a parameter for timeout policy, which includes interrupting
or not.
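
A rough sketch of the waitAll() idea in isolation: one overall deadline is
shared across all the futures, and whatever is still unfinished at the
deadline is handed to a timeout handler (or left alone if the handler is
null). SimpleFuture, TimeoutHandler and WaitAllDemo are hypothetical stand-ins
written for this sketch, not the EDU or java.util classes, and the handler
here only records an index instead of interrupting anything.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal future: holds a flag and notifies waiters when set.
class SimpleFuture {
    private boolean ready = false;

    public synchronized void set() { ready = true; notifyAll(); }
    public synchronized boolean isReady() { return ready; }

    // Waits until set() is called or the deadline (a System.currentTimeMillis()
    // value) passes. Loops to cope with spurious wakeups. Returns isReady().
    public synchronized boolean awaitUntil(long deadlineMsecs) throws InterruptedException {
        while (!ready) {
            long remaining = deadlineMsecs - System.currentTimeMillis();
            if (remaining <= 0) break;   // never call wait(0), which waits forever
            wait(remaining);
        }
        return ready;
    }
}

// Hypothetical timeout policy hook.
interface TimeoutHandler {
    void timeout(SimpleFuture future, int index);
}

class WaitAllDemo {
    // Waits at most maxMsecs in total for all futures.
    // Returns how many were still not ready at the deadline.
    static int waitAll(List<SimpleFuture> futures, long maxMsecs, TimeoutHandler handler)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxMsecs;
        int unready = 0;
        for (int i = 0; i < futures.size(); i++) {
            SimpleFuture f = futures.get(i);
            if (!f.awaitUntil(deadline)) {
                unready++;
                if (handler != null) handler.timeout(f, i);  // null means "leave as is"
            }
        }
        return unready;
    }

    // Two futures, one already set and one never set; waits 50 msecs.
    // Returns {number unready, index passed to the handler}.
    static int[] run() {
        List<SimpleFuture> futures = new ArrayList<SimpleFuture>();
        SimpleFuture done = new SimpleFuture();
        done.set();
        futures.add(done);
        futures.add(new SimpleFuture());
        final int[] timedOutIndex = { -1 };
        try {
            int unready = waitAll(futures, 50, new TimeoutHandler() {
                public void timeout(SimpleFuture future, int index) { timedOutIndex[0] = index; }
            });
            return new int[] { unready, timedOutIndex[0] };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("unready=" + r[0] + " timedOutIndex=" + r[1]);
    }
}
```

Computing the remaining time from a fixed deadline, rather than passing the
full timeout to each wait, is what keeps the total wait bounded by msecs no
matter how many futures there are.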

-mda
------------------------------
/* 
 * Mark D. Anderson, mda@discerning.com. 
 * February 2002. 
 * Released under the same public domain terms as Doug Lea's concurrent package.
*/
package whatever;
import Log;

import EDU.oswego.cs.dl.util.concurrent.Callable;
import EDU.oswego.cs.dl.util.concurrent.FutureResult;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.TimeoutException;

import java.lang.reflect.InvocationTargetException;
import java.lang.InterruptedException;
import java.util.Iterator;
import java.util.Collection;

public class Concurrent {

   /**
    * A Future with convenient constructors for use with Callable and Runnable.
    * Also has a cancel() function.
    */
   public static class MyFuture extends FutureResult /* implements Future */ {
      Runnable runnable_;

      public Runnable getRunnable() {return runnable_;}
      public void setRunnable(Runnable runnable) {runnable_ = runnable;}
      /**
       * Create another runnable which will call the wrapped runnable and
       * then set the value to null.
       * Does not change the state of this object.
       */
      public Runnable wrap(final Runnable runnable) {
         return new Runnable() {
               public void run() {
                  try {
                     runnable.run();
                     set(null);
                  }
                  catch(Throwable ex) {
                     setException(ex);
                  }
               }
            };
      }
      public void setWrappedRunnable(Runnable runnable) {setRunnable(wrap(runnable));}

      public MyFuture(Callable callable) {setRunnable(this.setter(callable));}

      /**
       * The runnable stored in the instance is actually a wrapping Runnable, not
       * the one passed in.
       */
      public MyFuture(Runnable runnable) {setWrappedRunnable(runnable);}

      boolean isCancelled_ = false;
      public synchronized void setCancelled(boolean isCancelled) {isCancelled_ = isCancelled;}
      public synchronized boolean isCancelled() {return isCancelled_;}
      /**
       * The base class implementation does nothing but set a boolean.
       */
      public synchronized void cancel() {setCancelled(true);}
   }


   /**
    * handler to implement policy about timed out Runnables
    */
   interface FutureTimeoutHandler {
      /**
       * @return true if the handling was successful, whatever that is considered to mean
       */
      boolean timeout(FutureResult future, Runnable runnable);
   }


   /**
    * Execute a set of Runnables.
    */
   interface FutureExecutor /* extends Executor */ {
      /**
       * Execute the supplied Runnable, which will result in setting this Future.
       * It is up to the caller to tie them together, before calling this method.
       * (The supplied Runnable might for example come from calling the FutureResult's setter() method.)
       * <p>
       * (TBD: can any control be done through the FutureResult/Future?)
       * (Note: unlike EDU package, this does not throw InterruptedException)
       * (TBD: allow future to be null, if just execution is desired?)
       */
      void execute(FutureResult future, Runnable runnable); 

      /**
       * Collection of all the Futures
       */
      java.util.Collection futures();

      /**
       * Collection of all the Runnables
       */
      java.util.Collection runnables();

      /**
       * Wait up to msecs for all to complete.
       * If all complete sooner, then return when that happens.
       * If some are not done at the timeout point, then the timeout_handler
       * is called on each of them (unless timeout_handler is null, in which
       * case they are left as is).
       * <p>
       * (TBD: also whether to call cancel() on the incomplete ones, if
       * the Runnable or FutureResult has such a method.)
       *
       * @return the number which were not done at the timeout point
       *   (returns 0 if all got done before timeout).
       * @exception InterruptedException if this is interrupted (should not generally happen)
       */
      int waitAll(long msecs, FutureTimeoutHandler timeout_handler) throws InterruptedException;

      /** 
       * interrupt the given Runnable if it is currently running (assigned to a thread).
       * if it is not yet running, it is unqueue'd (TBD: make this controllable?).
       * <p>
       * (TBD: this could take a FutureResult as a parameter instead.)
       * (TBD: if the Runnable or FutureResult is a class that has a cancel() function, it gets called.)
       * (Note: this is called "interruptTask" just for my clarity; for symmetry it should
       * just be called "interrupt").
       *
       * @return true if it was running, and an interrupt was delivered, or 
       *   if it was not yet running, the result of unqueue().
       * @exception IllegalArgumentException Might throw upon an unknown Runnable.
       */
      boolean interruptTask(Runnable runnable);

      /** 
       * Unqueue the particular runnable. 
       * If it is already running or done, the Executor does nothing and returns false.
       * If it is not yet running, the Executor arranges to never run it, and returns
       * true if it was not previously unqueued.
       * <p>
       * (TBD: if the Runnable or FutureResult is a class that has a cancel() function, it gets called.)
       * (TBD: this could take a FutureResult as a parameter instead.)
       * <p>
       * Note that unqueue() and interruptTask() do not actually remove a runnable from an Executor;
       * they just change the state. (TBD: need a "List drain()" function too? unqueueAll?)
       *
       * @return returns true if it was not yet running, and it was not previously unqueued.
       */
      boolean unqueue(Runnable runnable);
   }


   /**
    * extra convenience methods
    */
   public static abstract class FutureExecutorAbstract implements FutureExecutor {
      /**
       * A FutureTimeoutHandler that cancels if supported, and interrupts.
       */
      public class InterruptHandler implements FutureTimeoutHandler {
         public boolean timeout(FutureResult future, Runnable runnable) {
            // maybe cancel it
            try {
               // if has a cancel() function, cancel it
               // TBD: this is ugly; perhaps we could just insist on Future having a cancel().
               if (runnable instanceof java.util.TimerTask) { ((java.util.TimerTask)runnable).cancel(); }
               if (future instanceof MyFuture) { ((MyFuture)future).cancel(); }
            }
            catch(Throwable e) {Log.warn("got exception while canceling", e);}

            // interrupt it.
            // TODO: avoid duplicate interrupts somehow
            try {
               return interruptTask(runnable);
            }
            catch(Throwable e) {Log.warn("got exception while interrupting", e);}
            return false;
         }
      }

      /**
       * Same as execute(future, future.getRunnable()).
       * @return the parameter.
       */
      MyFuture execute(MyFuture future) {execute(future, future.getRunnable()); return future;}

      /**
       * Same as execute(new MyFuture(callable)).
       * Returns the MyFuture created, and from that you can extract the Runnable that
       * is going to be used.
       */
      MyFuture execute(Callable callable) {return execute(new MyFuture(callable));}

      /**
       * Same as execute(new MyFuture(runnable)).
       * Note that the parameter runnable is <em>not</em> the one used by the Executor;
       * rather a wrapping one is used, which may be retrieved by future.getRunnable().
       * <p>
       * (temporary: using "executeTask" to avoid potential conflict with EDU "void execute(Runnable)")
       */
      MyFuture executeTask(Runnable runnable) {return execute(new MyFuture(runnable));}

      /**
       * Same as waitAll(msecs, interrupt_incomplete ? new InterruptHandler() : null);
       * if interrupt_incomplete is false, then the ones not done are just
       * left as they are.
       */
      int waitAll(long msecs, boolean interrupt_incomplete) throws InterruptedException {
         return waitAll(msecs, interrupt_incomplete ? new InterruptHandler() : null);
      }

      /**
       * Hook for the default waitAll() implementation below: returns the Runnable
       * paired with the given future. Only called if the subclass does not override waitAll().
       */
      protected abstract Runnable getRunnable(FutureResult future, int index);

      public int waitAll(long max_msecs, FutureTimeoutHandler timeout_handler) {
         long wait_msecs = max_msecs;
         long start_msecs = System.currentTimeMillis();
         boolean is_time_left = (max_msecs > 0); // wait(0) would block forever, so never wait with no time left
         int num_unready = 0;
         int ind = 0;
         for(Iterator it=futures().iterator(); it.hasNext(); ++ind) {
            FutureResult future = (FutureResult)it.next();
            /*
            if (!future.isReady()) {
               future.timedGet(wait_msecs);
            }
            */

            // TODO: isn't there a risk i'll block on synchronize?
            synchronized(future) {
               // RunnableTask.isDone()
               // java.util.concurrent.Future.isAvailable
               boolean is_done = future.isReady();
               if (!is_done) {
                  // some time remaining, wait on it 
                  if (is_time_left) {
                     // assumes that a future will notify
                     try {future.wait(wait_msecs);} // InterruptedException
                     catch(InterruptedException e) {Log.warn("got exception while waiting for future " + ind + ": " + e);}
                     is_done = future.isReady();

                     wait_msecs = max_msecs - (System.currentTimeMillis() - start_msecs);
                     if (wait_msecs <= 0) is_time_left = false;
                     else {if (!is_done) Log.warn("time still left, yet future is still not done: " + wait_msecs + " msecs");}
                  }
                  if (!is_done) {
                     num_unready++; 
                     Log.debug("future timed out, index ", ind);
                     if (timeout_handler != null) {
                        timeout_handler.timeout(future, getRunnable(future, ind));
                     }
                  }
               }
            }
         }
         return num_unready;
      }



      /** 
       * iterates through the list interrupting each one.
       *
       * @return the number for which interruptTask() returned true.
       * @exception IllegalArgumentException Might throw upon an unknown Runnable.
       */
      public int interruptAll(Iterator runnables) {
         int count = 0;
         while(runnables.hasNext()) {
            Runnable command = (Runnable)runnables.next(); 
            if (interruptTask(command)) ++count;
         }
         return count;
      }

      /** 
       * Interrupts all Runnables which are currently running (assigned to a thread).
       * (Note: temporarily named "interruptAllTasks" to avoid conflict with PooledExecutor.)
       * 
       * @return the number for which interruptTask() returned true.
       */
      //int interruptAll();
      int interruptAllTasks() {return interruptAll(runnables().iterator());}
   }


   /**
    * subclass of DL's PooledExecutor so that:
    * <ul>
    * <li>i can track what Runnable each Worker is doing.
    * <li>i can implement cancellation
    * <li>the other functions in FutureExecutor
    * </ul>
    *
    */
   public static class MyPooledExecutor extends PooledExecutor implements FutureExecutor {
      /*
       * additional instance variables
       */
      java.util.Map cancelled_ = new java.util.HashMap();
      protected java.util.Map getCancelled() {return cancelled_;}

      /*
       * subclass PooledExecutor.Worker so i can track the current task, and so
       * i can avoid running cancelled ones
       */
      protected class MyWorker /*implements Runnable*/ extends PooledExecutor.Worker {
         protected Runnable currentTask_;

         // protected MyWorker(Runnable firstTask) { currentTask_ = firstTask; }
         protected MyWorker(Runnable firstTask) { super(null); currentTask_ = firstTask; }

         
         public synchronized boolean interruptIf(Runnable task, Thread t) {
            if (task == currentTask_) {t.interrupt(); return true;}
            return false;
         }
         public synchronized Runnable nextTask() throws InterruptedException {
            while (true) {
               currentTask_ = getTask(); // InterruptedException
               if (currentTask_ == null || !getCancelled().containsKey(currentTask_)) 
                  return currentTask_;
               Log.debug("skipping removed task");
               getCancelled().remove(currentTask_);
            }
         }
         
         public void run() {
            try {
               if (currentTask_ != null)
                  currentTask_.run();

               while (nextTask() != null)
                  currentTask_.run();

            }
            catch (InterruptedException ex) { } // fall through
            finally {
               workerDone(this);
            }
         }
      }

      /*
       * constructor
       */

      public MyPooledExecutor(int max_threads) {super(max_threads);}

      /*
       * stuff to implement the FutureExecutor interface, or pretend to.
       * the ones that aren't really implemented properly (futures(), 
       * runnables(), waitAll()) are also not currently used by SharedExecutor.
       */
      public void execute(FutureResult future, Runnable runnable) {
         // TODO: when can this throw? the new proposed Executor interface doesn't do so
         try {execute(runnable);}
         catch(InterruptedException e) {
            Log.warn("execute got interrupted", e); 
            throw new Error("execute got interrupted: " + e.getMessage());
         }
      }
      public Collection futures() {throw new Error("unimplemented");}
      public Collection runnables() {throw new Error("unimplemented");}
      public int waitAll(long max_msecs, FutureTimeoutHandler timeout_handler) throws InterruptedException {
         shutdownAfterProcessingCurrentlyQueuedTasks();
         // awaitTerminationAfterShutdown() returns true if the pool terminated in time
         if (!awaitTerminationAfterShutdown(max_msecs)) {
            // BUG: we assume that if timeout_handler is not null, we want to interrupt
            if (timeout_handler != null) shutdownNow();
            // BUG: no convenient way to know how many we interrupted
            return 1;
         }
         return 0;
      }

      /**
       * remove a task that does not yet have a worker.
       * <p>
       * doing this with handOff_ looks horrifically ugly, like
       * draining them all, then putting back the ones that were not removed.
       * so instead we maintain a hash of cancelled ones, and skip
       * them when they are taken.
       */
      public synchronized boolean unqueue(Runnable command) {
         cancelled_.put(command, null);
         // BUG: we have no way to easily tell if it is queued, so we always return true.
         return true;
      }

      /**
       * interrupt the particular Runnable.
       * <p>
       * synchronized since doing things with threads_.
       * addThread is inside synchronized(this), so maybe this is sufficient?
       * except that shutdown_ is set true in PooledExecutor to prevent queuing/handoff.
       * so are we in danger of a task being moved between handoff_ and threads_
       * while we are doing this?
       * we could set shutdown_ = true, but then that might cause an error if a
       * new task comes into execute. what we want to do is prevent just new taking,
       * not new executing.
       */
      public synchronized boolean interruptTask(Runnable command) {
         for (Iterator it = threads_.entrySet().iterator(); it.hasNext(); ) {
            java.util.Map.Entry entry = (java.util.Map.Entry)it.next();
            MyWorker w = (MyWorker)entry.getKey();
            if (w.interruptIf(command, (Thread)(entry.getValue()) ) ) {
               Log.debug("interrupted runnable " + command);
               return true;
            }
            /*if (w.currentTask_ == command) {
               Thread t = (Thread)(entry.getValue());
               t.interrupt();
               return true;
            }*/
         }
         Log.warn("did not find a worker running task " + command + "; making sure it isn't queued");
         return unqueue(command);
      }

      // override base class method to use MyWorker
      protected void addThread(Runnable command) {
         Worker worker = new MyWorker(command);
         Thread thread = getThreadFactory().newThread(worker);
         threads_.put(worker, thread);
         ++poolSize_;
         thread.start();
      }

      // could override workerDone too, if wanted to not have to subclass Worker
      /*
      protected synchronized void workerDone(MyWorker w) {
         threads_.remove(w);
         if (--poolSize_ == 0 && shutdown_) {
            maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
            notifyAll(); // notify awaitTerminationAfterShutdown
         }
      }
      */

   }


   /**
    * A FutureExecutor which allows group-wise operations on the tasks submitted to it,
    * but actual execution is controlled by an underlying FutureExecutor shared among
    * multiple SharedExecutors.
    * <p>
    * This is not thread safe, though we rely on the shared Executor underneath us to be so.
    * We do not expect more tasks to be execute()'d while we are waiting,
    * and the lists are plain (unsynchronized) ArrayLists.
    */
   public static class SharedExecutor extends FutureExecutorAbstract { 
      java.util.List runnables_;
      java.util.List futures_;

      FutureExecutor shared_;
      //MyPooledExecutor shared_;

      public SharedExecutor(/*MyPooledExecutor*/FutureExecutor executor, int capacity) {
         super();
         shared_ = executor;
         runnables_ = new java.util.ArrayList(capacity);
         futures_ = new java.util.ArrayList(capacity);
      }

      public Collection futures() {return futures_;}
      public Collection runnables() {return runnables_;}
      public Runnable getRunnable(FutureResult future, int ind) {return (Runnable)runnables_.get(ind);}

      public void execute(FutureResult future, Runnable runnable) {
         futures_.add(future);
         runnables_.add(runnable);
         shared_.execute(future, runnable);
      }

      public boolean interruptTask(Runnable runnable) {
         return shared_.interruptTask(runnable);
      }

      public boolean unqueue(Runnable runnable) {
         return shared_.unqueue(runnable);
      }
   }
}
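
The unqueue() trick used by MyPooledExecutor above (record cancelled tasks in
a side table and skip them when a worker takes them, rather than digging them
out of the queue) can be shown in isolation. This is a single-threaded sketch
with hypothetical names (SkipQueue, UnqueueDemo), and it assumes each Runnable
is submitted at most once. Unlike the code above, it also tracks which tasks
are still queued, so unqueue() can return false for unknown, already-taken or
already-unqueued tasks.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical queue that supports unqueue() by marking tasks instead of removing them.
class SkipQueue {
    private final LinkedList<Runnable> queue = new LinkedList<Runnable>();
    // Tasks currently queued; the value is TRUE once the task has been unqueued.
    // Identity-based, so distinct Runnables are never confused with each other.
    private final Map<Runnable, Boolean> queued = new IdentityHashMap<Runnable, Boolean>();

    public synchronized void put(Runnable task) {
        queue.addLast(task);
        queued.put(task, Boolean.FALSE);
    }

    // Returns true only if the task is still queued and was not already unqueued.
    public synchronized boolean unqueue(Runnable task) {
        Boolean state = queued.get(task);
        if (state == null || state.booleanValue()) return false;
        queued.put(task, Boolean.TRUE);
        return true;
    }

    // Returns the next task that has not been unqueued, or null if none is left.
    public synchronized Runnable take() {
        while (!queue.isEmpty()) {
            Runnable task = queue.removeFirst();
            Boolean cancelled = queued.remove(task);
            if (cancelled == null || !cancelled.booleanValue()) return task;
            // otherwise: skip the unqueued task and look at the next one
        }
        return null;
    }
}

class UnqueueDemo {
    // Queues tasks "a", "b", "c", unqueues "b", then drains the queue in the
    // calling thread. Returns the names of the tasks that actually ran.
    static List<String> run() {
        final List<String> ran = new ArrayList<String>();
        SkipQueue q = new SkipQueue();
        String[] names = { "a", "b", "c" };
        Runnable[] tasks = new Runnable[names.length];
        for (int i = 0; i < names.length; i++) {
            final String name = names[i];
            tasks[i] = new Runnable() { public void run() { ran.add(name); } };
            q.put(tasks[i]);
        }
        boolean first = q.unqueue(tasks[1]);   // true: it was queued
        boolean second = q.unqueue(tasks[1]);  // false: already unqueued
        if (!first || second) throw new IllegalStateException("unexpected unqueue result");
        for (Runnable t = q.take(); t != null; t = q.take()) t.run();
        return ran;
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [a, c]
    }
}
```

Keeping the side table and the queue under the same lock avoids the question
raised in the interruptTask() comment about a task moving between the queue
and a worker while its state is being changed.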