[concurrency-interest] A race in SubmissionPublisher?

Viktor Klang viktor.klang at gmail.com
Tue Sep 26 09:42:13 EDT 2017


On Tue, Sep 26, 2017 at 2:49 PM, Dávid Karnok <akarnokd at gmail.com> wrote:

> Addendum to the rules listed:
>
> §2.5: in practice, this should not happen with reasonable reactive
> libraries. Depending on how you trust the Publisher, omitting such checks
> is not a disaster.
>

It's not a disaster, it's only non-compliance—it is a MUST rule, after all.


> Plus, the JavaDoc's example is also hints about plain store of the
> Flow.Subscription:
>
> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/
> SubmissionPublisher.html
>
> class TransformProcessor<S,T> extends SubmissionPublisher<T>
>    implements Flow.Processor<S,T> {
>    final Function<? super S, ? extends T> function;
>    Flow.Subscription subscription;
>    TransformProcessor(Executor executor, int maxBufferCapacity,
>                       Function<? super S, ? extends T> function) {
>      super(executor, maxBufferCapacity);
>      this.function = function;
>    }
>    public void onSubscribe(Flow.Subscription subscription) {
>
>
I've been meaning to send some JavaDoc contributions, I'll keep this one in
mind! :)


>
>      *(this.subscription = subscription).request(1);*
>    }
>    public void onNext(S item) {
>      subscription.request(1);
>      *submit(function.apply(item));*
>    }
>    public void onError(Throwable ex) { closeExceptionally(ex); }
>    public void onComplete() { close(); }
>  }
>
> §2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException
> in general, but often you can't do much about that case.
>

Most definitely, but that exception should most definitely not infect the
issuer of the signal. The spec is unambiguous about this (for a reason!). :)


