[concurrency-interest] A race in SubmissionPublisher?

Dávid Karnok akarnokd at gmail.com
Tue Sep 26 08:49:14 EDT 2017


Addendum to the rules listed:

§2.5: in practice, this should not happen with reasonable reactive
libraries. Depending on how much you trust the Publisher, omitting such checks
is not a disaster. Plus, the JavaDoc's example also hints at a plain
store of the Flow.Subscription:

http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html

class TransformProcessor<S,T> extends SubmissionPublisher<T>
   implements Flow.Processor<S,T> {
   final Function<? super S, ? extends T> function;
   Flow.Subscription subscription;
   TransformProcessor(Executor executor, int maxBufferCapacity,
                      Function<? super S, ? extends T> function) {
     super(executor, maxBufferCapacity);
     this.function = function;
   }
   public void onSubscribe(Flow.Subscription subscription) {
     *(this.subscription = subscription).request(1);*
   }
   public void onNext(S item) {
     subscription.request(1);
     *submit(function.apply(item));*
   }
   public void onError(Throwable ex) { closeExceptionally(ex); }
   public void onComplete() { close(); }
 }
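
For a Publisher you do not trust, the §2.5 check (cancel any Subscription
that arrives while one is already active) could be sketched roughly as
below. GuardedSubscriber and its received counter are hypothetical names
used only for illustration; they come neither from the JDK nor from the
code in this thread.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not a JDK class): a Subscriber that applies rule §2.5.
final class GuardedSubscriber<T> implements Flow.Subscriber<T> {

    // Holds the first Subscription; later ones are cancelled instead of stored.
    private final AtomicReference<Flow.Subscription> upstream = new AtomicReference<>();

    // Only touched from onNext, which the spec requires to be called serially.
    int received;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        if (!upstream.compareAndSet(null, s)) {
            // §2.5: an active Subscription already exists, so cancel the new one.
            s.cancel();
            return;
        }
        s.request(1);
    }

    @Override
    public void onNext(T item) {
        received++;
        upstream.get().request(1);
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }
}
```

The plain field store in the JavaDoc example skips this compareAndSet and
relies on the Publisher calling onSubscribe only once.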

§2.13: Yes, pubExecutor.execute() can throw a RejectedExecutionException in
general, but often you can't do much about that case. Plus again, the
example above uses submit() which can also throw:
http://download.java.net/java/jdk9/docs/api/java/util/concurrent/SubmissionPublisher.html#submit-T-
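
If one does want the caller (for example onNext) to return normally even
when the executor rejects the task, a minimal sketch might look like this.
SafeOffer and offerAsync are hypothetical names, and closing the publisher
exceptionally on rejection is just one assumed policy, not something the
spec or the JavaDoc prescribes.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical helper (not a JDK class): hands an offer() to an executor
// without letting a RejectedExecutionException escape to the caller.
final class SafeOffer {

    private SafeOffer() { }

    /**
     * Returns true if the executor accepted the task. On rejection the
     * publisher is closed exceptionally (an assumed policy) and false is returned.
     */
    static <T> boolean offerAsync(Executor executor, SubmissionPublisher<T> pub, T item) {
        try {
            // The drop handler returns false, i.e. a dropped item is not retried.
            executor.execute(() -> pub.offer(item, (subscriber, dropped) -> false));
            return true;
        } catch (RejectedExecutionException ex) {
            // Surface the failure through the publisher instead of throwing.
            pub.closeExceptionally(ex);
            return false;
        }
    }
}
```

Whether closing the publisher is the right reaction depends on the
application; logging and dropping the item would be just as defensible.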

2017-09-26 14:36 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:

>
>
> On Tue, Sep 26, 2017 at 2:29 PM, Dávid Karnok <akarnokd at gmail.com> wrote:
>
>> Viktor, can you be more specific? It just looks uncommon to me but
>> otherwise legitimate. The fact that there is no cancel() call when all
>> requested data has arrived is irrelevant from the perspective of the
>> underlying bug in SubmissionPublisher.
>>
>
> Absolutely, my comment was not about the SubmissionPublisher bug. I was
> unsure whether the provided code was for reproduction only or was written
> like that in general; in the latter case it needs a bit of adjustment to
> conform to the spec.
>
>
>>
>> Here is a variant where the Subscriber is just consuming items and
>> counting how many items have been received. The asynchronous sender reads
>> this count to know when to send the next value to expose the internal race.
>>
>> public class SpPublishAlt {
>>     @Test
>>     public void test() throws Exception {
>>         SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
>>
>>         ExecutorService exec = Executors.newSingleThreadExecutor();
>>         try {
>>             SpConsumer c = new SpConsumer();
>>             sp.subscribe(c);
>>
>>             exec.submit(() -> {
>>                 while (c.upstream == null) ;
>>
>>                 int i = 0;
>>                 sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>
>>                 while (i < SpConsumer.N) {
>>                     while (c.getAcquire() == i);
>>                     i++;
>>                     sp.offer(i, (a, b) -> { throw new RuntimeException(); });
>>                 }
>>             });
>>
>>             if (!c.cdl.await(10, TimeUnit.SECONDS)) {
>>                 throw new AssertionError("Timed out " + c.getAcquire());
>>             }
>>         } finally {
>>             exec.shutdownNow();
>>         }
>>     }
>>
>>     static final class SpConsumer extends AtomicInteger implements Flow.Subscriber<Object> {
>>
>>         static final int N = 1 << 20;
>>
>>         final CountDownLatch cdl = new CountDownLatch(1);
>>
>>         volatile Flow.Subscription upstream;
>>
>>         @Override
>>         public void onSubscribe(Flow.Subscription subscription) {
>>             upstream = subscription;
>>             subscription.request(N);
>>         }
>>
>>         @Override
>>         public void onNext(Object item) {
>>             System.out.println(item);
>>             int i = getPlain() + 1;
>>             setRelease(i);
>>             if (i == N) {
>>                 upstream.cancel();
>>                 cdl.countDown();
>>             }
>>         }
>>
>>         @Override
>>         public void onError(Throwable throwable) {
>>             throwable.printStackTrace();
>>             cdl.countDown();
>>         }
>>
>>         @Override
>>         public void onComplete() {
>>             cdl.countDown();
>>         }
>>     }
>> }
>>
>>
>> 2017-09-26 13:58 GMT+02:00 Viktor Klang <viktor.klang at gmail.com>:
>>
>>> Hi Pavel,
>>>
>>> I trust that the code you submitted is for test case reproduction
>>> primarily, but there are a few spec violations in your Subscriber
>>> implementation that you'll want to address.
>>>
>>> We're currently in the process of releasing a bridge jar between
>>> org.reactivestreams and j.u.c.Flow so you can plug in your Flow.* to the RS
>>> TCK to have them verified.
>>>
>>> On Mon, Sep 25, 2017 at 3:21 PM, Pavel Rappo <pavel.rappo at gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I've been using SubmissionPublisher in my own publisher implementation in
>>>> order to reuse its complex state machine that serializes invocations to
>>>> subscribers.
>>>>
>>>> While testing my implementation I ran into what I believe might be a race
>>>> condition in SubmissionPublisher.
>>>>
>>>> Consider the following example:
>>>>
>>>>     public class SubmissionPublisherTest {
>>>>
>>>>         private final static int N = 1 << 20;
>>>>
>>>>         private final AtomicInteger numbers = new AtomicInteger();
>>>>         private final SubmissionPublisher<Integer> pub = new SubmissionPublisher<>();
>>>>         private final ExecutorService pubExecutor = Executors.newSingleThreadExecutor();
>>>>         private final CountDownLatch finished = new CountDownLatch(1);
>>>>
>>>>         public static void main(String[] args) throws InterruptedException {
>>>>             new SubmissionPublisherTest().run();
>>>>         }
>>>>
>>>>         private void run() throws InterruptedException {
>>>>             pub.subscribe(newSubscriber());
>>>>             try {
>>>>                 finished.await(30, TimeUnit.SECONDS);
>>>>             } finally {
>>>>                 pubExecutor.shutdownNow();
>>>>             }
>>>>             System.out.println("Finished");
>>>>         }
>>>>
>>>>         private Flow.Subscriber<Integer> newSubscriber() {
>>>>             return new Flow.Subscriber<>() {
>>>>
>>>>                 Flow.Subscription sub;
>>>>                 int received;
>>>>
>>>>                 @Override
>>>>                 public void onSubscribe(Flow.Subscription s) {
>>>>                     (this.sub = s).request(N);
>>>>                     publish();
>>>>                 }
>>>>
>>>>                 @Override
>>>>                 public void onNext(Integer item) {
>>>>                     if (++received == N) finished.countDown();
>>>>                     publish();
>>>>                     System.out.println(item);
>>>>                 }
>>>>
>>>>                 @Override public void onError(Throwable t) { }
>>>>                 @Override public void onComplete() { }
>>>>             };
>>>>         }
>>>>
>>>>         private void publish() {
>>>>             int number = numbers.incrementAndGet();
>>>>             BiPredicate<Flow.Subscriber<? super Integer>, Integer>
>>>>                     onDropReportError = (s, i) -> { throw new InternalError(); };
>>>>             pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>     //        pub.offer(number, onDropReportError);
>>>>         }
>>>>     }
>>>>
>>>> What happens here?
>>>>
>>>> Subscriber.onSubscribe bulk-requests a number (N = 1048576) of integers from
>>>> SubmissionPublisher. Subscriber.onNext prints the integer it has received and
>>>> offers a single integer to the publisher. run() waits until all N integers
>>>> have been received and then returns.
>>>>
>>>> onSubscribe offers an initial integer to the SubmissionPublisher. This
>>>> kick-starts the "feedback loop".
>>>>
>>>> If you run this snippet, chances are good you will not see all 1048576
>>>> integers printed out to the console before the program terminates. Instead,
>>>> the output will stop at some number m < 1048576 for no apparent reason.
>>>>
>>>> However, if you comment out the line:
>>>>
>>>>     pubExecutor.execute(() -> pub.offer(number, onDropReportError));
>>>>
>>>> and uncomment the previously commented line, everything will work as
>>>> expected. The difference is that in this case offers of integers happen
>>>> synchronously rather than from a separate thread.
>>>>
>>>> P.S. I have debugged a little bit and my guess (I'm not a concurrency expert)
>>>> is that accesses to SubmissionPublisher.BufferedSubscription.tail field are
>>>> undersynchronized. In particular, it might be the case that updates to this
>>>> field are not always seen by the consumer task in
>>>> BufferedSubscription.checkEmpty.
>>>>
>>>> Is there any chance `tail` field is simply missing volatile modifier?
>>>>
>>>> Thanks,
>>>> -Pavel
>>>> _______________________________________________
>>>> Concurrency-interest mailing list
>>>> Concurrency-interest at cs.oswego.edu
>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>
>>>
>>>
>>> --
>>> Cheers,
>>>
>>>
>>
>>
>> --
>> Best regards,
>> David Karnok
>>
>
>
>
> --
> Cheers,



-- 
Best regards,
David Karnok