[concurrency-interest] jdk9 Candidate classes Flow and SubmissionPublisher

Remi Forax forax at univ-mlv.fr
Sat Jan 17 06:44:58 EST 2015

On 01/16/2015 04:53 PM, Viktor Klang wrote:
> Hi Rémi,
> Thanks for the great questions
> (answering inline)
> On Fri, Jan 16, 2015 at 4:03 PM, Remi Forax <forax at univ-mlv.fr> wrote:
>     Viktor, Doug,
>     I see at least 3 changes that in my opinion should be done on the
>     current API.
>     - Subscriber.onSubscribe and onNext/onError/onComplete should not
>     be part of the same interface.
> Unfortunately that did not pan out (we did a lot of research/PoCs, 
> I'd be surprised if we have missed any permutation) for the reason 
> that failure to create a Subscription due to a failure in the 
> Publisher requires the possibility of issuing onError without a 
> preceding onSubscribe.

This seems plainly wrong to me: it means that you have no way to 
distinguish between a rejection of the subscription and an error during 
processing. So how can you write something useful in onError, given that 
at that point you can no longer tell those two kinds of error apart?

> We are also considering allowing fast completion (avoiding having to 
> call onSubscribe if the stream is already completed).
>       This will make the workflow more explicit and avoid having the
>     subscription object mutable
> The Subscription need not be mutable right now, did you mean Subscriber?

Yes, the field that stores the subscription is mutable, which is a recipe 
for disaster: a Publisher like SubmissionPublisher will expose the 
Subscriber, but accessing the field that stores the Subscription will 
lead to a race condition and an unsafe publication.

In my opinion, the Subscription object plays the same role as a Future: 
you can control the observer from the inside or from the outside. So 
this object should be created before trying to do any async work; 
otherwise you move the burden of accessing the Subscription object onto 
the user instead of the framework.
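
To make that concrete, here is a minimal sketch, not a proposal for the
actual JDK types. Subscription, Observer, ListPublisher, Session and
FactoryDemo are all hypothetical names, and the publisher is deliberately
synchronous and single-threaded, so it only shows the shape of the API:
the Subscription exists before the observer is built, so the observer can
capture it as an effectively final local instead of storing it in a
mutable field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical types for illustration only; none of them is a JDK API.
interface Subscription {
    void request(long n);
    void cancel();
}

interface Observer<T> {
    void onNext(T item);
    default void onComplete(Throwable errorOrNull) { }
}

// Synchronous publisher over a fixed list (assumption: single-threaded use).
final class ListPublisher<T> {
    private final List<T> items;

    ListPublisher(List<T> items) { this.items = new ArrayList<>(items); }

    void subscribe(Function<Subscription, Observer<T>> factory) {
        Session<T> session = new Session<>(items);
        // The subscription is handed to the factory before any item flows.
        session.start(factory.apply(session));
    }
}

final class Session<T> implements Subscription {
    private final List<T> items;
    private Observer<T> observer; // framework-internal, set once by start()
    private long demand;
    private int next;
    private boolean draining;
    private boolean done;

    Session(List<T> items) { this.items = items; }

    @Override public void request(long n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        demand += n; // demand requested before start() is remembered
        drain();
    }

    @Override public void cancel() { done = true; }

    void start(Observer<T> observer) {
        this.observer = observer;
        drain();
    }

    private void drain() {
        if (observer == null || draining) return; // not started, or re-entrant call
        draining = true;
        try {
            while (!done && demand > 0 && next < items.size()) {
                demand--;
                observer.onNext(items.get(next++));
            }
            if (!done && next == items.size()) {
                done = true;
                observer.onComplete(null);
            }
        } finally {
            draining = false;
        }
    }
}

final class FactoryDemo {
    static List<String> run(List<String> source, long requestSize) {
        List<String> log = new ArrayList<>();
        new ListPublisher<>(source).subscribe(subscription -> {
            long half = Math.max(1, requestSize / 2); // re-request when half consumed
            subscription.request(half);
            return new Observer<String>() {
                private long count = half;
                @Override public void onNext(String item) {
                    if (--count <= 0) subscription.request(count = half);
                    log.add(item);
                }
                @Override public void onComplete(Throwable errorOrNull) {
                    log.add(errorOrNull == null ? "done" : "failed");
                }
            };
        });
        return log;
    }
}
```

In this sketch the only mutable reference is Session.observer, which
lives inside the framework side rather than in user code.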

>     - onComplete and onError should be one method and not two,
>       let's call it onComplete(Throwable throwableOrNull).
>       I think it's better to say that you have two ways to complete,
>     either normally or with an exception.
>       And from my experience, the code of onError and onComplete tend
>     to be fairly similar.
> That was an interesting suggestion & discussion we had quite early (1+ 
> year ago), but we decided both that `null`-checking is an 
> anti-pattern and that mixing failure-management code with 
> success-management code is brittle and mixes concerns.
> Also, a valid follow-on question was: where do you stop? 
> onNext(Either<Throwable,T> elemOrCompleteOrError)? 
> Function<Either<Throwable, T>, Void>? Function<Either<Subscription, 
> Either<Throwable, T>>, Void>?
> So it was deemed that having distinct methods made it easier to 
> avoid NPEs, easier to separate the concerns in the implementations, 
> and easier to find what you were looking for in the javadoc, the 
> spec etc.

I'm not asking to merge onNext and onError (or onNext, onError, 
onComplete), only onError and onComplete, because in my own experience 
the code you write there is fairly similar.
I agree that null-checking is not great, but we have Optional now, so 
onComplete(Optional<Throwable>) is IMO fine.
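
As a rough illustration of that variant (a sketch only: CompletionObserver
and CountingObserver are hypothetical names, not part of the JDK or of
reactive-streams), a single completion method lets the code shared by the
normal and the failed path be written once:

```java
import java.util.Optional;

// Hypothetical interface for illustration; not a JDK or reactive-streams type.
interface CompletionObserver<T> {
    void onNext(T item);
    void onComplete(Optional<Throwable> failure);
}

final class CountingObserver implements CompletionObserver<String> {
    private int seen;
    private String summary = "running";

    @Override public void onNext(String item) {
        seen++;
    }

    @Override public void onComplete(Optional<Throwable> failure) {
        // Only the outcome label differs between the two ways to complete.
        String outcome = failure
                .map(t -> "failed: " + t.getMessage())
                .orElse("completed");
        // The shared part (reporting how many items were seen) is written once.
        summary = outcome + " after " + seen + " item(s)";
    }

    String summary() {
        return summary;
    }
}
```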

This reminds me of a point I forgot to mention: onError should not 
take a Throwable as parameter.
It means that if a java.lang.Error such as OutOfMemoryError is raised, 
instead of bailing out, the API suggests trying to recover from this kind 
of error. Worse, most of the time the Error will be propagated to all 
the Subscribers along the chain, each of them trying to do something that 
may require allocating more memory.
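
One way to picture the alternative, as a sketch under my own assumptions
(ErrorGuard and deliver are hypothetical names, not an existing API): the
framework hands only Exceptions to the error callback and rethrows any
java.lang.Error, so no handler along the chain is invited to recover
from it.

```java
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of the JDK or reactive-streams.
final class ErrorGuard {
    private ErrorGuard() { }

    /**
     * Hands an Exception to the given handler and returns normally.
     * A java.lang.Error is rethrown instead, so no handler tries to recover from it.
     */
    static void deliver(Throwable failure, Consumer<? super Exception> onError) {
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure instanceof Exception) {
            onError.accept((Exception) failure);
        } else {
            // A bare Throwable subclass: wrap it so the handler still sees an Exception.
            onError.accept(new RuntimeException(failure));
        }
    }
}
```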

>     - onComplete(Throwable) should be a default method (the
>     implementation does nothing),
>       so Observer and Consumer will be equivalent in terms of function
>     type.
> Since the target is JDK9 onNext, onComplete and onError can definitely 
> be default methods, the reason why they aren't currently in the RS 
> spec is because we started before Java 8 shipped and keeping Java6-7 
> compatibility made sense.

Java 6 compatibility makes sense for the reactive-streams API, not for 
the j.u.c.Flow API.

>       Re-using the example of Doug,
>       long requestSize = ...
>       Observer<T> observer = consumer::accept;   // auto-conversion from a Consumer
>       publisher.subscribe(subscription -> {
>           long half  = requestSize / 2; // re-request when half consumed
>           subscription.request(half);
>           return new Observer<T>() {
>               private long count = half;
>               public void onNext(T item) {
>                   if (--count <= 0) {
>                       subscription.request(count = half);
>                   }
>                   observer.onNext(item);
>               }
>               public void onComplete(Throwable ex) {
>                 observer.onComplete(ex);
>               }
>           };
>       });
>       Note that in terms of implementation you can still have an
>     implementation that implements both
>       Subscriber and Observer with onSubscribe() returning this.
>     and I'm still not convinced that this should be integrated in the JDK.
> I hope to be able to address those concerns :)
> Anything that I missed to reply to?

Why does the reactive-streams API have to be integrated in JDK9? 
The only compelling reason I see is the one given by Doug: otherwise 
people will try to use Stream for that.

In my opinion, integrating the reactive-streams API is a kind of 
lose-lose deal:
for the reactive-streams API, it will be set in stone, so there will be 
no v2 that overcomes the shortcomings of the v1;
for the JDK, it means integrating a sub-optimal API because the API has 
to be Java 6 compatible,
knowing that we will have to maintain forever an API that has changed 
several times in the past few years.


>     Rémi
>     On 01/16/2015 01:59 PM, Viktor Klang wrote:
>>     Hi Rémi,
>>     I'll try to address your concerns below:
>>     On Fri, Jan 16, 2015 at 1:48 AM, Remi Forax <forax at univ-mlv.fr> wrote:
>>         On 01/16/2015 12:50 AM, Doug Lea wrote:
>>             On 01/15/2015 05:59 PM, Remi Forax wrote:
>>                 I think it's too soon to try to include an API like
>>                 this in JDK9.
>>                 Currently, everybody seems to think it can come with
>>                 a better API for a reactive
>>                 framework.
>>             Where a lot of those everybodies were involved in formulating
>>             these APIs. See the list at http://www.reactive-streams.org/
>>         yes, I know, having a lot of people hammering the API is a
>>         great thing,
>>         but why do you want to include that in the JDK, 
>>         it's better for the API to stay out of the JDK because the
>>         release cycle is faster, 
>>         it's better for the JDK because the API is still moving.
>>     The RS interfaces and method signatures have not seen an update in
>>     a long time and are considered done (RS is at 1.0.0.RC1, with an
>>     RC2 shipping soon with spec clarifications and improvements to the
>>     TCK, and 1.0.0.final is expected shortly thereafter).
>>                 I think it's better to wait and see.
>>             We waited. We saw :-)
>>         I heard about Rx in 2009, and since then I have observed two
>>         things:
>>         the use cases have changed, the APIs have changed, even the
>>         manifesto has changed recently.
>>         This is not a bad thing, it means the community around the
>>         reactive stuff is vibrant, so
>>         why do you want to stop that rush of cool ideas?
>>     RS is specifically for interop—a common vocabulary and semantics
>>     to build asynchronous, non-blocking streaming transformation with
>>     non-blocking back pressure. So from an API perspective it is
>>     targeted towards library developers, just as most of the things
>>     in java.util.concurrent. End user APIs are provided on top,
>>     through implementations like SubmissionPublisher or RxJava,
>>     Project Reactor, Akka Streams, Ratpack etc.
>>         And, if you think you want to freeze the API, I think a JSR
>>         is a better vehicle.
>>     Given the target and scope of RS, it seems to me that it fits
>>     nicely within the scope of JSR166?
>>     There is a spec, 4 pure interfaces with a sum total of 7
>>     methods and a TCK to validate implementations:
>>     https://github.com/reactive-streams/reactive-streams
>>             -Doug
>>         Rémi
>>                 Rémi
>>                 On 01/15/2015 06:25 PM, Doug Lea wrote:
>>                     Here's the only set of candidates for new jdk9
>>                     j.u.c classes:
>>                     As discussed a few months ago, there is no single
>>                     best fluent
>>                     async/parallel API.
>>                     CompletableFuture/CompletionStage best supports
>>                     continuation-style programming on futures, and
>>                     java.util.stream best
>>                     supports (multi-stage, possibly-parallel) "pull"
>>                     style operations on
>>                     the elements of collections. Until now, one
>>                     missing category was
>>                     "push" style operations on items as they become
>>                     available from an
>>                     active source. We are not alone in wanting a
>>                     standard way to support
>>                     this. Over the past year, the "reactive-streams"
>>                     (http://www.reactive-streams.org/) effort has
>>                     been defining a minimal
>>                     set of interfaces expressing commonalities and
>>                     allowing
>>                     interoperability across frameworks (including Rx
>>                     and Akka Play), that
>>                     is nearing release. These interfaces include
>>                     provisions for a simple
>>                     form of async flow control allowing developers to
>>                     address resource
>>                     control issues that can otherwise cause problems
>>                     in push-based
>>                     systems. Supporting this mini-framework helps
>>                     avoid unpleasant
>>                     surprises possible when trying to use pull-style
>>                     APIs for "hot"
>>                     reactive sources (but conversely is not as good a
>>                     choice as
>>                     java.util.Stream for "cold" sources like
>>                     collections).
>>                     The four intertwined interfaces (Publisher,
>>                     Subscriber, Subscription,
>>                     Processor) are defined within the same class
>>                     "Flow", that also
>>                     includes the first of some planned support
>>                     methods to establish and
>>                     use Flow components, including tie-ins to
>>                     java.util.streams and
>>                     CompletableFutures. See
>>                     http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/Flow.html
>>                     (Logistically, the only alternative to class Flow
>>                     would have been
>>                     to introduce a subpackage, which unnecessarily
>>                     complicates usage. And
>>                     "import static java.util.concurrent.Flow.*;" is
>>                     about as convenient as
>>                     "import java.util.concurrent.flow.*;" would be.)
>>                     Also, the new stand-alone class
>>                     SubmissionPublisher can serve as a
>>                     bridge from various kinds of item producers to
>>                     Flow components, and is
>>                     useful in its own right. It is a form of ordered
>>                     multicaster that
>>                     people have complained that we don't support in
>>                     j.u.c, but now do.
>>                     See
>>                     http://gee.cs.oswego.edu/dl/jsr166/dist/docs/java/util/concurrent/SubmissionPublisher.html
>>                     Disclaimers: These are only candidates for
>>                     inclusion.  They are in
>>                     preliminary form and will change. But comments
>>                     and suggestions would
>>                     be welcome. As with the other candidate
>>                     additions, if you are brave,
>>                     you can try out snapshots on jdk8+ by getting
>>                     http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166.jar
>>                     and running java -Xbootclasspath/p:jsr166.jar
>>                     -Doug
>>                     _______________________________________________
>>                     Concurrency-interest mailing list
>>                     Concurrency-interest at cs.oswego.edu
>>                     <mailto:Concurrency-interest at cs.oswego.edu>
>>                     http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>     -- 
>>     Cheers,
> -- 
> Cheers,