[concurrency-interest] A race in SubmissionPublisher?

Viktor Klang viktor.klang at gmail.com
Tue Sep 26 07:58:31 EDT 2017


Hi Pavel,

I trust the code you submitted is primarily a test-case reproduction,
but there are a few spec violations in your Subscriber implementation
that you'll want to address.
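
The violations are not itemised here. Purely as an illustration, the
sketch below shows a Flow.Subscriber that guards two Reactive Streams
rules that are easy to overlook: rule 2.5 (cancel a second Subscription
if one is already active) and rule 2.13 (throw NullPointerException on
null arguments). The names CountingSubscriberSketch, CountingSubscriber
and runDemo are hypothetical and not taken from the submitted code.

```java
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingSubscriberSketch {

    /** Hypothetical subscriber that only counts items; illustrative, not from the thread. */
    static final class CountingSubscriber<T> implements Flow.Subscriber<T> {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicInteger received = new AtomicInteger();
        private Flow.Subscription subscription; // touched only from signal methods

        @Override
        public void onSubscribe(Flow.Subscription s) {
            Objects.requireNonNull(s, "subscription"); // rule 2.13
            if (subscription != null) {
                s.cancel();                            // rule 2.5: reject a second subscription
                return;
            }
            subscription = s;
            s.request(Long.MAX_VALUE);                 // effectively unbounded demand
        }

        @Override
        public void onNext(T item) {
            Objects.requireNonNull(item, "item");      // rule 2.13
            received.incrementAndGet();
        }

        @Override
        public void onError(Throwable t) {
            Objects.requireNonNull(t, "throwable");    // rule 2.13
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes n integers and returns how many the subscriber received. */
    static int runDemo(int n) {
        CountingSubscriber<Integer> subscriber = new CountingSubscriber<>();
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(subscriber);
            for (int i = 0; i < n; i++) {
                pub.submit(i);
            }
        } // close() issues onComplete to the current subscribers
        try {
            subscriber.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(100));
    }
}
```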

We're currently in the process of releasing a bridge jar between
org.reactivestreams and j.u.c.Flow, so you can plug your Flow.*
implementations into the Reactive Streams (RS) TCK to have them verified.

On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com> wrote:

> Hi,
>
> I've been using SubmissionPublisher in my own publisher implementation in
> order to reuse its complex state machine that serializes invocations to
> subscribers.
>
> While testing my implementation I ran into what I believe might be a race
> condition in SubmissionPublisher.
>
> Consider the following example:
>
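>     import java.util.concurrent.CountDownLatch;
>     import java.util.concurrent.ExecutorService;
>     import java.util.concurrent.Executors;
>     import java.util.concurrent.Flow;
>     import java.util.concurrent.SubmissionPublisher;
>     import java.util.concurrent.TimeUnit;
>     import java.util.concurrent.atomic.AtomicInteger;
>     import java.util.function.BiPredicate;
>
>     public class SubmissionPublisherTest {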
>
>         private final static int N = 1 << 20;
>
>         private final AtomicInteger numbers = new AtomicInteger();
>         private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
>         private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
>         private final CountDownLatch finished = new CountDownLatch(1);
>
>         public static void main(String[] args) throws InterruptedException {
>             new SubmissionPublisherTest().run();
>         }
>
>         private void run() throws InterruptedException {
>             pub.subscribe(newSubscriber());
>             try {
>                 finished.await(30, TimeUnit.SECONDS);
>             } finally {
>                 pubExecutor.shutdownNow();
>             }
>             System.out.println("Finished");
>         }
>
>         private Flow.Subscriber<Integer> newSubscriber() {
>             return new Flow.Subscriber<>() {
>
>                 Flow.Subscription sub;
>                 int received;
>
>                 @Override
>                 public void onSubscribe(Flow.Subscription s) {
>                     (this.sub = s).request(N);
>                     publish();
>                 }
>
>                 @Override
>                 public void onNext(Integer item) {
>                     if (++received == N) finished.countDown();
>                     publish();
>                     System.out.println(item);
>                 }
>
>                 @Override public void onError(Throwable t) { }
>                 @Override public void onComplete() { }
>             };
>         }
>
>         private void publish() {
>             int number = numbers.incrementAndGet();
>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>                     onDropReportError = (s, i) -> { throw new InternalError(); };
>             pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>             // pub.offer(number, onDropReportError);
>         }
>     }
>
> What happens here?
>
> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers
> from SubmissionPublisher. Subscriber.onNext prints the integer it has
> received and offers a single integer to the publisher. run() waits (for up
> to 30 seconds) until all N integers have been received and then returns.
>
> onSubscribe offers an initial integer to the SubmissionPublisher. This
> kick-starts the "feedback loop".
>
> If you run this snippet, chances are good you will not see all 1048576
> integers printed out to the console before the program terminates. Instead,
> the output will stop at some number m < 1048576 for no apparent reason.
>
> However, if you comment out the line:
>
>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>
> and uncomment the previously commented line, everything will work as
> expected. The difference is that in this case offers of integers happen
> synchronously rather than from a separate thread.
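
For comparing the two modes side by side, a scaled-down sketch of the
same feedback loop may help. It takes the item count and a flag that
selects whether offers run inline or on a single-thread executor. The
names FeedbackLoopSketch and run are hypothetical, the drop handler
simply declines to retry, and the loop stops offering once the last
item arrives. The asynchronous mode is not claimed to stall reliably;
on an affected JDK build it may return fewer than n.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FeedbackLoopSketch {

    /** Runs the loop for n items; returns how many arrived before the timeout. */
    static int run(int n, boolean async) {
        SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
        ExecutorService offerThread = Executors.newSingleThreadExecutor();
        AtomicInteger next = new AtomicInteger();
        AtomicInteger received = new AtomicInteger();
        CountDownLatch finished = new CountDownLatch(1);

        // Offers the next integer, either inline or from the separate thread.
        Runnable publish = () -> {
            int number = next.incrementAndGet();
            Runnable offer = () -> pub.offer(number, (s, i) -> false); // no retry on drop
            if (async) {
                offerThread.execute(offer);
            } else {
                offer.run();
            }
        };

        pub.subscribe(new Flow.Subscriber<Integer>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(n);
                publish.run(); // kick-start the loop
            }

            @Override
            public void onNext(Integer item) {
                if (received.incrementAndGet() == n) {
                    finished.countDown();
                } else {
                    publish.run(); // each received item triggers the next offer
                }
            }

            @Override
            public void onError(Throwable t) {
                finished.countDown();
            }

            @Override
            public void onComplete() { }
        });

        try {
            finished.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            offerThread.shutdownNow();
            pub.close();
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + run(1000, false));
        System.out.println("async: " + run(1000, true));
    }
}
```
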
>
> P.S. I have debugged a little bit and my guess (I'm not a concurrency
> expert) is that accesses to the SubmissionPublisher.BufferedSubscription.tail
> field are undersynchronized. In particular, it might be the case that updates
> to this field are not always seen by the consumer task in
> BufferedSubscription.checkEmpty.
>
> Is there any chance the `tail` field is simply missing the volatile modifier?
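
To make the visibility question concrete, here is a minimal sketch of
the general pattern being asked about: one producer publishes array
slots by advancing a volatile tail index, and one consumer reads up to
that index. It does not reproduce SubmissionPublisher's internals, and
whether a missing volatile is the actual cause is not established in
this thread. The class VolatileTailSketch and its methods are
hypothetical.

```java
public class VolatileTailSketch {

    private final int[] slots;
    // Next free slot; written only by the producer. The volatile write
    // publishes the preceding plain write to slots[] to the consumer.
    private volatile int tail;

    VolatileTailSketch(int capacity) {
        slots = new int[capacity];
    }

    /** Producer side: store the value, then advance tail. */
    void produce(int value) {
        int t = tail;
        slots[t] = value; // plain write
        tail = t + 1;     // volatile write makes the slot visible
    }

    /** Sends 1..n from the calling thread to a consumer thread; returns the consumer's sum. */
    static long transfer(int n) {
        VolatileTailSketch q = new VolatileTailSketch(n);
        long[] sum = new long[1];
        Thread consumer = new Thread(() -> {
            int head = 0;
            while (head < n) {
                if (head < q.tail) {        // volatile read
                    sum[0] += q.slots[head++];
                } else {
                    Thread.onSpinWait();    // nothing published yet
                }
            }
        });
        consumer.start();
        for (int i = 1; i <= n; i++) {
            q.produce(i);
        }
        try {
            consumer.join(); // join() makes sum[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(1000));
    }
}
```

Without volatile on tail, the Java memory model would not guarantee
that the consumer's loop ever observes the producer's updates, which
is the kind of stall described above.
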
>
> Thanks,
> -Pavel



-- 
Cheers,
√