[concurrency-interest] draft Carrier API

Kasper Nielsen kasperni at gmail.com
Sun Mar 8 18:59:20 EDT 2020


Just a quick comment,
I would much prefer if Carrier stuck with using the standard [long
timeout, TimeUnit unit]
used throughout java.util.concurrent. Instead of adopting Duration for
a single class.

/Kasper

On Fri, 6 Mar 2020 at 15:22, Doug Lea <dl at cs.oswego.edu> wrote:
>
>
> [Cross-posting concurrency-interest and loom-dev.]
>
> To continue improving java.util.concurrent support for increasingly
> diverse programming styles (while still avoiding arguments about whether
> any of them are best!), it would be helpful to provide "BlockingQueues
> meet AutoCloseable" APIs that are loom-friendly, but not loom-specific.
> A sketch is pasted below. To avoid mail-reader glitches, you might want
> to read updated snapshots at gee.cs.oswego.edu/dl/wwwtmp/Carrier.java
>
> Suggestions and comments are welcome. An initial implementation class
> (LinkedCarrier) should be available shortly after API issues settle;
> others later.
>
> ...
>
> // API sketches, with "public" omitted throughout
>
> /**
>  * A component for sending and receiving messages. Carriers support
>  * usages similar to those of BlockingQueues, but additionally
>  * implement AutoCloseable, and may be explicitly closed for sending,
>  * receiving, or both. Carriers also provide policy-based control for
>  * responses to Thread.interrupt while blocked (ignoring, cancelling
>  * the current operation only, or closing the carrier). Concrete
>  * implementation classes may be created with a given capacity (after
>  * which method send will block waiting for available space), or
>  * effectively unbounded, in which case method send will never block
>  * but may fail with an OutOfMemoryError.
>  *
>  * Design notes:
>  *
>  * (1) Both send and receive methods are declared here, but allowing
>  * either side to be permanently (vs eventually) closed for send-only
>  * or receive-only components. This loses some static type checking
>  * opportunities of separate send and receive APIs. However the class
>  * includes methods (in the style of Collections.unmodifiableX) to
>  * produce views that provide dynamic directionality enforcement.
>  *
>  * (2) This is an abstract class (rather than interface) providing
>  * uniform Observer-syle methods for Selectors and related
>  * classes. The alternative is some sort of SPI.
>  *
>  * (3) To control interactions between Thread interrupts and state,
>  * rather than throwing InterruptedExceptions, potentially blocking
>  * methods rely on a provided policy to distinguish cancelling the
>  * operation vs closing the carrier vs ignoring the interrupt. The
>  * default is CANCEL, because it is the least constraining; for
>  * example some mixed usages can catch CancellationException to then
>  * close only when desired.
>  *
>  * (4) To broaden coverage of channel-based programming styles,
>  * implementations support sendSynchronously, which is otherwise
>  * available in BlockingQueues only as the poorly-named and underused
>  * method LinkedTransferQueue.transfer.
>  */
> abstract class Carrier<T> implements AutoCloseable {
>     Carrier(OnInterrupt policy);
>     Carrier() { this(OnInterrupt.CANCEL); } // default
>
>     // Basic messaging
>
>     /**
>      * Consume item, throw if isClosedForReceiving, block if empty.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     T receive() throws ClosedException, CancellationException;
>
>     /**
>      * Send item, throw if isClosedForSending, block if full.
>      * May cancel or close on interrupt, depending on OnInterrupt policy.
>      */
>     void send(T item) throws ClosedException, CancellationException;
>
>     /** Send and block until item received */
>     void sendSynchronously(T item) throws ClosedException,
> CancellationException;
>
>     // Timeout versions
>     T receive(Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     void send(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>     void sendSynchronously(T item, Duration timeout)
>         throws ClosedException, CancellationException, TimeoutException;
>
>     // Non-blocking access
>     boolean trySend(T item);        // false if closed or full
>     T tryReceive(T resultIfAbsent); // absent if closed or empty
>     T peek(T resultIfAbsent);       // may false-positive
>
>     // Termination
>     void closeForSending();         // fully close when isClosedForReceiving
>     void closeForReceiving();       // vice-versa
>     void close();                   // immediate close
>     void awaitClose() throws interruptedException;
>     void onClose(Runnable closeHandler); // run by thread triggering close
>
>     // Status
>     boolean isClosedForSending();
>     boolean isClosedForReceiving();
>     boolean isClosed();             // true if both sides closed
>     boolean isOpen()                { return !isClosed(); }
>     boolean isEmpty();
>     boolean isFull();               // never true if unbounded
>     long    capacity();             // Long.MAX_VALUE if unbounded
>     OnInterrupt interruptPolicy();  // return policy
>
>     // linkage support, noops here; locators are opaque cookie-like
> identifiers
>     protected void registerSource(Carrier<? super T> c, long locator) {}
>     // notification of send or close by registered carrier
>     protected void sourceEvent(long locator, boolean isClosed) {}
>
>     // views to disable one direction; similar to Collections.unmodifiableX
>     static <E> Carrier<E> sendOnlyCarrier(Carrier<E> c);
>     static <E> Carrier<E> receiveOnlyCarrier(Carrier<E> c);
>
>     // other possible utilities
>     Stream<T> stream();             // destructive (consume-on-traverse)
>     static <E> Carrier<E> discardingCarrier(); // /dev/null analog
>     // TBD: selector as static factory method vs class (as below)
>     // TBD: Flow (reactive stream) adaptors
> }
>
> class LinkedCarrier<T> extends Carrier<T> {
>     // main linked implementation
>     // coming soon, based on LinkedTransferQueue algorithms
> }
>
> class BufferedCarrier<T> extends Carrier<T> {
>     // main array-based implementation(s)
>     // coming later, with single- vs multiple- sink/source options
> }
>
> /**
>  * A Carrier that aggregates sources established in its constructor.
>  * The receive method blocks waiting for any to become available, then
>  * returns the corresponding item. Selectors are always closed for
>  * sending, and may become fully closed when all sources close.
>  */
> class Selector<T> extends Carrier<T> { // possibly a more specific name
>     Selector(<Carrier<? extends T> c, ...) {
>         // for each c { c.registerSource(this, locatorFor(c)); }
>     }
>     boolean isClosedForSending() { return true; }
>     // ...
> }
>
> /**
>  * A policy for responding to Thread.interrupt in blocking methods in
>  * classes implementing AutoCloseable
>  */
> static Enum OnInterrupt {
>     IGNORE,  // continue waiting
>     CANCEL,  // throw CancellationException
>     CLOSE    // close and throw ClosedException
> }
>
> // This could be placed in java.lang for use with any AutoCloseable
> class ClosedException extends IllegalStateException {
>     ClosedException(AutoCloseable c); // the closed component
>     // ...
> }
>


More information about the Concurrency-interest mailing list