[concurrency-interest] Spot the bug in stream processing, IntConsumer and Executor

kedar mhaswade kedar.mhaswade at gmail.com
Sat Feb 25 12:32:54 EST 2017


Thanks Tom and Benjamin. Yes, I am trying to demonstrate a race condition,
so yes, this code is for illustration purposes only.

*And upon a closer look, I found the bug in my code:*

        Runnable increment = () -> {
            for (int i = 0; i < *1000*; i++)
                count++;
        };

        // update the shared variable as a side effect of an
IntConsumer#accept
        Runnable incrementStream = () -> IntStream.rangeClosed(1,
*100*).forEach(i
-> count++);


The stream-oriented task was getting a *100* instead of a *1000* :-P.
Fixing this gives very similar results. Of course, it clearly demonstrates
that count++ is *not* an atomic operation and some updates are lost as
expected. I also verified that by using an AtomicInteger instead of a
simple volatile, the final value of count is 10,000, always.

Regards,
Kedar


On Sat, Feb 25, 2017 at 12:09 AM, Tom Lee <concurrency-interest at tomlee.co>
wrote:

> Hi Kedar,
>
> Disclaimer: I'd generally say something about this approach to having
> multiple threads incrementing a variable being a bad idea in general, but
> it sounds like you're just trying to explore the behavior of this race
> right?
>
> Benjamin's on the right track: you're seeing zeros because the code
> submitted to the executor hasn't run by the time you print out the "task
> ... updates count to ..." messages. Still, it's not a complete fix & you'll
> very likely continue to see very different results even if you add an
> awaitTermination call after the shutdown() call. Also probably depends on
> how fast your machine is etc. etc. too (e.g. perhaps on slower machines /
> fewer cores you'd see similar output for both).
>
> It's still kind of interesting why the exhibited behavior is so different.
> Here's a bit of a hint -- with the following code I get very similar output
> irrespective of whether I'm using "increment" or "incrementStream":
>
> private static volatile int count = 0;
> public static void main(String[] args) throws Exception {
>
>     // warmup (slower)
>     run(false);
>
>     // do it for real (faster)
>     count = 0;
>     run(true);
> }
>
> private static void run(final boolean show) throws Exception {
>     Runnable increment = () -> {
>         for (int i = 0; i < 1000; i++)
>             count++;
>     };
>
>     Runnable incrementStream = () -> {
>         IntStream.rangeClosed(1, 1000).forEach(i -> count++);
>     };
>     ExecutorService exec = Executors.newCachedThreadPool();
>     try {
>         for (int i = 0; i < 10; i++) {
>             exec.execute(increment);
>             // exec.execute(incrementStream);
>             if (show) System.out.println("task: " + i + " updates count to: " + count);
>         }
>     }
>     finally {
>         exec.shutdown();
>         exec.awaitTermination(10, TimeUnit.SECONDS);
>         if (show) System.out.println("final: " + count);
>     }
> }
>
> Weird right?
>
> Another hint: I added code to println() a message to the end of
> incrementStream() in your original code ("hello" or something silly like
> that). I didn't see any "hello" messages until after all the "task ...
> updates count to ..." messages were displayed.
>
> Without going too deep, I suspect all your incrementStream() threads are
> held up by class loading etc. Specifically, the stream APIs you're using in
> incrementStream pull in maybe 40 additional classes on an Oracle JVM. To
> see for yourself, run the JVM with -verbose:class and run both increment
> and incrementStream -- notice the latter does a bunch of extra work. Since
> classes are sort of loaded on-demand, that work needs to happen before your
> incrementStream threads can run. Thus why the warmup step above improves
> the situation.
>
> Put another way: even though logically increment and incrementStream are
> doing something very similar, the latter has to do a bunch of additional
> work before any of the stuff that touches the count variable even gets to
> run. And if it's not obvious it should be noted that your scenario here is
> very small/fast and this effect will be less pronounced (but not entirely
> absent) in a larger test.
>
> Cheers,
> Tom
>
>
> On Fri, Feb 24, 2017 at 9:07 PM, Benjamin Manes <ben.manes at gmail.com>
> wrote:
>
>> You need to wait until the executor has completed, or else the main
>> method may complete prior to the task running.
>>
>> exec.awaitTermination(1, TimeUnit.MINUTES);
>>
>>
>> On Fri, Feb 24, 2017 at 8:53 PM, kedar mhaswade <kedar.mhaswade at gmail.com
>> > wrote:
>>
>>> ​Perhaps I had a long day. So, this might completely be a silly mistake,
>>> but I need congenial help to figure it out.
>>>
>>> To demonstrate race condition, I wrote the following program. The
>>> program has ten concurrent tasks that modify count, a volatile shared
>>> variable. If I use the *increment* task (line 39), I get different
>>> results (like, e.g. [2]) every time I run it, demonstrating the race
>>> condition.
>>>
>>> However, if I use the *incrementStream* task instead (line 39), then
>>> the count variable is not updated at all. The output is like [1] *every
>>> time*. In a separate program not involving threads, I have verified
>>> that the lambda expression like incrementStream updates a member variable
>>> as expected.
>>>
>>> What am I doing wrong?
>>>
>>> Regards,
>>> Kedar
>>>
>>>
>>> public class RaceCondition {
>>>
>>>     private static volatile int count = 0;
>>>     public static void main(String[] args) {
>>>         // update the shared variable traditionally
>>>         Runnable increment = () -> {
>>>             for (int i = 0; i < 1000; i++)
>>>                 count++;
>>>         };
>>>         // update the shared variable as a side effect of an
>>> IntConsumer#accept
>>>         Runnable incrementStream = () -> IntStream.rangeClosed(1,
>>> 100).forEach(i -> count++);
>>>         ExecutorService exec = Executors.newCachedThreadPool(); //
>>> short lived tasks
>>>         try {
>>>             for (int i = 0; i < 10; i++) {
>>>
>>> ​// ​
>>> exec.execute(incrementStream)
>>> ​;
>>> // line 38​
>>> ​​
>>>
>>>               exec.execute(increment);
>>> ​         // line 39​
>>>
>>>                 System.out.println("task: " + i + " updates count to: "
>>> + count);
>>>             }
>>>         } finally {
>>>             exec.shutdown();
>>>             System.out.println("final: " + count);
>>>         }
>>>     }
>>>
>>> }
>>> [1]
>>> task: 0 updates count to: 0
>>> task: 1 updates count to: 0
>>> task: 2 updates count to: 0
>>> task: 3 updates count to: 0
>>> task: 4 updates count to: 0
>>> task: 5 updates count to: 0
>>> task: 6 updates count to: 0
>>> task: 7 updates count to: 0
>>> task: 8 updates count to: 0
>>> task: 9 updates count to: 0
>>> final: 0
>>> [2]
>>> task: 0 updates count to: 0
>>> task: 1 updates count to: 1000
>>> task: 2 updates count to: 1000
>>> task: 3 updates count to: 2000
>>> task: 4 updates count to: 4027
>>> task: 5 updates count to: 5000
>>> task: 6 updates count to: 6000
>>> task: 7 updates count to: 7005
>>> task: 8 updates count to: 8000
>>> task: 9 updates count to: 8185
>>> final: 9381
>>>
>>>
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>>
>>>
>>
>> _______________________________________________
>> Concurrency-interest mailing list
>> Concurrency-interest at cs.oswego.edu
>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20170225/11c5a778/attachment.html>


More information about the Concurrency-interest mailing list