[concurrency-interest] jdk9 Candidate classes Flow and SubmissionPublisher

Viktor Klang viktor.klang at gmail.com
Mon Jan 19 05:00:54 EST 2015


Hi Rémi,

Thanks for your reply, I'm sorry my response got delayed. (answers inline)

On Sat, Jan 17, 2015 at 12:44 PM, Remi Forax <forax at univ-mlv.fr> wrote:

>
> On 01/16/2015 04:53 PM, Viktor Klang wrote:
>
> Hi Rémi,
>
>  Thanks for the great questions
> (answering inline)
>
> On Fri, Jan 16, 2015 at 4:03 PM, Remi Forax <forax at univ-mlv.fr> wrote:
>
>>  Viktor, Doug,
>>
>> I see at least 3 changes that in my opinion should be done on the current
>> API.
>> - Subscriber.onSubscribe and onNext/onError/onComplete should not be part
>> of the same interface.
>>
>
>  Unfortunately that did not pan out (we've did a lot of research/PoCs,
> I'd be surprised if we have missed any permutation) for the reason that
> failure to create a Subscription due to a failure in the Publisher requires
> the possibility of issuing onError without a preceding onSubscribe.
>
>
> This seems plainly wrong to me, it means that you have no way to
> distinguish between a rejection of the subscription and an error in the
> process. So how can you write something useful in onError given that at
> that point you have no way to distinguish those two kinds of error anymore.
>

First of all, let me assure you that the RS spec has been thoughtfully and
thoroughly put together—there's been a lot of time and hard work going into
the semantics of all of the interfaces and methods.

RS Subscriber.onError is primarily for fail-fast cleanup, as such there is
not a need for being able to distinguish the difference between
subscription rejection (which can occur at any time) and "error in the
process". I'd consider not leaking resources as a very useful thing,
especially for building robust applications.

The ability to programmatically distinguish between subscription rejection
and "error in the process" has "lo'-to-no" utility as they both have the
exact same consequences for the Subscriber instance. Capabilities are
slightly different for Processor (see spec).


>
>
>
>
>    We are also considering allowing fast completion (avoiding having to
> call onSubscribe if the stream is already completed).
>
>
>>    This will make the workflow more explicit and avoid to have the
>> subscription object mutable
>>
>
>  The Subscription need not be mutable right now, did you mean Subscriber?
>
>
> yes, the field that store the subscription is mutable which is a receipt
> for disaster because a Publisher like the SubmissionPublisher will expose
> the Subscriber but accessing the field that store the Subscription will
> lead to a race condition and an unsafe publication.
>

A Publisher does not expose a Subscriber (`subscribe` returns void). A
Subscription can have a final-field to the Subscriber and then subsequently
publish itself to the Subscriber safely:

val s = new WhateverSubscription(someSubscriber)
someSubscriber.onSubscribe(s)


>
> In my opinion, the Subscription object play the same role as a Future, you
> can control the observer from the inside or from the outside.
>

As per RS spec, it is to be used by the Subscriber.


> So this object should be created before try to do any async work otherwise
> you mose the burden to access the Subscription object to the user instead
> of to the framework.
>

Referring to my previous snippet/example—the Subscription is created
before. In RS the Subscription is the association between a Publisher and a
Subscriber, it is not to be exposed to the outside.


>
>
>
>
>>
>> - onComplete and onError should be one method and not two,
>>   let's call it onComplete(Throwable throwableOrNull).
>>   I think it's better two says that you have to way to complete, either
>> normally or with an exception.
>>   And from my experience, the code of onError and onComplete tend to be
>> fairly similar.
>>
>
> That was an interesting suggestion & discussion we had quite early (1+
> year ago), but we decided both that `null`-checking as being an
> anti-pattern, as well as mixing failure management code with
> success-management code is brittle and mixes concerns.
> Also, a valid follow-on question was: where do you stop?
> onNext(Either<Throwable,T> elemOrCompleteOrError)?
> Function<Either<Throwable, T>, Void>? Function<Either<Subscription,
> Either<Throwable, T>>, Void>?
>
>  So it was deemed that having a distinct method not only made it easier
> to avoid NPEs, made it easy to separate the concerns in the
> implementations, easier to find what you were looking for in the javadoc,
> the spec etc.
>
>
> I don't ask to mix onNext and onError (or onNext, onError, onComplete) but
> only onError and onComplete because from my own experience the code you
> write here is fairly similar.
>

In my experience it is quite different (just as the code I write in a catch
block typically looks different from the code I write in a try block).

A very important fact is that the semantics for onError and onComplete are
quite dissimilar: onError is an ASAP signal, and onComplete is an ALAP
signal.


> Agree that null-checking is not great, but we have Optional now so
> onComplete(Optional<Throwable>) is IMO fine.
>
>
This suggestion is definitely better, but does not address the core problem
of mixing concerns and dissimilar semantics, do you agree?


> This remember me a point I've forgotten to mention, onError should not
> take a Throwable as parameter,
> it means that if a java.lang.Error is raised like OutOfMemoryError instead
> of bailing out, the API suggest to try to recover on this kind of error,
> worst most of the time, the Error will be propagated to all the Subscriber
> along the chain, each of them trying to do something that may require to
> allocate more memory.
>

(onError is not about error recovery, it is about fail-fast cleanup of
resources.)

Whether or not a Publisher implementation catches OOME (which it typically
shouldn't (IMO) as per: "An Error is a subclass of Throwable that indicates
serious problems that a reasonable application should not try to catch." -
https://docs.oracle.com/javase/6/docs/api/java/lang/Error.html)
is not really about RS at all—the same rules apply as whenever a try-catch
is written in any piece of logic.

The reason for onError to take a Throwable rather than an Exception is:
there exist Throwables that aren't Errors and aren't Exceptions that
Publishers would want to be able to signal downstream, and enforcing
onError to take an Exception rather than Throwable wouldn't solve anything,
as a Publisher who would like to pass down an OOME would just pass it down
as the cause of a wrapper Exception.

Besides that, there is quite a bit of precedence for having Throwables in
parameters:
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#completeExceptionally-java.lang.Throwable-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#obtrudeException-java.lang.Throwable-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#exceptionally-java.util.function.Function-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#handle-java.util.function.BiFunction-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#handleAsync-java.util.function.BiFunction-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#handleAsync-java.util.function.BiFunction-java.util.concurrent.Executor-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#whenComplete-java.util.function.BiConsumer-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#whenCompleteAsync-java.util.function.BiConsumer-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#whenCompleteAsync-java.util.function.BiConsumer-
http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#whenCompleteAsync-java.util.function.BiConsumer-java.util.concurrent.Executor-

(For even more prior art, search for "with parameters of type Throwable" in
http://docs.oracle.com/javase/8/docs/api/java/lang/class-use/Throwable.html)


>
>
>>
>> - onComplete(Throwable) should be a default method (the implementation do
>> nothing),
>>   so Observer and Consumer will be equivalent in term of function type.
>>
>
>  Since the target is JDK9 onNext, onComplete and onError can definitely
> be default methods, the reason why they aren't currently in the RS spec is
> because we started before Java 8 shipped and keeping Java6-7 compatibility
> made sense.
>
>
> Java6 compatibility make sense for the reactive-streams API not for the
> j.u.c.Flow API.
>

Default methods would not really add value to the RS interfaces (there
isn't AFAICT anything that they could do by default) and since jdk9 would
be compiled with at least Java 8 classfile format version, JDK 6-7 would
still need to use a backport jar, just as jsr166 uses if you want to use
FJP on pre JDK7 meaning that this [the compatibility argument] is not a
problem for RS.


>
>
>
>
>>
>>   Re-using the example of Doug,
>>
>>   long requestSize = ...
>>   Observer<T> observer = consumer::accept;   // auto-conversion from a
>> Consumer
>>   publisher.subscribe(subscription -> {
>>       long half  = requestSize / 2; // re-request when half consumed
>>       subscription.request(half);
>>       return new Observer<T>() {
>>           private long count = half;
>>           public void onNext(T item) {
>>               if (--count <= 0) {
>>                    subscription.request(count = half);
>>               }
>>               observer.onNext(item);
>>           }
>>           public void onComplete(Throwable ex) {
>>             observer.onComplete(ex);
>>           }
>>       };
>>   });
>>
>>   Note that in term of implementation you can still have an
>> implementation that implements both
>>   Subscriber and Observer with onSubscribe() returning this.
>>
>> and I'm still not convince that this should be integrated in JDK.
>>
>
>  I hope to be able to address those concerns :)
> Anything that I missed to reply to?
>
>
> Why the reactive-streams API has to be integrated in JDK9,
> the only compelling reason I see is the one given by Doug, people will try
> to use Stream for that otherwise.
>

For Doug's compelling reasons, but also as a way for library writers and
advanced users to integrate with (the most common users of j.u.c, to have a
common platform for interop in the JDK).
(see Greggs comment earlier)


>
> In my opinion, integrated the reactive-streams API is a kind of
> loose-loose deal,
> from the reactive-streams API, it will be set in stone so no v2 that
> overcome the shortcoming of the v1,
>

This is not an issue—RS has been developed with this in mind.


> from the JDK perspective, having to integrated a sub-optimal API because
> the API has to be Java 6 compatible,
>

This is not an issue either—the RS as in j.u.c.Flow would not need to be
Java 6 compatible since it's easy to keep a compat jar.


> knowing that we will have to maintain forever an API that has changed
> several times in the past few years.
>

I'm not sure we're talking about the same thing here, are you talking about
Rx and not RS?

Since the Reactive Streams effort started, the -only- API change to
Publisher/Subscriber/Subscription/Processor has been switching from `int`
to `long` for `Subscription.request`. (see:
https://github.com/reactive-streams/reactive-streams/blob/e58fed62249ad6fbd36467d1bbe5c486f31a8c0e/spi/src/main/scala/asyncrx/spi/Publisher.scala
)

Fortunately (well, to be honest, by design) the RS API is interfaces
only—with a spec, a TCK and a set of example implementations that are not
only documented but also of course pass the TCK.

With that in mind, I cannot imagine anything more easy to maintain besides
not maintaining it at all, can you?


>
> Rémi
>
>
>
>
>>
>> Rémi
>>
>>
>> On 01/16/2015 01:59 PM, Viktor Klang wrote:
>>
>> Hi Rémi,
>>
>>  I'll try to address your concerns below:
>>
>> On Fri, Jan 16, 2015 at 1:48 AM, Remi Forax <forax at univ-mlv.fr> wrote:
>>
>>>
>>>
>>> On 01/16/2015 12:50 AM, Doug Lea wrote:
>>>
>>>> On 01/15/2015 05:59 PM, Remi Forax wrote:
>>>>
>>>>> I think it's too soon to try to include an API like this in JDK9.
>>>>> Currently, everybody seems to think it can come with a better API for
>>>>> a reactive
>>>>> framework.
>>>>>
>>>>
>>>> Where a lot of those everybodies were involved in formulating
>>>> these APIs. See the list at http://www.reactive-streams.org/
>>>>
>>>
>>>  yes, I know, having a lot of people hammering the API is great thing,
>>> but why do you want to include that in the JDK,
>>
>>  it's better for the API to stay out of the JDK because the release cycle
>>> is faster,
>>
>>  it's better for the JDK because the API is still moving.
>>
>>
>>  The RS interfaces and method signatures has not seen an update in a
>> long time and is considered done (RS is in 1.0.0.RC1 with an RC2 shipped
>> soon with spec clarifications and improvements to the TCK and 1.0.0.final
>> is expected shortly thereafter).
>>
>>
>>>
>>>
>>>
>>>> I think it's better to wait and see.
>>>>>
>>>>
>>>> We waited. We saw :-)
>>>>
>>>
>>>  I heard about Rx in 2009, and since that I have observed two things,
>>> the use cases have changed, the APIs have changed, even the manifesto
>>> has changed recently.
>>
>>  This is not a bad thing, it means the community around the reactive
>>> stuff is vibrant so
>>> why do you want to stop that rush of cool ideas.
>>>
>>
>>  RS is specifically for interop—a common vocabulary and semantics to
>> build asynchronous, non-blocking streaming transformation with non-blocking
>> back pressure. So from an API perspective it is targeted towards library
>> developers, just as most of the things in java.util.concurrent. End user
>> APIs are provided on top, through implementations like SubmissionPublisher
>> or RxJava, Project Reactor, Akka Streams, Ratpack etc.
>>
>>
>>>
>>> And, if you think you want to freeze the API, I think a JSR is a better
>>> vehicle.
>>>
>>
>>  Given the the target and scope of RS, it seems to me that it fits
>> nicely within the scope of JSR166?
>>  There is a spec, 4 pure interfaces with a sum total of 7 methods and a
>> TCK to validate implementations:
>> https://github.com/reactive-streams/reactive-streams
>>
>>
>>>
>>>> -Doug
>>>>
>>>
>>> Rémi
>>>
>>>
>>>
>>>>
>>>>
>>>>> Rémi
>>>>>
>>>>> On 01/15/2015 06:25 PM, Doug Lea wrote:
>>>>>
>>>>>>
>>>>>> Here's the only set of candidates for new jdk9 j.u.c classes:
>>>>>>
>>>>>> As discussed a few months ago, there is no single best fluent
>>>>>> async/parallel API. CompletableFuture/CompletionStage best supports
>>>>>> continuation-style programming on futures, and java.util.stream best
>>>>>> supports (multi-stage, possibly-parallel) "pull" style operations on
>>>>>> the elements of collections. Until now, one missing category was
>>>>>> "push" style operations on items as they become available from an
>>>>>> active source. We are not alone in wanting a standard way to support
>>>>>> this. Over the past year, the "reactive-streams"
>>>>>> (http://www.reactive-streams.org/) effort has been defining a minimal
>>>>>> set of interfaces expressing commonalities and allowing
>>>>>> interoperablility across frameworks (including Rx and Akka Play), that
>>>>>> is nearing release. These interfaces include provisions for a simple
>>>>>> form of async flow control allowing developers to address resource
>>>>>> control issues that can otherwise cause problems in push-based
>>>>>> systems. Supporting this mini-framework helps avoid unpleasant
>>>>>> surprises possible when trying to use pull-style APIs for "hot"
>>>>>> reactive sources (but conversely is not as good a choice as
>>>>>> java.util.Stream for "cold" sources like collections).
>>>>>>
>>>>>> The four intertwined interfaces (Publisher, Subscriber, Subscription,
>>>>>> Processor) are defined within the same class "Flow", that also
>>>>>> includes the first of some planned support methods to establish and
>>>>>> use Flow components, including tie-ins to java.util.streams and
>>>>>> CompletableFutures. See
>>>>>>
>>>>>> http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/Flow.html
>>>>>>
>>>>>> (Logistically, the only alternative to class Flow would have been
>>>>>> to introduce a subpackage, which unnecessarily complicates usage. And
>>>>>> "import static java.util.concurrent.Flow;" is about as convenient as
>>>>>> "import java.util.concurrent.flow.*;" would be.)
>>>>>>
>>>>>> Also, the new stand-alone class SubmissionPublisher can serve as a
>>>>>> bridge from various kinds of item producers to Flow components, and is
>>>>>> useful in its own right. It is a form of ordered multicaster that
>>>>>> people have complained that we don't support in j.u.c, but now do.
>>>>>> See
>>>>>>
>>>>>> http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/SubmissionPublisher.html
>>>>>>
>>>>>>
>>>>>> Disclaimers: These are only candidates for inclusion.  The are in
>>>>>> preliminary form and will change. But comments and suggestions would
>>>>>> be welcome. As with the other candidate additions, if you are brave,
>>>>>> you can try out snapshots on jdk8+ by getting
>>>>>>   http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar
>>>>>> and running java -Xbootclasspath/p:jsr166.jar
>>>>>>
>>>>>> -Doug
>>>>>>
>>>>>> _______________________________________________
>>>>>> Concurrency-interest mailing list
>>>>>> Concurrency-interest at cs.oswego.edu
>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>
>>>>>
>>>>> _______________________________________________
>>>>> Concurrency-interest mailing list
>>>>> Concurrency-interest at cs.oswego.edu
>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>
>>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> Concurrency-interest mailing list
>>>> Concurrency-interest at cs.oswego.edu
>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>
>>
>>
>>
>>  --
>>   Cheers,
>>>>
>>
>>
>
>
>  --
>   Cheers,
>>
>
>


-- 
Cheers,
√
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20150119/b4dbc360/attachment-0001.html>


More information about the Concurrency-interest mailing list