> Plus again, the example above uses submit() which can also throw:
> http://download.java.net/java/jdk9/docs/api/java/util/concurrent/
> SubmissionPublisher.html#submit-T-
>
> 2017-09-26 14:36 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>
>>
>>
>> On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>>
>>> Viktor, can you be more specific? It just looks uncommon to me but
>>> otherwise legitimate. The fact that there is no cancel() call when all
>>> requested data has arrived is irrelevant from the perspective of the
>>> underlying bug in SubmissionPublisher.
>>>
>>
>> Absolutely, my comment was not about the SubmissionPublisher bug. I was
>> unsure whether the provided code was for reproduction only, or was written
>> like that in general, in the latter case it needs a bit of adjustment to
>> conform to spec.
>>
>>
>>>
>>> Here is a variant where the Subscriber is just consuming items and
>>> counting how many items have been received. The asynchronous sender reads
>>> this count to know when to send the next value to expose the internal race.
>>>
>>> public class SpPublishAlt {
>>>     @Test
>>>     public void test() throws Exception {
>>>         SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>>>
>>>         ExecutorService exec = Executors.newSingleThreadExecutor();
>>>         try {
>>>             SpConsumer c = new SpConsumer();
>>>             sp.subscribe(c);
>>>
>>>             exec.submit(() -> {
>>>                 while (c.upstream == null) ;
>>>
>>>                 int i = 0;
>>>                 sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>
>>>                 while (i < SpConsumer.N) {
>>>                     while (c.getAcquire() == i);
>>>                     i++;
>>>                     sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>>                 }
>>>             });
>>>
>>>             if (!c.cdl.await(10, TimeUnit.SECONDS)) {
>>>                 throw new AssertionError("Timed out " + c.getAcquire());
>>>             }
>>>         } finally {
>>>             exec.shutdownNow();
>>>         }
>>>     }
>>>
>>>     static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {
>>>
>>>         static final int N = 1 << 20;
>>>
>>>         final CountDownLatch cdl = new CountDownLatch(1);
>>>
>>>         volatile Flow.Subscription upstream;
>>>
>>>         @Override
>>>         public void onSubscribe(Flow.Subscription subscription) {
>>>             upstream = subscription;
>>>             subscription.request(N);
>>>         }
>>>
>>>         @Override
>>>         public void onNext(Object item) {
>>>             System.out.println(item);
>>>             int i = getPlain() + 1;
>>>             setRelease(i);
>>>             if (i == N) {
>>>                 upstream.cancel();
>>>                 cdl.countDown();
>>>             }
>>>         }
>>>
>>>         @Override
>>>         public void onError(Throwable throwable) {
>>>             throwable.printStackTrace();
>>>             cdl.countDown();
>>>         }
>>>
>>>         @Override
>>>         public void onComplete() {
>>>             cdl.countDown();
>>>         }
>>>     }
>>> }
>>>
>>>
>>> 2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>>
>>>> Hi Pavel,
>>>>
>>>> I trust that the code you submitted is for test case reproduction
>>>> primarily, but there are a few spec violations in your Subscriber
>>>> implementation that you'll want to address.
>>>>
>>>> We're currently in the process of releasing a bridge jar between
>>>> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
>>>> TCK to have them verified.
>>>>
>>>> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've been using SubmissionPublisher in my own publisher implementation
>>>>> in order
>>>>> to reuse its complex state machine that serializes invocations to
>>>>> subscribers.
>>>>>
>>>>> While testing my implementation I ran into what I believe might be a
>>>>> race
>>>>> condition in SubmissionPublisher.
>>>>>
>>>>> Consider the following example:
>>>>>
>>>>>     public class SubmissionPublisherTest {
>>>>>
>>>>>         private final static int N = 1 << 20;
>>>>>
>>>>>         private final AtomicInteger numbers = new AtomicInteger();
>>>>>         private final SubmissionPublisher<Integer> pub = new
>>>>> SubmissionPublisher<>();
>>>>>         private final ExecutorService pubExecutor =
>>>>> Executors.newSingleThreadExecutor();
>>>>>         private final CountDownLatch finished = new CountDownLatch(1);
>>>>>
>>>>>         public static void main(String[] args) throws
>>>>> InterruptedException {
>>>>>             new SubmissionPublisherTest().run();
>>>>>         }
>>>>>
>>>>>         private void run() throws InterruptedException {
>>>>>             pub.subscribe(newSubscriber());
>>>>>             try {
>>>>>                 finished.await(30, TimeUnit.SECONDS);
>>>>>             } finally {
>>>>>                 pubExecutor.shutdownNow();
>>>>>             }
>>>>>             System.out.println("Finished");
>>>>>         }
>>>>>
>>>>>         private Flow.Subscriber<Integer> newSubscriber() {
>>>>>             return new Flow.Subscriber<>() {
>>>>>
>>>>>                 Flow.Subscription sub;
>>>>>                 int received;
>>>>>
>>>>>                 @Override
>>>>>                 public void onSubscribe(Flow.Subscription s) {
>>>>>                     (this.sub = s).request(N);
>>>>>                     publish();
>>>>>                 }
>>>>>
>>>>>                 @Override
>>>>>                 public void onNext(Integer item) {
>>>>>                     if (++received == N) finished.countDown();
>>>>>                     publish();
>>>>>                     System.out.println(item);
>>>>>                 }
>>>>>
>>>>>                 @Override public void onError(Throwable t) { }
>>>>>                 @Override public void onComplete() { }
>>>>>             };
>>>>>         }
>>>>>
>>>>>         private void publish() {
>>>>>             int number = numbers.incrementAndGet();
>>>>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>>>>                     onDropReportError = (s, i) -> { throw new
>>>>> InternalError(); };
>>>>>             pubExecutor.execute(() -> pub.offer(number,
>>>>> onDropReportError));
>>>>>     //        pub.offer(number, onDropReportError);
>>>>>         }
>>>>>     }
>>>>>
>>>>> What happens here?
>>>>>
>>>>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of
>>>>> integers from
>>>>> SubmissionPublisher. Subscriber.onNext prints the integer it has
>>>>> received and
>>>>> offers a single integer to the publisher. run() waits until all N
>>>>> integers have
>>>>> been received and then returns.
>>>>>
>>>>> onSubscribe offers an initial integer to the SubmissionPublisher. This
>>>>> kick-starts
>>>>> the "feedback loop".
>>>>>
>>>>> If you run this snippet, chances are good you will not see all 1048576
>>>>> integers
>>>>> printed out to the console before the program terminates. Instead, the
>>>>> output
>>>>> will stop at some number m < 1048576 for no apparent reason.
>>>>>
>>>>> However, if you comment out the line:
>>>>>
>>>>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>>
>>>>> and uncomment the previously commented line, everything will work as
>>>>> expected.
>>>>> The difference is that in this case offers of integers happen
>>>>> synchronously
>>>>> rather than from a separate thread.
>>>>>
>>>>> P.S. I have debugged a little bit and my guess (I'm not a concurrency
>>>>> expert)
>>>>> is that accesses to SubmissionPublisher.BufferedSubscription.tail
>>>>> field are
>>>>> undersynchronized. In particular, it might be the case that updates to
>>>>> this
>>>>> field are not always seen by the consumer task in
>>>>> BufferedSubscription.checkEmpty.
>>>>>
>>>>> Is there any chance `tail` field is simply missing volatile modifier?
>>>>>
>>>>> Thanks,
>>>>> -Pavel
>>>>> _______________________________________________
>>>>> Concurrency-interest mailing list
>>>>> Concurrency-interest at cs.oswego.edu
>>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Cheers,
>>>>>>>>
>>>> _______________________________________________
>>>> Concurrency-interest mailing list
>>>> Concurrency-interest at cs.oswego.edu
>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>>
>>>
>>>
>>> --
>>> Best regards,
>>> David Karnok
>>>
>>
>>
>>
>> --
>> Cheers,
>>>>
>
>
>
> --
> Best regards,
> David Karnok
>



-- 
Cheers,
√
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20170926/9e7b31ff/attachment-0001.html>


More information about the Concurrency-interest mailing list