[concurrency-interest] Fun with AbstractQueuedSynchronizers

Doug Lea dl@cs.oswego.edu
Tue, 30 Dec 2003 11:55:24 -0500


We've been fine-tuning AbstractQueuedSynchronizer and checking out
whether it serves as a useful basis for various custom sync utilities
and locks that we don't otherwise provide.  An example of one of these
is below.  I couldn't figure out what to do with it after making it,
so decided to post it here.

It's an analog of a WIN32 "Consumable" Event, that can be done in only
a few lines of implementation code, plus declarations and relays to
tie these to public methods.  (We once contemplated providing
something like this class in j.u.c. The reason we don't is that there
are only rare occasions when you'd prefer to use this over a
Semaphore, which is normally a better choice because it doen't "lose"
signals.)

...
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

/**
 * A ConsumedIndicator usually represents the occurrence of some kind
 * of event, for example a state transition or a sensed physical
 * phenomenon. It's behavior is similar to that of WIN32 "Consumable
 * Events".  The <tt>set</tt> operation establishes the existence of
 * the event and signals a waiting thread; this is consumed by the
 * <tt>>poll</tt> and <tt>await</tt> methods. Unlike similar
 * constructs such as Semaphores, signals in an ConsumedIndicator do
 * not "accumulate". A <tt>ConsumedIndicator</tt> is either signalled
 * or not, and the signalled status is cleared upon a successful
 * <tt>poll</tt> or <tt>await</tt>. Because of this,
 * ConsumedIndicators are not appropriate when each signal must be
 * dealt with.
 * 
 * <p><b>Sample usage:</b> 
 * <pre>
 * class HouseOfTheFuture {
 *   ConsumedIndicator newspaperArrived = new ConsumedIndicator();
 *   class DoorStepSensor implements Runnable { // run in some thread
 *      public void run() {
 *        for (;;) {
 *           if (sensedNewspaper()) 
 *              newspaperArrived.set();
 *           // ...
 *        }
 *     }
 *   }
 *   class RoboticDog implements Runnable { // run in some thread
 *      public void run() {
 *        for (;;) {
 *           try {
 *              newspaperArrived.await();
 *              fetchNewspaper();
 *              // ...
 *            } catch (InterruptedException ie) { 
 *              return; 
 *            }
 *         }
 *      }
 *   }
 *   // ...
 * }
 * </pre>
 */

public class ConsumedIndicator {
    /**
     * Synchronization control for ConsumedIndicator.
     * Uses AQS state to represent firing: zero for clear, one for set.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        public int acquireExclusiveState(boolean isQueued, int ignore) {
            return compareAndSet(1, 0)? 0 : -1;
        }

        public boolean releaseExclusiveState(int ignore) {
            return compareAndSet(0, 1);
        }
    }

    /** Performs all sync control */
    private final Sync sync = new Sync();

    /**
     * Constructs a <tt>ConsumedIndicator</tt> with initially clear status.
     */
    public ConsumedIndicator() { 
    }

    /**
     * Set the indicator, waking a thread that may be waiting for it
     */
    public void set() {
        sync.releaseExclusive(0);
    }

    /**
     * Wait, if necessary, until indicator has been set, and when it
     * does, clear it.
     * @throws InterruptedException if the current thread is interrupted
     * while waiting.
     */
    public void await() throws InterruptedException {
        sync.acquireExclusiveInterruptibly(0);
    }
  
    /**
     * Return whether the indicator has been set, if so, clearing it.
     * @return true if the indicator has been set
     */
    public boolean poll() {
        return sync.acquireExclusiveState(false, 0) >= 0;
    }

    /**
     * Return whether the indicator has been set in given time, if so clearing it.
     * @param timeout the maximum time to wait
     * @param unit the time unit of the <tt>timeout</tt> argument.
     * @return true if the indicator has been set in the given time
     * @throws InterruptedException if the current thread is interrupted
     * while waiting.
     */
    public boolean poll(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.acquireExclusiveTimed(0, unit.toNanos(timeout));
    }
}