[concurrency-interest] Upcoming jdk9 j.u.c JEP

Roland Kuhn rk at rkuhn.info
Fri Jul 24 01:55:32 EDT 2015

Hi Greg,

The reasoning behind the asymmetric RS design is that this communication primitive targets unidirectional communication; a bidirectional conversation would use two such streams running in opposite directions. This means that for a single stream, data elements (of which onError is but a special one) flow downstream and only demand flows upstream. Publishers only need to know whether and when to produce the next element(s), hence we didn’t see a use case for propagating more information than “N elements needed” and “no more elements needed”.
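
To make the two upstream signals concrete, here is a minimal sketch against the java.util.concurrent.Flow interfaces proposed for JDK 9. The class names DemandOnlyUpstream and TakeSubscriber are made up for illustration, and SubmissionPublisher merely stands in for an arbitrary data source. The only calls the Subscriber ever makes towards the Publisher are request(n) and cancel().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class DemandOnlyUpstream {

    /** Hypothetical subscriber: takes at most `limit` items, one at a time. */
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private final int limit;
        private Flow.Subscription subscription;

        TakeSubscriber(int limit) { this.limit = limit; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);                 // upstream: "1 element needed"
        }

        @Override public void onNext(Integer item) {
            if (received.size() >= limit) {
                return;                   // ignore anything arriving after cancel
            }
            received.add(item);
            if (received.size() == limit) {
                subscription.cancel();    // upstream: "no more elements needed"
                done.countDown();
            } else {
                subscription.request(1);  // upstream: "1 more element needed"
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes 1..10 and returns what a subscriber limited to 3 items saw. */
    static List<Integer> run() throws InterruptedException {
        TakeSubscriber subscriber = new TakeSubscriber(3);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 10; i++) {
                publisher.submit(i);      // data flows downstream only
            }
            subscriber.done.await(5, TimeUnit.SECONDS);
        }
        return subscriber.received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Nothing in this sketch gives the Subscriber a way to tell the Publisher why it stopped; that is exactly the asymmetry under discussion.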

If a single Reactive Stream could transport data upstream, then we would need to implement back-pressure on that back channel as well, leading to the same complexity as having two RS running in opposite directions. Another reason for this separation is that we did not want to burden the API designers of conforming implementations with an impossible task. The combinators offered by stream transformation APIs flow with the (English) language from left to right and describe sequences of transformation stages. With data flowing upstream, there would also be a need to describe how to handle that data, even if it is “only” an error channel, and since it flows in the opposite direction there would be no natural way to write this down.

Learning about the reason behind cancellation seems geared towards recovery, in the sense that the Publisher would then construct and attach a different Subscriber afterwards (please let me know if you have something else in mind). If you want to do that, then the Subscriber will in any case be under the Publisher’s control and can use a different channel to communicate the onError signal back to the data source. Since that channel would transport data, it would be a separate one flowing in the opposite direction as mentioned above, at least conceptually; with a single element like you describe, it could well be a simpler callback mechanism and might not need full back-pressure.
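
As a rough sketch of what such a simpler callback mechanism could look like (this is an illustration, not part of the RS specification): whoever assembles the pipeline hands the Subscriber a plain Consumer<Throwable>, and the Subscriber cancels and then reports the reason through it. The names ErrorSideChannel, ReportingSubscriber, failureSink and encode are hypothetical, and encode is only a toy stand-in for a real serializer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ErrorSideChannel {

    /** Hypothetical encoding stage that reports failures via a callback. */
    static final class ReportingSubscriber implements Flow.Subscriber<Object> {
        private final Consumer<Throwable> failureSink; // side channel, not part of RS
        private Flow.Subscription subscription;
        private boolean failed;

        ReportingSubscriber(Consumer<Throwable> failureSink) {
            this.failureSink = failureSink;
        }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1);
        }

        @Override public void onNext(Object item) {
            if (failed) {
                return;                    // ignore anything arriving after cancel
            }
            try {
                encode(item);
                subscription.request(1);
            } catch (RuntimeException e) {
                failed = true;
                subscription.cancel();     // the only RS-level upstream signal
                failureSink.accept(e);     // the reason travels on the callback
            }
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }

        // Toy encoder (assumption): only Strings can be encoded.
        private static String encode(Object item) {
            if (!(item instanceof String)) {
                throw new IllegalArgumentException(
                        "cannot encode " + item.getClass().getName());
            }
            return "\"" + item + "\"";
        }
    }

    /** Feeds one good and one bad element; returns the reported failure. */
    static Throwable run() throws InterruptedException {
        AtomicReference<Throwable> reported = new AtomicReference<>();
        CountDownLatch signalled = new CountDownLatch(1);
        Consumer<Throwable> sink = t -> {
            reported.set(t);
            signalled.countDown();
        };
        try (SubmissionPublisher<Object> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new ReportingSubscriber(sink));
            publisher.submit("ok");
            publisher.submit(42);          // not encodable by the toy encoder
            signalled.await(5, TimeUnit.SECONDS);
        }
        return reported.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because the callback carries at most one element per stream, it does not need its own back-pressure; a channel carrying more than that would conceptually be a second stream running in the opposite direction.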

I hope this clarifies some of the background behind the RS design. Please share more of your intended use of an error back-channel so that we can understand what exactly the upstream components would do with that data in the example case you mention.



> On 24 Jul 2015, at 00:35, Greg Wilkins <gregw at webtide.com> wrote:
> On 24 July 2015 at 00:23, Doug Lea <dl at cs.oswego.edu> wrote:
> * Reactive-stream users may be disappointed that we do not include any
> net/IO-based Flow.Publisher/Subscriber classes, considering that
> reactive-streams are mainly motivated by net-based frameworks.  The
> reasons for triaging these out are that (1) IO generally falls outside
> of java.util.concurrent (2) Most net-based frameworks seem to use
> custom data representation etc (e.g., JSON) that are even further out
> of scope.  However class SubmissionPublisher can be used as an adaptor
> to turn just about any kind of source into a Publisher, so provides a
> nearly universal way of constructing a good non-custom Publisher even
> from IO-based sources.  (Also notice that SubmissionPublisher can
> serve as the basis of other actor-like frameworks, including those
> turning off back-pressure by calling
> subscription.request(Long.MAX_VALUE) in onSubscribe).
> Doug et al,
> The Jetty project has been experimenting with the reactive streams API: https://github.com/jetty-project/jetty-reactive albeit not with the JDK-9 version of it, but inspired by the proposed inclusion of it.
> We very much like the API and what it can bring to our space.  We don't see that it needs direct IO support; its power is actually in bridging domains with a good asynchronous model that supports flow control.
> We've also begun some preliminary discussions about developing an RS-based proposal for the Servlet 4.0 specification.  Currently the Servlet API supports asynchronous IO and behaviour well, but the API is deceptively difficult to use correctly and gives no support for back pressure.  With RS we can envisage solutions that look like:
> * A database provides an RS Producer that provides the large results of a query asynchronously from a remote database server
> * Some business logic is encapsulated as an RS Processor subscribed to the database producer
> * Some framework-provided Processors subscribe to the business logic Processor to perform a chain of functions such as serialization and compression
> * A container-provided Subscriber terminates the chain and sends the resulting bytes out over HTTP/HTTP2 or Websocket.  The flow control mechanisms of these protocols would be the basis of the RS back pressure.
> In such solutions, a full HTTP/2 flow control window would result in back pressure on the remote database server, allowing threadless waiting without unlimited queuing of data.
> However, we have a significant concern with the API in that we do not like its error handling design.  Specifically, it is asymmetric: an error in the middle of a chain of processors can be propagated downstream with onError(Throwable) but can only be propagated upstream with cancel().
> We believe that cancel without reason is an insufficient semantic to build a robust ecosystem of RS Processors that can be used to build applications.  Consider the above example: it would be ideal if the object serialization was handled by a 3rd party Processor (let's say JSONEncodingProcessor). If the business logic erroneously sent a non-JSONable object, or if the JSON converter was incorrectly configured, then the JSONEncodingProcessor could encounter an error during its onNext(Object item) handling, and its only permitted handling of that is to cancel the stream, without explanation.
> I have raised this as an issue on the RS github and the current recommendation is to log and cancel: https://github.com/reactive-streams/reactive-streams-jvm/issues/271#issuecomment-121974544  However I believe that log and cancel is an insufficient semantic.  Logging in assembled applications is often fraught as each component provider will fight over which logging framework is best.  RS chains may cross jurisdictional boundaries and logs may not even be readily available.
> The solution we see is to replace/augment cancel() with either cancel(Throwable reason) or an upstream onError(Throwable reason).  I acknowledge that the passed reason may not always be meaningful to the upstream processors and publishers, but it is better to ignore a meaningless reason than to be ignorant of a meaningful one.
> When considering this API, we have to look beyond usages that work well and consider usages that will fail well also!
> cheers
> -- 
> Greg Wilkins <gregw at webtide.com>

I'm a physicist: I have a basic working knowledge of the universe and everything it contains!
    - Sheldon Cooper (The Big Bang Theory)
