[concurrency-interest] A race in SubmissionPublisher?

Viktor Klang viktor.klang at gmail.com
Tue Sep 26 10:18:06 EDT 2017


Thanks for the link.

I think the most scalable approach to help interoperability is to
evangelize the use of the TCK whenever possible, helps people catch subtle
bugs quickly.


On Tue, Sep 26, 2017 at 4:13 PM, Dávid Karnok <akarnokd at gmail.com> wrote:

> Well, the spec is the spec. (Consequently, I guess you'll have much fun
> with this one: https://examples.javacodegeeks.com/core-java/
> java-9-reactive-streams-example/)
>
> 2017-09-26 15:42 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>
>>
>>
>> On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>
>>> Addendum to the rules listed:
>>>
>>> §2.5: in practice, this should not happen with reasonable reactive
>>> libraries. Depending on how you trust the Publisher, omitting such checks
>>> is not a disaster.
>>>
>>
>> It's not a disaster, it's only non-compliance—it is a MUST rule, after
>> all.
>>
>>
>>> Plus, the JavaDoc's example is also hints about plain store of the
>>> Flow.Subscription:
>>>
>>> http://download.java.net/java/jdk9/docs/api/java/util/concur
>>> rent/SubmissionPublisher.html
>>>
>>> class TransformProcessor<S,T> extends SubmissionPublisher<T>
>>>    implements Flow.Processor<S,T> {
>>>    final Function<? super S, ? extends T> function;
>>>    Flow.Subscription subscription;
>>>    TransformProcessor(Executor executor, int maxBufferCapacity,
>>>                       Function<? super S, ? extends T> function) {
>>>      super(executor, maxBufferCapacity);
>>>      this.function = function;
>>>    }
>>>    public void onSubscribe(Flow.Subscription subscription) {
>>>
>>>
>> I've been meaning to send some JavaDoc contributions, I'll keep this one
>> in mind! :)
>>
>>
>>>
>>>      *(this.subscription = subscription).request(1);*
>>>    }
>>>    public void onNext(S item) {
>>>      subscription.request(1);
>>>      *submit(function.apply(item));*
>>>    }
>>>    public void onError(Throwable ex) { closeExceptionally(ex); }
>>>    public void onComplete() { close(); }
>>>  }
>>>
>>> §2.13: Yes, pubExecutor.execute() can throw a
>>> RejectedExecutionException in general, but often you can't do much about
>>> that case.
>>>
>>
>> Most definitely, but that exception should most definitely not infect the
>> issuer of the signal. The spec is unambiguous about this (for a reason!). :)
>>
>>
>>> Plus again, the example above uses submit() which can also throw:
>>> http://download.java.net/java/jdk9/docs/api/java/util/concur
>>> rent/SubmissionPublisher.html#submit-T-
>>>
>>> 2017-09-26 14:36 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>>
>>>>
>>>>
>>>> On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <akarnokd at gmail.com>
>>>> wrote:
>>>>
>>>>> Viktor, can you be more specific? It just looks uncommon to me but
>>>>> otherwise legitimate. The fact that there is no cancel() call when all
>>>>> requested data has arrived is irrelevant from the perspective of the
>>>>> underlying bug in SubmissionPublisher.
>>>>>
>>>>
>>>> Absolutely, my comment was not about the SubmissionPublisher bug. I was
>>>> unsure whether the provided code was for reproduction only, or was written
>>>> like that in general, in the latter case it needs a bit of adjustment to
>>>> conform to spec.
>>>>
>>>>
>>>>>
>>>>> Here is a variant where the Subscriber is just consuming items and
>>>>> counting how many items have been received. The asynchronous sender reads
>>>>> this count to know when to send the next value to expose the internal race.
>>>>>
>>>>> public class SpPublishAlt {
>>>>>     @Test
>>>>>     public void test() throws Exception {
>>>>>         SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>>>>>
>>>>>         ExecutorService exec = Executors.newSingleThreadExecutor();
>>>>>         try {
>>>>>             SpConsumer c = new SpConsumer();
>>>>>             sp.subscribe(c);
>>>>>
>>>>>             exec.submit(() -> {
>>>>>                 while (c.upstream == null) ;
>>>>>
>>>>>                 int i = 0;
>>>>>                 sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>>>
>>>>>                 while (i < SpConsumer.N) {
>>>>>                     while (c.getAcquire() == i);
>>>>>                     i++;
>>>>>                     sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>>>                 }
>>>>>             });
>>>>>
>>>>>             if (!c.cdl.await(10, TimeUnit.SECONDS)) {
>>>>>                 throw new AssertionError("Timed out " + c.getAcquire());
>>>>>             }
>>>>>         } finally {
>>>>>             exec.shutdownNow();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {
>>>>>
>>>>>         static final int N = 1 << 20;
>>>>>
>>>>>         final CountDownLatch cdl = new CountDownLatch(1);
>>>>>
>>>>>         volatile Flow.Subscription upstream;
>>>>>
>>>>>         @Override
>>>>>         public void onSubscribe(Flow.Subscription subscription) {
>>>>>             upstream = subscription;
>>>>>             subscription.request(N);
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onNext(Object item) {
>>>>>             System.out.println(item);
>>>>>             int i = getPlain() + 1;
>>>>>             setRelease(i);
>>>>>             if (i == N) {
>>>>>                 upstream.cancel();
>>>>>                 cdl.countDown();
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onError(Throwable throwable) {
>>>>>             throwable.printStackTrace();
>>>>>             cdl.countDown();
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public void onComplete() {
>>>>>             cdl.countDown();
>>>>>         }
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> 2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>>>>
>>>>>> Hi Pavel,
>>>>>>
>>>>>> I trust that the code you submitted is for test case reproduction
>>>>>> primarily, but there are a few spec violations in your Subscriber
>>>>>> implementation that you'll want to address.
>>>>>>
>>>>>> We're currently in the process of releasing a bridge jar between
>>>>>> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
>>>>>> TCK to have them verified.
>>>>>>
>>>>>> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I've been using SubmissionPublisher in my own publisher
>>>>>>> implementation in order
>>>>>>> to reuse its complex state machine that serializes invocations to
>>>>>>> subscribers.
>>>>>>>
>>>>>>> While testing my implementation I ran into what I believe might be a
>>>>>>> race
>>>>>>> condition in SubmissionPublisher.
>>>>>>>
>>>>>>> Consider the following example:
>>>>>>>
>>>>>>>     public class SubmissionPublisherTest {
>>>>>>>
>>>>>>>         private final static int N = 1 << 20;
>>>>>>>
>>>>>>>         private final AtomicInteger numbers = new AtomicInteger();
>>>>>>>         private final SubmissionPublisher<Integer> pub = new
>>>>>>> SubmissionPublisher<>();
>>>>>>>         private final ExecutorService pubExecutor =
>>>>>>> Executors.newSingleThreadExecutor();
>>>>>>>         private final CountDownLatch finished = new
>>>>>>> CountDownLatch(1);
>>>>>>>
>>>>>>>         public static void main(String[] args) throws
>>>>>>> InterruptedException {
>>>>>>>             new SubmissionPublisherTest().run();
>>>>>>>         }
>>>>>>>
>>>>>>>         private void run() throws InterruptedException {
>>>>>>>             pub.subscribe(newSubscriber());
>>>>>>>             try {
>>>>>>>                 finished.await(30, TimeUnit.SECONDS);
>>>>>>>             } finally {
>>>>>>>                 pubExecutor.shutdownNow();
>>>>>>>             }
>>>>>>>             System.out.println("Finished");
>>>>>>>         }
>>>>>>>
>>>>>>>         private Flow.Subscriber<Integer> newSubscriber() {
>>>>>>>             return new Flow.Subscriber<>() {
>>>>>>>
>>>>>>>                 Flow.Subscription sub;
>>>>>>>                 int received;
>>>>>>>
>>>>>>>                 @Override
>>>>>>>                 public void onSubscribe(Flow.Subscription s) {
>>>>>>>                     (this.sub = s).request(N);
>>>>>>>                     publish();
>>>>>>>                 }
>>>>>>>
>>>>>>>                 @Override
>>>>>>>                 public void onNext(Integer item) {
>>>>>>>                     if (++received == N) finished.countDown();
>>>>>>>                     publish();
>>>>>>>                     System.out.println(item);
>>>>>>>                 }
>>>>>>>
>>>>>>>                 @Override public void onError(Throwable t) { }
>>>>>>>                 @Override public void onComplete() { }
>>>>>>>             };
>>>>>>>         }
>>>>>>>
>>>>>>>         private void publish() {
>>>>>>>             int number = numbers.incrementAndGet();
>>>>>>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>>>>>>                     onDropReportError = (s, i) -> { throw new
>>>>>>> InternalError(); };
>>>>>>>             pubExecutor.execute(() -> pub.offer(number,
>>>>>>> onDropReportError));
>>>>>>>     //        pub.offer(number, onDropReportError);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> What happens here?
>>>>>>>
>>>>>>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of
>>>>>>> integers from
>>>>>>> SubmissionPublisher. Subscriber.onNext prints the integer it has
>>>>>>> received and
>>>>>>> offers a single integer to the publisher. run() waits until all N
>>>>>>> integers have
>>>>>>> been received and then returns.
>>>>>>>
>>>>>>> onSubscribe offers an initial integer to the SubmissionPublisher.
>>>>>>> This
>>>>>>> kick-starts
>>>>>>> the "feedback loop".
>>>>>>>
>>>>>>> If you run this snippet, chances are good you will not see all
>>>>>>> 1048576 integers
>>>>>>> printed out to the console before the program terminates. Instead,
>>>>>>> the output
>>>>>>> will stop at some number m < 1048576 for no apparent reason.
>>>>>>>
>>>>>>> However, if you comment out the line:
>>>>>>>
>>>>>>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>>>>
>>>>>>> and uncomment the previously commented line, everything will work as
>>>>>>> expected.
>>>>>>> The difference is that in this case offers of integers happen
>>>>>>> synchronously
>>>>>>> rather than from a separate thread.
>>>>>>>
>>>>>>> P.S. I have debugged a little bit and my guess (I'm not a
>>>>>>> concurrency expert)
>>>>>>> is that accesses to SubmissionPublisher.BufferedSubscription.tail
>>>>>>> field are
>>>>>>> undersynchronized. In particular, it might be the case that updates
>>>>>>> to this
>>>>>>> field are not always seen by the consumer task in
>>>>>>> BufferedSubscription.checkEmpty.
>>>>>>>
>>>>>>> Is there any chance `tail` field is simply missing volatile modifier?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -Pavel
>>>>>>> _______________________________________________
>>>>>>> Concurrency-interest mailing list
>>>>>>> Concurrency-interest at cs.oswego.edu
>>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>> _______________________________________________
>>>>>> Concurrency-interest mailing list
>>>>>> Concurrency-interest at cs.oswego.edu
>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> David Karnok
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>>>>>>
>>>
>>>
>>>
>>> --
>>> Best regards,
>>> David Karnok
>>>
>>
>>
>>
>> --
>> Cheers,
>>>>
>
>
>
> --
> Best regards,
> David Karnok
>



-- 
Cheers,
√
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20170926/1c800a77/attachment-0001.html>


More information about the Concurrency-interest mailing list