[concurrency-interest] 9b147: SubmissionPublisher.closeExceptionally doesn't call onSubscribe() in time

Dávid Karnok akarnokd at gmail.com
Mon Dec 5 04:03:43 EST 2016


I'm writing an interop library between RxJava 2 and the JDK 9 Flow API and
have run into an odd situation.

The following test passes:

    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

    TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
        .test();

    sp.submit(1);
    sp.submit(2);
    sp.submit(3);
    sp.submit(4);
    sp.submit(5);

    sp.close();

    ts.awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1, 2, 3, 4, 5);



but when I call closeExceptionally() instead:

    SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

    TestSubscriber<Integer> ts = FlowInterop.fromFlowPublisher(sp)
        .test();

    sp.submit(1);
    sp.submit(2);
    sp.submit(3);
    sp.submit(4);
    sp.submit(5);

    sp.closeExceptionally(new IOException());


    ts.awaitDone(5, TimeUnit.SECONDS)
        .assertFailure(IOException.class, 1, 2, 3, 4, 5);


The test fails because there was no call to Flow.Subscriber.onSubscribe in
this case before calling Flow.Subscriber.onError with the IOException:

Caused by: java.lang.NullPointerException: onSubscribe not called in proper order
    at io.reactivex.subscribers.TestSubscriber.onError(TestSubscriber.java:221)
    at hu.akarnokd.rxjava2.interop.FlowableFromFlowPublisher$FromFlowPublisherSubscriber.onError(FlowableFromFlowPublisher.java:62)
    at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.checkControl(SubmissionPublisher.java:1463)
    at java.base/java.util.concurrent.SubmissionPublisher$BufferedSubscription.consume(SubmissionPublisher.java:1421)
    at java.base/java.util.concurrent.SubmissionPublisher$ConsumerTask.exec(SubmissionPublisher.java:919)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1575)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:158)

Sleeping before the call to closeExceptionally() seems to make it work.

The relevant converter looks like this:

import java.util.concurrent.Flow;

import io.reactivex.Flowable;

final class FlowableFromFlowPublisher<T> extends Flowable<T> {

    final Flow.Publisher<T> source;

    FlowableFromFlowPublisher(Flow.Publisher<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(org.reactivestreams.Subscriber<? super T> s) {
        source.subscribe(new FromFlowPublisherSubscriber<>(s));
    }

    static final class FromFlowPublisherSubscriber<T>
            implements Flow.Subscriber<T>, org.reactivestreams.Subscription {

        final org.reactivestreams.Subscriber<? super T> actual;

        Flow.Subscription s;

        FromFlowPublisherSubscriber(org.reactivestreams.Subscriber<? super T> actual) {
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.s = subscription;
            actual.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable throwable) {
            actual.onError(throwable);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void request(long n) {
            s.request(n);
        }

        @Override
        public void cancel() {
            s.cancel();
        }
    }
}

It looks like when closeExceptionally() is called, the protocol is not
upheld: onError is invoked without onSubscribe having been called first.
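
To take RxJava and the converter out of the picture, the same sequence can be
run against a plain Flow.Subscriber that only records the order of the signals
it receives. The following is a minimal sketch, not part of the library: the
names CloseExceptionallyOrder and RecordingSubscriber are made up for
illustration, and the 5 second timeout simply mirrors awaitDone() from the
tests above. If the problem is in SubmissionPublisher itself, the recorded list
would be expected to contain an onError entry with no onSubscribe entry before
it.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, JDK only (no RxJava involved).
final class CloseExceptionallyOrder {

    /** Records every signal it receives, in arrival order. */
    static final class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            subscription.request(Long.MAX_VALUE); // unbounded demand
        }

        @Override
        public void onNext(Integer item) {
            events.add("onNext:" + item);
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("onError:" + throwable.getClass().getSimpleName());
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
            done.countDown();
        }
    }

    /** Runs the failing scenario and returns the recorded signals. */
    static List<String> run() {
        RecordingSubscriber rs = new RecordingSubscriber();
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        sp.subscribe(rs);

        for (int i = 1; i <= 5; i++) {
            sp.submit(i);
        }
        sp.closeExceptionally(new IOException());

        try {
            // Wait for the terminal signal, like awaitDone() in the tests.
            if (!rs.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal within 5 seconds");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return rs.events;
    }

    public static void main(String[] args) {
        // Prints the signals in the order the subscriber saw them.
        System.out.println(run());
    }
}
```

The sketch deliberately makes no assumption about how many onNext entries show
up before the error; it only makes the relative position of onSubscribe and
onError visible.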

(Repository: https://github.com/akarnokd/RxJava2Jdk9Interop )
-- 
Best regards,
David Karnok