[concurrency-interest] A race in SubmissionPublisher?

Viktor Klang viktor.klang at gmail.com
Tue Sep 26 08:36:52 EDT 2017


On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <akarnokd at gmail.com> wrote:

> Viktor, can you be more specific? It just looks uncommon to me but
> otherwise legitimate. The fact that there is no cancel() call when all
> requested data has arrived is irrelevant from the perspective of the
> underlying bug in SubmissionPublisher.
>

Absolutely, my comment was not about the SubmissionPublisher bug. I was
unsure whether the provided code was for reproduction only or was written
like that in general; in the latter case it needs a bit of adjustment to
conform to the spec.
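
To illustrate the kind of adjustment meant, here is a minimal sketch of a
Subscriber that observes two Reactive Streams rules that are easy to miss:
cancelling a second Subscription if one is already active (rule 2.5) and
throwing NullPointerException on null arguments (rule 2.13). Which rules the
original code actually breaks is not spelled out in this thread, so the
selection is an assumption, and CountingSubscriber, its limit parameter and
the demo class are hypothetical names.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example class; not code from this thread.
final class CountingSubscriber<T> implements Flow.Subscriber<T> {
    private final long limit;
    private final AtomicLong received = new AtomicLong();
    // Written and read only inside signal methods, which the publisher
    // is required to call serially (rule 1.3).
    private Flow.Subscription subscription;

    CountingSubscriber(long limit) {
        if (limit <= 0) {
            // request(n) must be called with n > 0 (rule 3.9).
            throw new IllegalArgumentException("limit must be positive");
        }
        this.limit = limit;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s, "subscription"); // rule 2.13
        if (subscription != null) {
            s.cancel();                            // rule 2.5
            return;
        }
        subscription = s;
        s.request(limit);
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item, "item");      // rule 2.13
        if (received.incrementAndGet() == limit) {
            subscription.cancel();                 // nothing more is wanted
        }
    }

    @Override
    public void onError(Throwable t) {
        Objects.requireNonNull(t, "throwable");    // rule 2.13
    }

    @Override
    public void onComplete() { }

    long received() {
        return received.get();
    }
}

final class CountingSubscriberDemo {
    public static void main(String[] args) {
        CountingSubscriber<String> sub = new CountingSubscriber<>(2);
        // A stand-in Subscription that only logs the calls it receives.
        sub.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { System.out.println("request " + n); }
            @Override public void cancel() { System.out.println("cancel"); }
        });
        sub.onNext("a");
        sub.onNext("b");
        System.out.println("received " + sub.received());
    }
}
```

The rule numbers refer to the Reactive Streams specification; running the
Subscriber through the TCK remains the reliable way to check conformance.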


>
> Here is a variant where the Subscriber is just consuming items and
> counting how many items have been received. The asynchronous sender reads
> this count to know when to send the next value to expose the internal race.
>
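> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Flow;
> import java.util.concurrent.SubmissionPublisher;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicInteger;
> // @Test comes from JUnit; the thread does not say which version.
>
> public class SpPublishAlt {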
>     @Test
>     public void test() throws Exception {
>         SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>
>         ExecutorService exec = Executors.newSingleThreadExecutor();
>         try {
>             SpConsumer c = new SpConsumer();
>             sp.subscribe(c);
>
>             exec.submit(() -> {
>                 while (c.upstream == null) ;
>
>                 int i = 0;
>                 sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>
>                 while (i < SpConsumer.N) {
>                     while (c.getAcquire() == i);
>                     i++;
>                     sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>                 }
>             });
>
>             if (!c.cdl.await(10, TimeUnit.SECONDS)) {
>                 throw new AssertionError("Timed out " + c.getAcquire());
>             }
>         } finally {
>             exec.shutdownNow();
>         }
>     }
>
>     static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {
>
>         static final int N = 1 << 20;
>
>         final CountDownLatch cdl = new CountDownLatch(1);
>
>         volatile Flow.Subscription upstream;
>
>         @Override
>         public void onSubscribe(Flow.Subscription subscription) {
>             upstream = subscription;
>             subscription.request(N);
>         }
>
>         @Override
>         public void onNext(Object item) {
>             System.out.println(item);
>             int i = getPlain() + 1;
>             setRelease(i);
>             if (i == N) {
>                 upstream.cancel();
>                 cdl.countDown();
>             }
>         }
>
>         @Override
>         public void onError(Throwable throwable) {
>             throwable.printStackTrace();
>             cdl.countDown();
>         }
>
>         @Override
>         public void onComplete() {
>             cdl.countDown();
>         }
>     }
> }
>
>
> 2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>
>> Hi Pavel,
>>
>> I trust that the code you submitted is for test case reproduction
>> primarily, but there are a few spec violations in your Subscriber
>> implementation that you'll want to address.
>>
>> We're currently in the process of releasing a bridge jar between
>> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
>> TCK to have them verified.
>>
>> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I've been using SubmissionPublisher in my own publisher implementation
>>> in order to reuse its complex state machine that serializes invocations
>>> to subscribers.
>>>
>>> While testing my implementation I ran into what I believe might be a race
>>> condition in SubmissionPublisher.
>>>
>>> Consider the following example:
>>>
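>>>     import java.util.concurrent.CountDownLatch;
>>>     import java.util.concurrent.ExecutorService;
>>>     import java.util.concurrent.Executors;
>>>     import java.util.concurrent.Flow;
>>>     import java.util.concurrent.SubmissionPublisher;
>>>     import java.util.concurrent.TimeUnit;
>>>     import java.util.concurrent.atomic.AtomicInteger;
>>>     import java.util.function.BiPredicate;
>>>
>>>     public class SubmissionPublisherTest {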
>>>
>>>         private final static int N = 1 << 20;
>>>
>>>         private final AtomicInteger numbers = new AtomicInteger();
>>>         private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
>>>         private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
>>>         private final CountDownLatch finished = new CountDownLatch(1);
>>>
>>>         public static void main(String[] args) throws InterruptedException {
>>>             new SubmissionPublisherTest().run();
>>>         }
>>>
>>>         private void run() throws InterruptedException {
>>>             pub.subscribe(newSubscriber());
>>>             try {
>>>                 finished.await(30, TimeUnit.SECONDS);
>>>             } finally {
>>>                 pubExecutor.shutdownNow();
>>>             }
>>>             System.out.println("Finished");
>>>         }
>>>
>>>         private Flow.Subscriber<Integer> newSubscriber() {
>>>             return new Flow.Subscriber<>() {
>>>
>>>                 Flow.Subscription sub;
>>>                 int received;
>>>
>>>                 @Override
>>>                 public void onSubscribe(Flow.Subscription s) {
>>>                     (this.sub = s).request(N);
>>>                     publish();
>>>                 }
>>>
>>>                 @Override
>>>                 public void onNext(Integer item) {
>>>                     if (++received == N) finished.countDown();
>>>                     publish();
>>>                     System.out.println(item);
>>>                 }
>>>
>>>                 @Override public void onError(Throwable t) { }
>>>                 @Override public void onComplete() { }
>>>             };
>>>         }
>>>
>>>         private void publish() {
>>>             int number = numbers.incrementAndGet();
>>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>>                     onDropReportError = (s, i) -> { throw new InternalError(); };
>>>             pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>     //        pub.offer(number, onDropReportError);
>>>         }
>>>     }
>>>
>>> What happens here?
>>>
>>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers
>>> from SubmissionPublisher. Subscriber.onNext prints the integer it has
>>> received and offers a single integer to the publisher. run() waits until
>>> all N integers have been received and then returns.
>>>
>>> onSubscribe offers an initial integer to the SubmissionPublisher. This
>>> kick-starts the "feedback loop".
>>>
>>> If you run this snippet, chances are good you will not see all 1048576
>>> integers printed out to the console before the program terminates.
>>> Instead, the output will stop at some number m < 1048576 for no apparent
>>> reason.
>>>
>>> However, if you comment out the line:
>>>
>>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>
>>> and uncomment the previously commented line, everything will work as
>>> expected. The difference is that in this case offers of integers happen
>>> synchronously rather than from a separate thread.
>>>
>>> P.S. I have debugged a little bit and my guess (I'm not a concurrency
>>> expert) is that accesses to SubmissionPublisher.BufferedSubscription.tail
>>> field are undersynchronized. In particular, it might be the case that
>>> updates to this field are not always seen by the consumer task in
>>> BufferedSubscription.checkEmpty.
>>>
>>> Is there any chance the `tail` field is simply missing the volatile modifier?
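
The hazard guessed at here can be shown in isolation. The sketch below is
not SubmissionPublisher's code; it is a generic producer/consumer handoff in
which a consumer task that finds the queue empty must re-check after giving
up its "active" flag, otherwise an item enqueued in between is never
delivered. SerializedDrain and all of its members are hypothetical names,
and ConcurrentLinkedQueue plus AtomicBoolean stand in for whatever
synchronization the real class uses.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical example class; not the SubmissionPublisher internals.
final class SerializedDrain<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean active = new AtomicBoolean();
    private final Executor executor;
    private final Consumer<? super T> sink;

    SerializedDrain(Executor executor, Consumer<? super T> sink) {
        this.executor = executor;
        this.sink = sink;
    }

    // Producer side: publish the item first, then make sure a consumer runs.
    void offer(T item) {
        queue.add(item);
        tryStart();
    }

    // Starts a drain task only if there is work and no task is active.
    private void tryStart() {
        if (!queue.isEmpty() && active.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    // Consumer side: at most one drain task runs at a time.
    private void drain() {
        try {
            for (T item; (item = queue.poll()) != null; ) {
                sink.accept(item);
            }
        } finally {
            active.set(false);
        }
        // Re-check: an item may have been added after the last poll() but
        // before active was cleared. Skipping this step loses the wakeup.
        tryStart();
    }
}

final class SerializedDrainDemo {
    public static void main(String[] args) {
        // Runnable::run executes the drain task on the calling thread.
        SerializedDrain<Integer> drain =
                new SerializedDrain<>(Runnable::run, System.out::println);
        for (int i = 1; i <= 3; i++) {
            drain.offer(i);
        }
    }
}
```

If the emptiness check were done on a plain, unsynchronized field, the
re-check could read a stale value and the consumer would stop early, which
would match the symptom of the output stalling at some m < N.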
>>>
>>> Thanks,
>>> -Pavel
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>
>>
>>
>>
>> --
>> Cheers,
>> √
>>
>>
>
>
> --
> Best regards,
> David Karnok
>



-- 
Cheers,
√