[concurrency-interest] draft Carrier API

Doug Lea dl at cs.oswego.edu
Fri Mar 6 10:21:54 EST 2020


[Cross-posting concurrency-interest and loom-dev.]

To continue improving java.util.concurrent support for increasingly
diverse programming styles (while still avoiding arguments about whether
any of them are best!), it would be helpful to provide "BlockingQueues
meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
A sketch is pasted below. To avoid mail-reader glitches, you might want
to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

Suggestions and comments are welcome. An initial implementation class
(LinkedCarrier) should be available shortly after API issues settle;
others later.

...

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving messages. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both. Carriers also provide policy-based control for
 * responses to Thread.interrupt while blocked (ignoring, cancelling
 * the current operation only, or closing the carrier). Concrete
 * implementation classes may be created with a given capacity (after
 * which method send will block waiting for available space), or
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 *
 * Design notes:
 *
 * (1) Both send and receive methods are declared here, but allowing
 * either side to be permanently (vs eventually) closed for send-only
 * or receive-only components. This loses some static type checking
 * opportunities of separate send and receive APIs. However the class
 * includes methods (in the style of Collections.unmodifiableX) to
 * produce views that provide dynamic directionality enforcement.
 *
 * (2) This is an abstract class (rather than interface) providing
 * uniform Observer-syle methods for Selectors and related
 * classes. The alternative is some sort of SPI.
 *
 * (3) To control interactions between Thread interrupts and state,
 * rather than throwing InterruptedExceptions, potentially blocking
 * methods rely on a provided policy to distinguish cancelling the
 * operation vs closing the carrier vs ignoring the interrupt. The
 * default is CANCEL, because it is the least constraining; for
 * example some mixed usages can catch CancellationException to then
 * close only when desired.
 *
 * (4) To broaden coverage of channel-based programming styles,
 * implementations support sendSynchronously, which is otherwise
 * available in BlockingQueues only as the poorly-named and underused
 * method LinkedTransferQueue.transfer.
 */
abstract class Carrier<T> implements AutoCloseable {
    Carrier(OnInterrupt policy);
    Carrier() { this(OnInterrupt.CANCEL); } // default

    // Basic messaging

    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;

    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item) throws ClosedException,
CancellationException;

    // Timeout versions
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    // Non-blocking access
    boolean trySend(T item);        // false if closed or full
    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    // Termination
    void closeForSending();         // fully close when isClosedForReceiving
    void closeForReceiving();       // vice-versa
    void close();                   // immediate close
    void awaitClose() throws interruptedException;
    void onClose(Runnable closeHandler); // run by thread triggering close

    // Status
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen()                { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    // linkage support, noops here; locators are opaque cookie-like
identifiers
    protected void registerSource(Carrier<? super T> c, long locator) {}
    // notification of send or close by registered carrier
    protected void sourceEvent(long locator, boolean isClosed) {}

    // views to disable one direction; similar to Collections.unmodifiableX
    static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
    static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);

    // other possible utilities
    Stream<T> stream();             // destructive (consume-on-traverse)
    static <E> Carrier<E> discardingCarrier(); // /dev/null analog
    // TBD: selector as static factory method vs class (as below)
    // TBD: Flow (reactive stream) adaptors
}

class LinkedCarrier<T> extends Carrier<T> {
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> extends Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item. Selectors are always closed for
 * sending, and may become fully closed when all sources close.
 */
class Selector<T> extends Carrier<T> { // possibly a more specific name
    Selector(<Carrier<? extends T> c, ...) {
        // for each c { c.registerSource(this, locatorFor(c)); }
    }
    boolean isClosedForSending() { return true; }
    // ...
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
static Enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // ...
}



More information about the Concurrency-interest mailing list