[concurrency-interest] A race in SubmissionPublisher?

Dávid Karnok akarnokd at gmail.com
Tue Sep 26 08:29:35 EDT 2017


Viktor, can you be more specific? It just looks uncommon to me but
otherwise legitimate. The fact that there is no cancel() call when all
requested data has arrived is irrelevant from the perspective of the
underlying bug in SubmissionPublisher.

Here is a variant where the Subscriber is just consuming items and counting
how many items have been received. The asynchronous sender reads this count
to know when to send the next value to expose the internal race.

public class SpPublishAlt {
    @Test
    public void test() throws Exception {
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            SpConsumer c = new SpConsumer();
            sp.subscribe(c);

            exec.submit(() -> {
                while (c.upstream == null) ;

                int i = 0;
                sp.offer(i, (a, b) -> { throw new RuntimeException(); });

                while (i < SpConsumer.N) {
                    while (c.getAcquire() == i);
                    i++;
                    sp.offer(i, (a, b) -> { throw new RuntimeException(); });
                }
            });

            if (!c.cdl.await(10, TimeUnit.SECONDS)) {
                throw new AssertionError("Timed out " + c.getAcquire());
            }
        } finally {
            exec.shutdownNow();
        }
    }

    static final class SpConsumer extends AtomicInteger implements
Flow.Subscriber<Object> {

        static final int N = 1 << 20;

        final CountDownLatch cdl = new CountDownLatch(1);

        volatile Flow.Subscription upstream;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(N);
        }

        @Override
        public void onNext(Object item) {
            System.out.println(item);
            int i = getPlain() + 1;
            setRelease(i);
            if (i == N) {
                upstream.cancel();
                cdl.countDown();
            }
        }

        @Override
        public void onError(Throwable throwable) {
            throwable.printStackTrace();
            cdl.countDown();
        }

        @Override
        public void onComplete() {
            cdl.countDown();
        }
    }
}


2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:

> Hi Pavel,
>
> I trust that the code you submitted is for test case reproduction
> primarily, but there are a few spec violations in your Subscriber
> implementation that you'll want to address.
>
> We're currently in the process of releasing a bridge jar between
> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
> TCK to have them verified.
>
> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com>
> wrote:
>
>> Hi,
>>
>> I've been using SubmissionPublisher in my own publisher implementation in
>> order
>> to reuse its complex state machine that serializes invocations to
>> subscribers.
>>
>> While testing my implementation I ran into what I believe might be a race
>> condition in SubmissionPublisher.
>>
>> Consider the following example:
>>
>>     public class SubmissionPublisherTest {
>>
>>         private final static int N = 1 << 20;
>>
>>         private final AtomicInteger numbers = new AtomicInteger();
>>         private final SubmissionPublisher<Integer> pub = new
>> SubmissionPublisher<>();
>>         private final ExecutorService pubExecutor =
>> Executors.newSingleThreadExecutor();
>>         private final CountDownLatch finished = new CountDownLatch(1);
>>
>>         public static void main(String[] args) throws
>> InterruptedException {
>>             new SubmissionPublisherTest().run();
>>         }
>>
>>         private void run() throws InterruptedException {
>>             pub.subscribe(newSubscriber());
>>             try {
>>                 finished.await(30, TimeUnit.SECONDS);
>>             } finally {
>>                 pubExecutor.shutdownNow();
>>             }
>>             System.out.println("Finished");
>>         }
>>
>>         private Flow.Subscriber<Integer> newSubscriber() {
>>             return new Flow.Subscriber<>() {
>>
>>                 Flow.Subscription sub;
>>                 int received;
>>
>>                 @Override
>>                 public void onSubscribe(Flow.Subscription s) {
>>                     (this.sub = s).request(N);
>>                     publish();
>>                 }
>>
>>                 @Override
>>                 public void onNext(Integer item) {
>>                     if (++received == N) finished.countDown();
>>                     publish();
>>                     System.out.println(item);
>>                 }
>>
>>                 @Override public void onError(Throwable t) { }
>>                 @Override public void onComplete() { }
>>             };
>>         }
>>
>>         private void publish() {
>>             int number = numbers.incrementAndGet();
>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>                     onDropReportError = (s, i) -> { throw new
>> InternalError(); };
>>             pubExecutor.execute(() -> pub.offer(number,
>> onDropReportError));
>>     //        pub.offer(number, onDropReportError);
>>         }
>>     }
>>
>> What happens here?
>>
>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers
>> from
>> SubmissionPublisher. Subscriber.onNext prints the integer it has received
>> and
>> offers a single integer to the publisher. run() waits until all N
>> integers have
>> been received and then returns.
>>
>> onSubscribe offers an initial integer to the SubmissionPublisher. This
>> kick-starts
>> the "feedback loop".
>>
>> If you run this snippet, chances are good you will not see all 1048576
>> integers
>> printed out to the console before the program terminates. Instead, the
>> output
>> will stop at some number m < 1048576 for no apparent reason.
>>
>> However, if you comment out the line:
>>
>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>
>> and uncomment the previously commented line, everything will work as
>> expected.
>> The difference is that in this case offers of integers happen
>> synchronously
>> rather than from a separate thread.
>>
>> P.S. I have debugged a little bit and my guess (I'm not a concurrency
>> expert)
>> is that accesses to SubmissionPublisher.BufferedSubscription.tail field
>> are
>> undersynchronized. In particular, it might be the case that updates to
>> this
>> field are not always seen by the consumer task in
>> BufferedSubscription.checkEmpty.
>>
>> Is there any chance `tail` field is simply missing volatile modifier?
>>
>> Thanks,
>> -Pavel
>> _______________________________________________
>> Concurrency-interest mailing list
>> Concurrency-interest at cs.oswego.edu
>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>
>
>
> --
> Cheers,
>>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>


-- 
Best regards,
David Karnok
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20170926/90af0e38/attachment-0001.html>


More information about the Concurrency-interest mailing list