[concurrency-interest] draft Carrier API

Doug Lea dl at cs.oswego.edu
Sat Mar 7 11:29:22 EST 2020


Collecting replies/responses, with updated sketches pasted below and at
http://gee.cs.oswego.edu/dl/wwwtmp/Carrier.java

On 3/6/20 3:56 PM, John Rose wrote:

> This design puts both endpoints on one type, as opposed to two 
> similar types, like InputStream and OutputStream. This leads to
> fewer types and objects (good) but broader ones. Broader is little
> less good, since most use points only care about 1/2 of the methods;
> the other 1/2 is then noise.

The main API design issue here is that there are three (not two) views
of a Carrier: the protocol state (closed, empty, etc), sender-side, and
 receiver-side operations. If you split them, you need at least four
interfaces/classes total (one to combine them). Doing this in a way that
does not result in nearly all usages needing the combined interface
requires other tradeoffs (like declaring some bookkeeping methods public
and breaking read-only-ness of base interface). But given the reaction
so far, I'm back to thinking this can be done in a way that more people
will prefer. (Aside: I've waffled on this many many times, including
pre-j.u.c
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
, and several previous Carrier drafts.)

So, back to a refreshed 4-interface version.

On 3/6/20 1:04 PM, Dávid Karnok wrote:

> onClose(Runnable): will this allow one handler ...

Thanks for the prod. It is much better to define:
  /** Returns a CompletableFuture that isDone when closed. */
  CompletionStage<Carriable<T>> onClose();
In which case these questions and others are already answered, and we
can also omit awaitClose method.

> 
> tryReceive(T def): always somewhat trouble to expect a default value 
> of T, perhaps return Optional<T> if T is planned to be non-null.

I think best to keep this, but also add a variant of your other suggestion:
  boolean tryConsume(Consumer<? super T> proc); // false if closed or empty

> + closeExceptionally(Throwable) :

Yes, thanks; for the same reasons we added to SubmissionPublisher. Also
adding getClosedException() method.

> + onSenderReady(Runnable r) receiveAsPublisher(Executor) sendAsSubscriber(Executor)

... among other possibilities. I'm leaving interplay with Flow as TBD
for now, in part because...

On 3/6/20 5:14 PM, Thomas May wrote:

> It also could introduce interesting patterns like multiplexing… (IE,
> having multiple receivers getting the same message)

We already have a good multicaster, SubmissionPublisher. But I'm still
not sure of the best linkages.

On 3/6/20 6:06 PM, Alex Otenko wrote:

> Can methods return something more useful than void? Eg something
> that can be used to test progress? (I am afraid my imagination is
> limited to returning a ticket number, and a sequencer API to inspect
> whether send / close has a matching receive / await for such an
> event)

I can't think of enough use cases to justified added cost. Can you?

> 
> SendSynchronously with timeout is ambiguous. If timeout occurs, was
> it placed in the buffer, and not received yet, or not even buffered?
> 

The only thing you know is that upon exception, the item cannot have
been (and never will be) received. This is no different than other
methods. (although you are right that bounded+synchronous+timeout is
the most complicated to implement.)

... pasting updated draft ...


// snapshot: Sat Mar  7 11:13:22 2020  Doug Lea  (dl at nuc40)

// API sketches, with "public" omitted throughout

/**
 * A component for sending and receiving data. Carriers support
 * usages similar to those of BlockingQueues, but additionally
 * implement AutoCloseable, and may be explicitly closed for sending,
 * receiving, or both.
 *
 * This interface combines three sets of methods, defined in three
 * interfaces: Carriable methods access protocol state and
 * configuration. Interfaces CarrierSender and CarrierReceiver extend
 * Carriable with sender- and receiver- side views. Finally, this
 * interface combines these views.
 *
 * To control interactions between Thread interrupts and state, rather
 * than throwing InterruptedExceptions, potentially blocking methods
 * rely on a provided policy to distinguish cancelling the operation
 * vs closing the carrier vs ignoring the interrupt. The default for
 * current implementations is CANCEL, because it is the least
 * constraining; for example some mixed usages can catch
 * CancellationException to then close only when desired.
 *
 * Concrete implementation classes may enforce a given capacity (after
 * which method send will block waiting for available space), or be
 * effectively unbounded, in which case method send will never block
 * but may fail with an OutOfMemoryError.
 */
