[concurrency-interest] A race in SubmissionPublisher?

Dávid Karnok akarnokd at gmail.com
Tue Sep 26 10:13:42 EDT 2017


Well, the spec is the spec. (Consequently, I guess you'll have a lot of fun
with this one:
https://examples.javacodegeeks.com/core-java/java-9-reactive-streams-example/
)

2017-09-26 15:42 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:

>
>
> On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>
>> Addendum to the rules listed:
>>
>> §2.5: in practice, this should not happen with reasonable reactive
>> libraries. Depending on how you trust the Publisher, omitting such checks
>> is not a disaster.
>>
>
> It's not a disaster, it's only non-compliance; it is a MUST rule, after all.
>
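A minimal sketch of the §2.5 check discussed above, assuming a hypothetical subscriber class named GuardedSubscriber (it is not from this thread or the JDK). The first Subscription is stored with a compare-and-set, and any later one is cancelled instead of silently replacing it:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration only; GuardedSubscriber is an assumed name.
final class GuardedSubscriber<T> implements Flow.Subscriber<T> {

    // Holds the one active Subscription; null until onSubscribe succeeds.
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();

    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (s == null) {
            throw new NullPointerException("subscription");
        }
        if (upstream.compareAndSet(null, s)) {
            s.request(1);   // first Subscription: start consuming
        } else {
            s.cancel();     // §2.5: a Subscription is already active, cancel the new one
        }
    }

    @Override
    public void onNext(T item) {
        if (item == null) {
            throw new NullPointerException("item");
        }
        Flow.Subscription s = upstream.get();
        if (s != null) {
            s.request(1);   // one-by-one demand, as in the JavaDoc example
        }
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

Whether the extra atomic is worth it depends on how much the Publisher is trusted, which is exactly the trade-off debated here.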
>
>> Plus, the JavaDoc's example also hints at a plain store of the
>> Flow.Subscription:
>>
>> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html
>>
>> class TransformProcessor<S,T> extends SubmissionPublisher<T>
>>    implements Flow.Processor<S,T> {
>>    final Function<? super S, ? extends T> function;
>>    Flow.Subscription subscription;
>>    TransformProcessor(Executor executor, int maxBufferCapacity,
>>                       Function<? super S, ? extends T> function) {
>>      super(executor, maxBufferCapacity);
>>      this.function = function;
>>    }
>>    public void onSubscribe(Flow.Subscription subscription) {
>>      (this.subscription = subscription).request(1); // emphasized in the original
>>    }
>>    public void onNext(S item) {
>>      subscription.request(1);
>>      submit(function.apply(item)); // emphasized in the original
>>    }
>>    public void onError(Throwable ex) { closeExceptionally(ex); }
>>    public void onComplete() { close(); }
>>  }
>>
>
> I've been meaning to send some JavaDoc contributions, I'll keep this one
> in mind! :)
>
>>
>> §2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException
>> in general, but often you can't do much about that case.
>>
>
> Most definitely, but that exception should most definitely not infect the
> issuer of the signal. The spec is unambiguous about this (for a reason!). :)
>
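A minimal sketch of keeping a RejectedExecutionException away from the issuer of the signal, as §2.13 asks. The class HandOffSubscriber and its `failure` field are hypothetical names, not part of the JDK or of the code in this thread. It assumes the subscriber hands each item to an Executor and reacts to rejection by cancelling upstream rather than throwing out of onNext:

```java
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;

// Hypothetical illustration only; HandOffSubscriber is an assumed name.
final class HandOffSubscriber<T> implements Flow.Subscriber<T> {

    private final Executor executor;
    private final Consumer<? super T> work;
    private volatile Flow.Subscription upstream;

    // Last failure observed, kept so the caller can inspect it later.
    volatile Throwable failure;

    HandOffSubscriber(Executor executor, Consumer<? super T> work) {
        this.executor = Objects.requireNonNull(executor);
        this.work = Objects.requireNonNull(work);
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = Objects.requireNonNull(s);
        s.request(1);
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item);
        Flow.Subscription s = upstream;
        try {
            executor.execute(() -> work.accept(item));
            s.request(1);
        } catch (RejectedExecutionException ex) {
            // Do not let the exception escape to the Publisher:
            // record it and stop the flow instead.
            failure = ex;
            s.cancel();
        }
    }

    @Override public void onError(Throwable t) { failure = t; }
    @Override public void onComplete() { }
}
```

What to do with the recorded failure (log it, complete a future exceptionally, and so on) is left open; the only point of the sketch is that onNext returns normally.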
>
>> Plus again, the example above uses submit() which can also throw:
>> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-
>>
>> 2017-09-26 14:36 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>
>>>
>>>
>>> On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>>
>>>> Viktor, can you be more specific? It just looks uncommon to me but
>>>> otherwise legitimate. The fact that there is no cancel() call when all
>>>> requested data has arrived is irrelevant from the perspective of the
>>>> underlying bug in SubmissionPublisher.
>>>>
>>>
>>> Absolutely, my comment was not about the SubmissionPublisher bug. I was
>>> unsure whether the provided code was for reproduction only or was written
>>> like that in general; in the latter case it needs a bit of adjustment to
>>> conform to the spec.
>>>
>>>
>>>>
>>>> Here is a variant where the Subscriber is just consuming items and
>>>> counting how many items have been received. The asynchronous sender reads
>>>> this count to know when to send the next value to expose the internal race.
>>>>
>>>> public class SpPublishAlt {
>>>>     @Test
>>>>     public void test() throws Exception {
>>>>         SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>>>>
>>>>         ExecutorService exec = Executors.newSingleThreadExecutor();
>>>>         try {
>>>>             SpConsumer c = new SpConsumer();
>>>>             sp.subscribe(c);
>>>>
>>>>             exec.submit(() -> {
>>>>                 while (c.upstream == null) ;
>>>>
>>>>                 int i = 0;
>>>>                 sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>>
>>>>                 while (i < SpConsumer.N) {
>>>>                     while (c.getAcquire() == i);
>>>>                     i++;
>>>>                     sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>>                 }
>>>>             });
>>>>
>>>>             if (!c.cdl.await(10, TimeUnit.SECONDS)) {
>>>>                 throw new AssertionError("Timed out " + c.getAcquire());
>>>>             }
>>>>         } finally {
>>>>             exec.shutdownNow();
>>>>         }
>>>>     }
>>>>
>>>>     static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {
>>>>
>>>>         static final int N = 1 << 20;
>>>>
>>>>         final CountDownLatch cdl = new CountDownLatch(1);
>>>>
>>>>         volatile Flow.Subscription upstream;
>>>>
>>>>         @Override
>>>>         public void onSubscribe(Flow.Subscription subscription) {
>>>>             upstream = subscription;
>>>>             subscription.request(N);
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onNext(Object item) {
>>>>             System.out.println(item);
>>>>             int i = getPlain() + 1;
>>>>             setRelease(i);
>>>>             if (i == N) {
>>>>                 upstream.cancel();
>>>>                 cdl.countDown();
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onError(Throwable throwable) {
>>>>             throwable.printStackTrace();
>>>>             cdl.countDown();
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onComplete() {
>>>>             cdl.countDown();
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> 2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>>>
>>>>> Hi Pavel,
>>>>>
>>>>> I trust that the code you submitted is for test case reproduction
>>>>> primarily, but there are a few spec violations in your Subscriber
>>>>> implementation that you'll want to address.
>>>>>
>>>>> We're currently in the process of releasing a bridge jar between
>>>>> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
>>>>> TCK to have them verified.
>>>>>
>>>>> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I've been using SubmissionPublisher in my own publisher implementation in
>>>>>> order to reuse its complex state machine that serializes invocations to
>>>>>> subscribers.
>>>>>>
>>>>>> While testing my implementation I ran into what I believe might be a race
>>>>>> condition in SubmissionPublisher.
>>>>>>
>>>>>> Consider the following example:
>>>>>>
>>>>>>     public class SubmissionPublisherTest {
>>>>>>
>>>>>>         private final static int N = 1 << 20;
>>>>>>
>>>>>>         private final AtomicInteger numbers = new AtomicInteger();
>>>>>>         private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
>>>>>>         private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
>>>>>>         private final CountDownLatch finished = new CountDownLatch(1);
>>>>>>
>>>>>>         public static void main(String[] args) throws InterruptedException {
>>>>>>             new SubmissionPublisherTest().run();
>>>>>>         }
>>>>>>
>>>>>>         private void run() throws InterruptedException {
>>>>>>             pub.subscribe(newSubscriber());
>>>>>>             try {
>>>>>>                 finished.await(30, TimeUnit.SECONDS);
>>>>>>             } finally {
>>>>>>                 pubExecutor.shutdownNow();
>>>>>>             }
>>>>>>             System.out.println("Finished");
>>>>>>         }
>>>>>>
>>>>>>         private Flow.Subscriber<Integer> newSubscriber() {
>>>>>>             return new Flow.Subscriber<>() {
>>>>>>
>>>>>>                 Flow.Subscription sub;
>>>>>>                 int received;
>>>>>>
>>>>>>                 @Override
>>>>>>                 public void onSubscribe(Flow.Subscription s) {
>>>>>>                     (this.sub = s).request(N);
>>>>>>                     publish();
>>>>>>                 }
>>>>>>
>>>>>>                 @Override
>>>>>>                 public void onNext(Integer item) {
>>>>>>                     if (++received == N) finished.countDown();
>>>>>>                     publish();
>>>>>>                     System.out.println(item);
>>>>>>                 }
>>>>>>
>>>>>>                 @Override public void onError(Throwable t) { }
>>>>>>                 @Override public void onComplete() { }
>>>>>>             };
>>>>>>         }
>>>>>>
>>>>>>         private void publish() {
>>>>>>             int number = numbers.incrementAndGet();
>>>>>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>>>>>                     onDropReportError = (s, i) -> { throw new InternalError(); };
>>>>>>             pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>>>     //        pub.offer(number, onDropReportError);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> What happens here?
>>>>>>
>>>>>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers
>>>>>> from SubmissionPublisher. Subscriber.onNext prints the integer it has
>>>>>> received and offers a single integer to the publisher. run() waits until
>>>>>> all N integers have been received and then returns.
>>>>>>
>>>>>> onSubscribe offers an initial integer to the SubmissionPublisher. This
>>>>>> kick-starts the "feedback loop".
>>>>>>
>>>>>> If you run this snippet, chances are good you will not see all 1048576
>>>>>> integers printed out to the console before the program terminates.
>>>>>> Instead, the output will stop at some number m < 1048576 for no apparent
>>>>>> reason.
>>>>>>
>>>>>> However, if you comment out the line:
>>>>>>
>>>>>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>>>
>>>>>> and uncomment the previously commented line, everything will work as
>>>>>> expected. The difference is that in this case offers of integers happen
>>>>>> synchronously rather than from a separate thread.
>>>>>>
>>>>>> P.S. I have debugged a little bit and my guess (I'm not a concurrency
>>>>>> expert) is that accesses to SubmissionPublisher.BufferedSubscription.tail
>>>>>> field are undersynchronized. In particular, it might be the case that
>>>>>> updates to this field are not always seen by the consumer task in
>>>>>> BufferedSubscription.checkEmpty.
>>>>>>
>>>>>> Is there any chance the `tail` field is simply missing the volatile modifier?
>>>>>>
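To make the kind of visibility problem guessed at here concrete, the following is a minimal single-producer/single-consumer ring buffer sketch. It is not the SubmissionPublisher code (its internals are not shown in this thread); the class name SpscRing and its fields are hypothetical. It assumes one producer thread and one consumer thread, and it publishes `tail` with a release store that the consumer reads with acquire semantics. If `tail` were read and written with plain accesses only, the consumer would have no guarantee of ever observing a new item:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; SpscRing is an assumed name.
final class SpscRing<T> {

    private final Object[] slots;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next index to read, written by the consumer
    private final AtomicLong tail = new AtomicLong(); // next index to write, written by the producer

    SpscRing(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // Producer thread only. Returns false when the buffer is full.
    boolean offer(T item) {
        long t = tail.getPlain();                 // only the producer writes tail
        if (t - head.getAcquire() == slots.length) {
            return false;
        }
        slots[(int) t & mask] = item;             // plain write of the slot...
        tail.setRelease(t + 1);                   // ...made visible by the release store
        return true;
    }

    // Consumer thread only. Returns null when the buffer is empty.
    @SuppressWarnings("unchecked")
    T poll() {
        long h = head.getPlain();                 // only the consumer writes head
        if (h == tail.getAcquire()) {             // acquire pairs with setRelease in offer
            return null;
        }
        T item = (T) slots[(int) h & mask];
        slots[(int) h & mask] = null;
        head.setRelease(h + 1);                   // hands the slot back to the producer
        return item;
    }
}
```

Declaring the index fields volatile would also work and is the simpler choice; release/acquire is used here only to show the minimum ordering this particular sketch relies on.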
>>>>>> Thanks,
>>>>>> -Pavel
>>>>>> _______________________________________________
>>>>>> Concurrency-interest mailing list
>>>>>> Concurrency-interest at cs.oswego.edu
>>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest

-- 
Best regards,
David Karnok