[concurrency-interest] Spot the bug in stream processing, IntConsumer and Executor

Tom Lee concurrency-interest at tomlee.co
Sat Feb 25 12:37:53 EST 2017


FWIW I noticed that 100/1000 thing too, and still got "bad" results (all
zeros) after changing the 100 to 1000.

On Feb 25, 2017 9:32 AM, "kedar mhaswade" <kedar.mhaswade at gmail.com> wrote:

> Thanks Tom and Benjamin. Yes, I am trying to demonstrate a race condition,
> so yes, this code is for illustration purposes only.
>
> *And upon a closer look, I found the bug in my code:*
>
>         Runnable increment = () -> {
>             for (int i = 0; i < *1000*; i++)
>                 count++;
>         };
>
>         // update the shared variable as a side effect of an
> IntConsumer#accept
>         Runnable incrementStream = () -> IntStream.rangeClosed(1, *100*).forEach(i
> -> count++);
>
>
> The stream-oriented task was getting a *100* instead of a *1000* :-P.
> Fixing this gives very similar results. Of course, it clearly demonstrates
> that count++ is *not* an atomic operation and some updates are lost as
> expected. I also verified that by using an AtomicInteger instead of a
> simple volatile, the final value of count is 10,000, always.
>
> Regards,
> Kedar
>
>
> On Sat, Feb 25, 2017 at 12:09 AM, Tom Lee <concurrency-interest at tomlee.co>
> wrote:
>
>> Hi Kedar,
>>
>> Disclaimer: I'd generally say something about this approach to having
>> multiple threads incrementing a variable being a bad idea in general, but
>> it sounds like you're just trying to explore the behavior of this race
>> right?
>>
>> Benjamin's on the right track: you're seeing zeros because the code
>> submitted to the executor hasn't run by the time you print out the "task
>> ... updates count to ..." messages. Still, it's not a complete fix & you'll
>> very likely continue to see very different results even if you add an
>> awaitTermination call after the shutdown() call. Also probably depends on
>> how fast your machine is etc. etc. too (e.g. perhaps on slower machines /
>> fewer cores you'd see similar output for both).
>>
>> It's still kind of interesting why the exhibited behavior is so
>> different. Here's a bit of a hint -- with the following code I get very
>> similar output irrespective of whether I'm using "increment" or
>> "incrementStream":
>>
>> private static volatile int count = 0;
>> public static void main(String[] args) throws Exception {
>>
>>     // warmup (slower)
>>     run(false);
>>
>>     // do it for real (faster)
>>     count = 0;
>>     run(true);
>> }
>>
>> private static void run(final boolean show) throws Exception {
>>     Runnable increment = () -> {
>>         for (int i = 0; i < 1000; i++)
>>             count++;
>>     };
>>
>>     Runnable incrementStream = () -> {
>>         IntStream.rangeClosed(1, 1000).forEach(i -> count++);
>>     };
>>     ExecutorService exec = Executors.newCachedThreadPool();
>>     try {
>>         for (int i = 0; i < 10; i++) {
>>             exec.execute(increment);
>>             // exec.execute(incrementStream);
>>             if (show) System.out.println("task: " + i + " updates count to: " + count);
>>         }
>>     }
>>     finally {
>>         exec.shutdown();
>>         exec.awaitTermination(10, TimeUnit.SECONDS);
>>         if (show) System.out.println("final: " + count);
>>     }
>> }
>>
>> Weird right?
>>
>> Another hint: I added code to println() a message to the end of
>> incrementStream() in your original code ("hello" or something silly like
>> that). I didn't see any "hello" messages until after all the "task ...
>> updates count to ..." messages were displayed.
>>
>> Without going too deep, I suspect all your incrementStream() threads are
>> held up by class loading etc. Specifically, the stream APIs you're using in
>> incrementStream pull in maybe 40 additional classes on an Oracle JVM. To
>> see for yourself, run the JVM with -verbose:class and run both increment
>> and incrementStream -- notice the latter does a bunch of extra work. Since
>> classes are sort of loaded on-demand, that work needs to happen before your
>> incrementStream threads can run. Thus why the warmup step above improves
>> the situation.
>>
>> Put another way: even though logically increment and incrementStream are
>> doing something very similar, the latter has to do a bunch of additional
>> work before any of the stuff that touches the count variable even gets to
>> run. And if it's not obvious it should be noted that your scenario here is
>> very small/fast and this effect will be less pronounced (but not entirely
>> absent) in a larger test.
>>
>> Cheers,
>> Tom
>>
>>
>> On Fri, Feb 24, 2017 at 9:07 PM, Benjamin Manes <ben.manes at gmail.com>
>> wrote:
>>
>>> You need to wait until the executor has completed, or else the main
>>> method may complete prior to the task running.
>>>
>>> exec.awaitTermination(1, TimeUnit.MINUTES);
>>>
>>>
>>> On Fri, Feb 24, 2017 at 8:53 PM, kedar mhaswade <
>>> kedar.mhaswade at gmail.com> wrote:
>>>
>>>> ​Perhaps I had a long day. So, this might completely be a silly
>>>> mistake, but I need congenial help to figure it out.
>>>>
>>>> To demonstrate race condition, I wrote the following program. The
>>>> program has ten concurrent tasks that modify count, a volatile shared
>>>> variable. If I use the *increment* task (line 39), I get different
>>>> results (like, e.g. [2]) every time I run it, demonstrating the race
>>>> condition.
>>>>
>>>> However, if I use the *incrementStream* task instead (line 39), then
>>>> the count variable is not updated at all. The output is like [1] *every
>>>> time*. In a separate program not involving threads, I have verified
>>>> that the lambda expression like incrementStream updates a member variable
>>>> as expected.
>>>>
>>>> What am I doing wrong?
>>>>
>>>> Regards,
>>>> Kedar
>>>>
>>>>
>>>> public class RaceCondition {
>>>>
>>>>     private static volatile int count = 0;
>>>>     public static void main(String[] args) {
>>>>         // update the shared variable traditionally
>>>>         Runnable increment = () -> {
>>>>             for (int i = 0; i < 1000; i++)
>>>>                 count++;
>>>>         };
>>>>         // update the shared variable as a side effect of an
>>>> IntConsumer#accept
>>>>         Runnable incrementStream = () -> IntStream.rangeClosed(1,
>>>> 100).forEach(i -> count++);
>>>>         ExecutorService exec = Executors.newCachedThreadPool(); //
>>>> short lived tasks
>>>>         try {
>>>>             for (int i = 0; i < 10; i++) {
>>>>
>>>> ​// ​
>>>> exec.execute(incrementStream)
>>>> ​;
>>>> // line 38​
>>>> ​​
>>>>
>>>>               exec.execute(increment);
>>>> ​         // line 39​
>>>>
>>>>                 System.out.println("task: " + i + " updates count to: "
>>>> + count);
>>>>             }
>>>>         } finally {
>>>>             exec.shutdown();
>>>>             System.out.println("final: " + count);
>>>>         }
>>>>     }
>>>>
>>>> }
>>>> [1]
>>>> task: 0 updates count to: 0
>>>> task: 1 updates count to: 0
>>>> task: 2 updates count to: 0
>>>> task: 3 updates count to: 0
>>>> task: 4 updates count to: 0
>>>> task: 5 updates count to: 0
>>>> task: 6 updates count to: 0
>>>> task: 7 updates count to: 0
>>>> task: 8 updates count to: 0
>>>> task: 9 updates count to: 0
>>>> final: 0
>>>> [2]
>>>> task: 0 updates count to: 0
>>>> task: 1 updates count to: 1000
>>>> task: 2 updates count to: 1000
>>>> task: 3 updates count to: 2000
>>>> task: 4 updates count to: 4027
>>>> task: 5 updates count to: 5000
>>>> task: 6 updates count to: 6000
>>>> task: 7 updates count to: 7005
>>>> task: 8 updates count to: 8000
>>>> task: 9 updates count to: 8185
>>>> final: 9381
>>>>
>>>>
>>>> _______________________________________________
>>>> Concurrency-interest mailing list
>>>> Concurrency-interest at cs.oswego.edu
>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>>
>>>>
>>>
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>
>>>
>>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20170225/d95228cd/attachment-0001.html>


More information about the Concurrency-interest mailing list