interface Carrier<T> extends CarrierSender<T>, CarrierReceiver<T> {
    // TBD: factory methods for jdk implementations
    // some utility methods, such as...
    static <E> CarrierReceiver<E> discardingCarrier(); // /dev/null analog
    // TBD: Flow (reactive stream) adaptors
}

/**
 * Methods accessing the protocol state and configuration of a
 * Carrier.
 */
interface Carriable<T> extends AutoCloseable {
    boolean isClosedForSending();
    boolean isClosedForReceiving();
    boolean isClosed();             // true if both sides closed
    boolean isOpen();               // { return !isClosed(); }
    boolean isEmpty();
    boolean isFull();               // never true if unbounded
    long    capacity();             // Long.MAX_VALUE if unbounded
    OnInterrupt interruptPolicy();  // return policy

    void close();                   // immediate close both sides
    void closeExceptionally(Throwable cause); // record as cause
    Throwable getClosedException();

    /** Returns a CompletableFuture that isDone when closed. */
    CompletionStage<Carriable<T>> onClose();
}

/**
 * Methods defining the sender-side view of a Carrier.
 */
interface CarrierSender<T> extends Carriable<T> {
    /**
     * Send item, throw if isClosedForSending, block if full.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    void send(T item) throws ClosedException, CancellationException;

    /** Send and block until item received */
    void sendSynchronously(T item) throws ClosedException,
CancellationException;

    /** Try to send, upon timeout, the item is no longer available. */
    void send(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;
    void sendSynchronously(T item, Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    boolean trySend(T item);        // false if closed or full
    void closeForSending();         // fully close when isClosedForReceiving

    // linkage support; locators are opaque cookie-like identifiers
    void registerSource(CarrierSender<? super T> c, long locator);
}

/**
 * Methods defining the receiver-side view of a Carrier.
 */
interface CarrierReceiver<T> extends Carriable<T> {
    /**
     * Consume item, throw if isClosedForReceiving, block if empty.
     * May cancel or close on interrupt, depending on OnInterrupt policy.
     */
    T receive() throws ClosedException, CancellationException;
    T receive(Duration timeout)
        throws ClosedException, CancellationException, TimeoutException;

    T tryReceive(T resultIfAbsent); // absent if closed or empty
    T peek(T resultIfAbsent);       // may false-positive

    void closeForReceiving();       // fully close when isClosedForSending

    boolean tryConsume(Consumer<? super T> proc); // false if closed or
empty
    Stream<T> stream();             // destructive (consume-on-traverse)

    // notification of send or close by registered source
    void sourceSent(CarrierSender<? extends T> source, long locator, T
item);
    void sourceClosed(CarrierSender<? extends T> source, long locator);
}

// TBD: provide abstract class AbstractCarrier<T>.

class LinkedCarrier<T> implements Carrier<T> {
    LinkedCarrier(OnInterrupt policy);
    LinkedCarrier() { this(OnInterrupt.CANCEL); } // default
    // main linked implementation
    // coming soon, based on LinkedTransferQueue algorithms
}

class BufferedCarrier<T> implemnts Carrier<T> {
    // main array-based implementation(s)
    // coming later, with single- vs multiple- sink/source options
}

/**
 * A Carrier that aggregates sources established in its constructor.
 * The receive method blocks waiting for any to become available, then
 * returns the corresponding item.
 */
class CarrierSelector<T> implements CarrierReceiver<T> {
    Selector(<CarrierSender<? extends T> c, ...) {
        // for each c { c.registerSource(this, locatorFor(c)); }
    }
}

/**
 * A policy for responding to Thread.interrupt in blocking methods in
 * classes implementing AutoCloseable
 */
static Enum OnInterrupt {
    IGNORE,  // continue waiting
    CANCEL,  // throw CancellationException
    CLOSE    // close and throw ClosedException
}

// This could be placed in java.lang for use with any AutoCloseable
class ClosedException extends IllegalStateException {
    ClosedException(AutoCloseable c); // the closed component
    // copied from ExecutionException:
    /**
     * Constructs an {@code ClosedException} with the specified cause.
     * The detail message is set to {@code (cause == null ? null :
     * cause.toString())} (which typically contains the class and
     * detail message of {@code cause}).
     *
     * @param  cause the cause (which is saved for later retrieval by the
     *         {@link #getCause()} method)
     */
    public ClosedException(Throwable cause) {
        super(cause);
    }
    // ...
}






More information about the Concurrency-interest mailing list