[concurrency-interest] 9b147: SubmissionPublisher.closeExceptionally doesn't call onSubscribe() in time
Doug Lea
dl at cs.oswego.edu
Mon Dec 5 19:29:23 EST 2016
Thanks! I agree that it was inconsistent to give onError
precedence over onSubscribe. It is now changed to work in the
same way across close and closeExceptionally. (It will take a
while to get this into jdk9 builds though.)
-Doug
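
For readers who want to check the ordering on their own JDK build, here is a
minimal sketch that avoids any RxJava dependency. The names SignalOrderCheck
and RecordingSubscriber are hypothetical and not part of the original report.
It assumes a JDK that already contains the change described above, and it
checks only that onSubscribe arrives first and onError arrives last. It makes
no claim about whether the buffered item is delivered in between.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SignalOrderCheck {

    /** Hypothetical helper: records the name of each signal in arrival order. */
    static final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(T item) {
            events.add("onNext");
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    /** Subscribes, submits one item, closes exceptionally, returns the signals seen. */
    static List<String> run() {
        RecordingSubscriber<Integer> rs = new RecordingSubscriber<>();
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        sp.subscribe(rs);
        sp.submit(1);
        // No sleep here: the terminal signal races with the initial onSubscribe.
        sp.closeExceptionally(new IOException());
        try {
            if (!rs.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within 5 seconds");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(rs.events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

On a build without the change, the first recorded signal may be onError, which
is the behavior reported in the quoted message.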
On 12/05/2016 04:03 AM, Dávid Karnok wrote:
> I'm writing an interop library between RxJava 2 and JDK 9 Flow and ran
> into an odd situation.
>
> The following test passes:
>
> SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>
> TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
> .test();
>
> sp.submit(1);
> sp.submit(2);
> sp.submit(3);
> sp.submit(4);
> sp.submit(5);
>
> sp.close();
>
> ts.awaitDone(5, TimeUnit.SECONDS)
> .assertResult(1, 2, 3, 4, 5);
>
>
>
> but when I call closeExceptionally() instead:
>
> SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>
> TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
> .test();
>
> sp.submit(1);
> sp.submit(2);
> sp.submit(3);
> sp.submit(4);
> sp.submit(5);
>
> sp.closeExceptionally(new IOException());
>
>
> ts.awaitDone(5, TimeUnit.SECONDS)
> .assertFailure(IOException.class, 1, 2, 3, 4, 5);
>
>
> The test fails because there was no call to Flow.Subscriber.onSubscribe
> in this case before calling Flow.Subscriber.onError with the IOException:
>
> Caused by: java.lang.NullPointerException: onSubscribe not called in proper order
>     at io.reactivex.subscribers.TestSubscriber.onError(TestSubscriber.java:221)
>     at hu.akarnokd.rxjava2.interop.FlowableFromFlowPublisher$FromFlowPublisherSubscriber.onError(FlowableFromFlowPublisher.java:62)
>     *at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.checkControl(SubmissionPublisher.java:1463)*
>     *at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.consume(SubmissionPublisher.java:1421)*
>     at java.base/java.util.concurrent.SubmissionPublisher$ConsumerTask.exec(SubmissionPublisher.java:919)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1575)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:158)
>
> Sleeping before the call to closeExceptionally() seems to make it work.
>
> The relevant converter looks like this:
>
> final class FlowableFromFlowPublisher<T> extends Flowable<T> {
>
>     final Flow.Publisher<T> source;
>
>     FlowableFromFlowPublisher(Flow.Publisher<T> source) {
>         this.source = source;
>     }
>
>     @Override
>     protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
>         source.subscribe(new FromFlowPublisherSubscriber<>(s));
>     }
>
>     static final class FromFlowPublisherSubscriber<T>
>             implements Flow.Subscriber<T>, org.reactivestreams.Subscription {
>
>         final org.reactivestreams.Subscriber<? super T> actual;
>
>         Flow.Subscription s;
>
>         FromFlowPublisherSubscriber(org.reactivestreams.Subscriber<? super T> actual) {
>             this.actual = actual;
>         }
>
>         @Override
>         public void onSubscribe(Flow.Subscription subscription) {
>             this.s = subscription;
>             actual.onSubscribe(this);
>         }
>
>         @Override
>         public void onNext(T t) {
>             actual.onNext(t);
>         }
>
>         @Override
>         public void onError(Throwable throwable) {
>             actual.onError(throwable);
>         }
>
>         @Override
>         public void onComplete() {
>             actual.onComplete();
>         }
>
>         @Override
>         public void request(long n) {
>             s.request(n);
>         }
>
>         @Override
>         public void cancel() {
>             s.cancel();
>         }
>     }
> }
>
> It looks like when closeExceptionally() is called, the protocol is not
> upheld: onError is called without a preceding call to onSubscribe.
>
> (Repository: https://github.com/akarnokd/RxJava2Jdk9Interop )
> --
> Best regards,
> David Karnok
>
>