[concurrency-interest]yet another proposal for Executor

Mark D. Anderson mda@discerning.com
Wed, 6 Feb 2002 09:48:36 -0800


My previous proposal covers additional things (in particular, SharedExecutor), but
here is another idea that comes back closer to DL's proposal.

i like this one even better :)

Some features:

- cancel() is now part of the Callable interface

- instead of unqueue() and interrupt() being part of Executor, they
are part of the ControllableFuture (don't you like that name?), which
is returned by FutureExecutor.execute().
This is more convenient for the client programmer and more efficient for the implementor.

- using the interfaces, waitAll() can now be an external static function

-mda
------------

/**
 * A unit of work that produces a result and can be told that it is being
 * cancelled.
 */
public interface Callable {

  /**
   * Performs the work.
   *
   * @return the result of the work
   * @throws Exception if the work cannot be completed
   */
  Object call() throws Exception;

  /**
   * Cancellation hook: to be invoked before the task is interrupted or
   * removed from the queue.
   */
  void cancel();
}

public interface ControllableFuture extends Future {
  /** 
   * States are: QUEUED, UNQUEUED, RUNNING, DONE_EXCEPTION, DONE_OK
   * Note that there is no INTERRUPTED, since we can't know how that'll be dealt with.
   * isAvailable() == (state == DONE_EXCEPTION || state == DONE_OK)
   */
  int getState();
  boolean unqueue();
  boolean interrupt();
  boolean isInterrupted();
  /** something to synchronize() on to prevent change in state, etc. */
  Object getSynchronizable()
  /** something to wait() on if it is not done. should be done inside synchronize. */
  Object getWaitable();
}

/**
 * An executor whose execute() methods hand back a ControllableFuture, so that
 * the caller can observe and control the submitted task through that handle.
 */
public interface FutureExecutor {

  /**
   * Submits a Runnable for execution.
   *
   * @param runnable the task to run
   * @return the handle for the submitted task
   */
  ControllableFuture execute(Runnable runnable);

  /**
   * Submits a Callable for execution.
   *
   * @param callable the task to run
   * @return the handle for the submitted task
   */
  ControllableFuture execute(Callable callable);
}

/**
 * Callback used by waitAll() to report a future that did not complete
 * in time.
 */
public interface FutureTimeoutHandler {

  /**
   * Called once for each future that timed out.
   *
   * @param future the future that was not done when waiting for it ended
   * @param index  zero-based position of that future in the iteration order
   * @return a flag whose meaning is up to the implementation; waitAll() does
   *         not look at it
   */
  boolean timeout(ControllableFuture future, int index);
}

/**
 * Waits for every future supplied by the iterator to finish, using a single
 * time budget shared by all of them.
 *
 * Futures are visited in iteration order. A future that is already UNQUEUED is
 * counted and skipped; one that is already done is skipped. Any other future is
 * waited on (via its waitable) until it becomes available, it is unqueued, or
 * the overall budget runs out. A future still not available after that is
 * counted as timed out and reported to the handler.
 *
 * If the calling thread is interrupted while waiting, the interrupt status is
 * restored and no further waiting is attempted; the remaining unfinished
 * futures are reported as timed out.
 *
 * NOTE(review): wait() is invoked on getWaitable() while holding the monitor of
 * getSynchronizable(); this is only legal if both return the same object —
 * confirm against the ControllableFuture implementation.
 *
 * @param controllable_futures iterator over ControllableFuture instances
 * @param max_msecs            total time budget in milliseconds for all futures;
 *                             zero or negative means "do not wait at all"
 * @param timeout_handler      notified for each future that timed out; may be null
 * @return the number of futures found unqueued plus the number that timed out
 */
public static int waitAll(Iterator controllable_futures, long max_msecs, FutureTimeoutHandler timeout_handler) {
  long wait_msecs = max_msecs;
  long start_msecs = System.currentTimeMillis();
  // Object.wait(0) means "wait forever" and a negative timeout throws, so a
  // non-positive budget must never reach wait(). While is_time_left is true,
  // wait_msecs is always > 0.
  boolean is_time_left = max_msecs > 0;
  int num_unqueued = 0;
  int num_timed_out = 0;
  int ind = -1;
  while (controllable_futures.hasNext()) {
    ControllableFuture future = (ControllableFuture) controllable_futures.next();
    ++ind;

    // TODO: isn't there a risk i'll block on synchronize?
    synchronized (future.getSynchronizable()) {
      int state = future.getState();
      if (state == STATE_UNQUEUED) { ++num_unqueued; continue; }
      if (state == STATE_DONE_OK || state == STATE_DONE_EXCEPTION) { continue; }

      boolean timed_out = true;
      // wait() may return early (spurious wakeup, or a notify for some other
      // state change), so keep waiting until the future is available, it can
      // no longer complete, or the budget is used up.
      while (is_time_left) {
        // assumes that a future will notify
        try {
          future.getWaitable().wait(wait_msecs);
        } catch (InterruptedException e) {
          Log.warn("got exception while waiting for future " + ind + ": " + e);
          // Keep the interrupt visible to the caller. Any further wait() would
          // throw immediately, so stop waiting altogether.
          Thread.currentThread().interrupt();
          is_time_left = false;
        }
        timed_out = !future.isAvailable();

        wait_msecs = max_msecs - (System.currentTimeMillis() - start_msecs);
        if (wait_msecs <= 0) {
          is_time_left = false;
        }
        if (!timed_out || !is_time_left) {
          break;
        }
        Log.warn("time still left, yet future is still not done: " + wait_msecs + " msecs");
        // An unqueued future will never complete; stop waiting for it. It is
        // still reported as timed out, as a single wait() would have done.
        if (future.getState() == STATE_UNQUEUED) {
          break;
        }
      }
      if (timed_out) {
        ++num_timed_out;
        // NOTE(review): the other Log calls concatenate; confirm Log.debug really takes (String, int).
        Log.debug("future timed out, index ", ind);
        if (timeout_handler != null) {
          timeout_handler.timeout(future, ind);
        }
        // TODO: maybe see if this made it change its state? can it do so while we are synchronized?
      }
    }
  }
  return num_unqueued + num_timed_out;
}