[concurrency-interest] A race in SubmissionPublisher?

Pavel Rappo pavel.rappo at gmail.com
Mon Sep 25 09:21:01 EDT 2017


Hi,

I've been using SubmissionPublisher in my own publisher implementation in order
to reuse its complex state machine that serializes invocations to subscribers.

While testing my implementation I ran into what I believe might be a race
condition in SubmissionPublisher.

Consider the following example:

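    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.BiPredicate;

    public class SubmissionPublisherTest {

        private static final int N = 1 << 20;

        private final AtomicInteger numbers = new AtomicInteger();
        private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
        private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();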
        private final CountDownLatch finished = new CountDownLatch(1);

        public static void main(String[] args) throws InterruptedException {
            new SubmissionPublisherTest().run();
        }

        private void run() throws InterruptedException {
            pub.subscribe(newSubscriber());
            try {
                finished.await(30, TimeUnit.SECONDS);
            } finally {
                pubExecutor.shutdownNow();
            }
            System.out.println("Finished");
        }

        private Flow.Subscriber<Integer> newSubscriber() {
            return new Flow.Subscriber<>() {

                Flow.Subscription sub;
                int received;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    (this.sub = s).request(N);
                    publish();
                }

                @Override
                public void onNext(Integer item) {
                    if (++received == N) finished.countDown();
                    publish();
                    System.out.println(item);
                }

                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            };
        }

        private void publish() {
            int number = numbers.incrementAndGet();
            BiPredicate<Flow.Subscriber<? super Integer>, Integer>
                    onDropReportError = (s, i) -> { throw new InternalError(); };
            // Asynchronous offer from the single publisher thread:
            pubExecutor.execute(() -> pub.offer(number, onDropReportError));
            // Synchronous offer from the calling thread:
            // pub.offer(number, onDropReportError);
        }
    }

What happens here?

Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
SubmissionPublisher. Subscriber.onNext prints the integer it has received and
offers a single integer to the publisher. run() waits until all N integers have
been received and then returns.

onSubscribe offers an initial integer to the SubmissionPublisher. This
kick-starts the "feedback loop".

If you run this snippet, chances are good you will not see all 1048576 integers
printed out to the console before the program terminates. Instead, the output
will stop at some number m < 1048576 for no apparent reason.
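
To make the stall easier to spot than by watching the console, one could check
the boolean returned by CountDownLatch.await and report how far the subscriber
got. The sketch below is only an illustration: the StallReport class, the
awaitAndDescribe helper, and the shared AtomicInteger counter (standing in for
the subscriber's `received` field) are names I made up for this purpose.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the original test.
final class StallReport {

    /** Waits on the latch and describes whether all expected items arrived. */
    static String awaitAndDescribe(CountDownLatch finished, AtomicInteger received,
                                   int expected, long timeout, TimeUnit unit) {
        boolean completed;
        try {
            // await returns false if the timeout elapsed before the count hit zero
            completed = finished.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completed = false;
        }
        return completed
                ? "Finished: received all " + expected + " items"
                : "Stalled: received " + received.get() + " of " + expected + " items";
    }

    public static void main(String[] args) {
        // Simulate a subscriber that stopped after 3 of 10 items.
        CountDownLatch finished = new CountDownLatch(1);
        AtomicInteger received = new AtomicInteger(3);
        System.out.println(
                awaitAndDescribe(finished, received, 10, 100, TimeUnit.MILLISECONDS));
    }
}
```

In the test above, run() could call such a helper instead of ignoring the
result of finished.await(30, TimeUnit.SECONDS).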

However, if you comment out the line:

    pubExecutor.execute(() -> pub.offer(number, onDropReportError));

and uncomment the previously commented line, everything will work as expected.
The difference is that in this case offers of integers happen synchronously
rather than from a separate thread.
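
Instead of commenting lines in and out, the two modes could be selected through
an Executor. This is just a sketch of that idea: OfferMode, DIRECT, and
threadUsedBy are hypothetical names, and the example only demonstrates which
thread ends up running the task, not the race itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper, not part of the original test.
final class OfferMode {

    /** Runs each task on the calling thread, like the synchronous pub.offer(...). */
    static final Executor DIRECT = Runnable::run;

    /** Returns the thread on which the given executor runs a submitted task. */
    static Thread threadUsedBy(Executor executor) {
        CompletableFuture<Thread> used = new CompletableFuture<>();
        executor.execute(() -> used.complete(Thread.currentThread()));
        return used.join();
    }

    public static void main(String[] args) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            System.out.println("direct: " + threadUsedBy(DIRECT).getName());
            System.out.println("single: " + threadUsedBy(single).getName());
        } finally {
            single.shutdownNow();
        }
    }
}
```

With something like this, publish() would call
executor.execute(() -> pub.offer(number, onDropReportError)) in both cases and
only the choice of executor would change.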

P.S. I have debugged this a little, and my guess (I'm not a concurrency expert)
is that accesses to the SubmissionPublisher.BufferedSubscription.tail field are
under-synchronized. In particular, updates to this field might not always be
seen by the consumer task in BufferedSubscription.checkEmpty.

Is there any chance the `tail` field is simply missing a volatile modifier?
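
To illustrate what I mean by visibility of `tail`, here is a toy
single-producer/single-consumer buffer. It is NOT the JDK's BufferedSubscription
and says nothing about how that class is actually written; SpscBuffer and
transfer are made-up names. It only shows the pattern I have in mind: the
producer stores an element and then publishes it with a volatile write to tail,
so a consumer that reads tail is guaranteed to see the element.

```java
// Hypothetical illustration only; unrelated to the JDK source.
final class VolatileTailSketch {

    static final class SpscBuffer {
        private final int[] items;
        private final int mask;
        private volatile int tail; // written only by the producer
        private volatile int head; // written only by the consumer

        SpscBuffer(int capacity) {
            if (Integer.bitCount(capacity) != 1)
                throw new IllegalArgumentException("capacity must be a power of two");
            items = new int[capacity];
            mask = capacity - 1;
        }

        /** Producer side: returns false if the buffer is full. */
        boolean offer(int x) {
            int t = tail;
            if (t - head == items.length) return false;
            items[t & mask] = x;
            tail = t + 1; // volatile write makes the element store visible
            return true;
        }

        /** Consumer side: returns null if the buffer looks empty. */
        Integer poll() {
            int h = head;
            if (h == tail) return null; // volatile read of tail
            int x = items[h & mask];
            head = h + 1; // volatile write frees the slot for the producer
            return x;
        }
    }

    /** Moves 1..n from a producer thread to the caller and returns the sum. */
    static long transfer(int n) {
        SpscBuffer buf = new SpscBuffer(64);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= n; i++)
                while (!buf.offer(i)) Thread.onSpinWait();
        });
        producer.start();
        long sum = 0;
        for (int received = 0; received < n; ) {
            Integer x = buf.poll();
            if (x == null) { Thread.onSpinWait(); continue; }
            sum += x;
            received++;
        }
        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(1 << 20));
    }
}
```

If tail were a plain field here, nothing in the Java memory model would oblige
the consumer loop to ever observe the producer's updates, which is the kind of
stall I suspect in my test.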

Thanks,
-Pavel

