[concurrency-interest] Unsafe + DirectBuffer concurrency bug?

Dmitry Vyazelenko vyazelenko at yahoo.com
Mon Sep 8 17:29:11 EDT 2014


Nitsan nailed it. Moving the isFlagSet() call before the read of writerCounter fixes the program.
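
For reference, a minimal sketch of the reordered read(). To keep it self-contained it uses three AtomicLong fields as stand-ins for the volatile accesses at READER_COUNTER_OFFSET, WRITER_COUNTER_OFFSET and FLAG_OFFSET; that substitution, the class name FixedReaderSketch, and the helpers writeAll() and runOnce() are assumptions for illustration, not the code from the test. The point is only the ordering: the flag is sampled before the writer counter.

```java
import java.util.concurrent.atomic.AtomicLong;

public class FixedReaderSketch
{
    // Stand-ins for the three volatile slots in the mapped buffer (assumption).
    final AtomicLong readerCounterSlot = new AtomicLong();
    final AtomicLong writerCounterSlot = new AtomicLong();
    final AtomicLong flagSlot = new AtomicLong();

    long readCounter;
    long writeCounter;

    boolean read()
    {
        // Read the flag first: if it is seen as set, the volatile read of the
        // writer counter that follows must observe the final value.
        final boolean flagSet = 1L == flagSlot.get();
        readCounter = readerCounterSlot.get();
        writeCounter = writerCounterSlot.get();
        if (readCounter == writeCounter && flagSet)
        {
            return false;
        }
        else if (readCounter < writeCounter)
        {
            readerCounterSlot.set(++readCounter);
        }
        return true;
    }

    void writeAll()
    {
        // Same order as Writer: publish the counter values, then the flag.
        while (writerCounterSlot.incrementAndGet() < 42)
        {
            // busy spin
        }
        flagSlot.set(1L);
    }

    // Runs one writer thread against a reader on the calling thread.
    static long runOnce()
    {
        final FixedReaderSketch ipc = new FixedReaderSketch();
        final Thread writerThread = new Thread(ipc::writeAll, "WRITER");
        writerThread.start();
        while (ipc.read())
        {
            // busy spin
        }
        try
        {
            writerThread.join();
        }
        catch (final InterruptedException ex)
        {
            throw new RuntimeException(ex);
        }
        return ipc.readCounter;
    }

    public static void main(final String[] args)
    {
        for (int i = 0; i < 1_000; i++)
        {
            final long readCounter = runOnce();
            if (readCounter != 42)
            {
                throw new Error("Reader exited prematurely: readCounter=" + readCounter);
            }
        }
        System.out.println("All iterations finished with readCounter=42");
    }
}
```

With this ordering the reader can only return false after it has seen the flag, and at that point the writer counter it reads next is already the final one.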


Best regards,
Dmitry Vyazelenko
On Sep 8, 2014, at 23:14 , Martin Thompson <mjpt777 at gmail.com> wrote:

> No worries. I'll have a look tomorrow and see if it makes sense to me. I was hoping the bug would not be in the JVM/JDK; if it is not, that is a great result :-)
> 
> On a quick review I think you are correct.
> 
> On 8 September 2014 21:59, Nitsan Wakart <nitsanw at yahoo.com> wrote:
> Sorry for the shorthand reply, no brusqueness intended.
> Hope it hits the mark.
>  
> 
> 
> On Monday, September 8, 2014 10:53 PM, Nitsan Wakart <nitsanw at yahoo.com> wrote:
> 
> 
> 1   readCounter = getLongVolatile(READER_COUNTER_OFFSET);
> 2   writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
> 3   if (readCounter == writeCounter && isFlagSet()) {
> 4       return false;
> 5   } else if (readCounter < writeCounter) {
> 6       putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
> 7   }
> 
> It is possible for lines 1 and 2 to execute and see 0, and then for the writer thread to blaze through its loop and set the flag to 1 before you hit line 3.
> The happens-before (HB) assumption is applied in the wrong order: you should read the flag first, because seeing the flag set guarantees that the final counter value is visible.
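
The interleaving described above can be replayed deterministically on a single thread. This is only a sketch: the AtomicLong slots stand in for the three volatile locations in the mapped buffer, and the class InterleavingReplay and method exitsEarly() are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;

public class InterleavingReplay
{
    // Returns true if the reader decides to exit while readCounter is still 0.
    static boolean exitsEarly(final boolean readFlagFirst)
    {
        // Stand-ins for the three volatile slots in the mapped buffer (assumption).
        final AtomicLong readerCounterSlot = new AtomicLong();
        final AtomicLong writerCounterSlot = new AtomicLong();
        final AtomicLong flagSlot = new AtomicLong();

        // Flag-first order: sample the flag before the counters (still 0 here).
        final boolean earlyFlag = 1L == flagSlot.get();

        final long readCounter = readerCounterSlot.get();   // line 1 sees 0
        final long writeCounter = writerCounterSlot.get();  // line 2 sees 0

        // The writer now runs to completion before the reader continues.
        for (int i = 0; i < 42; i++)
        {
            writerCounterSlot.incrementAndGet();
        }
        flagSlot.set(1L);

        // Original order: the flag is sampled only now, at line 3.
        final boolean flagSet = readFlagFirst ? earlyFlag : (1L == flagSlot.get());

        return readCounter == writeCounter && flagSet;
    }

    public static void main(final String[] args)
    {
        System.out.println("original order exits early: " + exitsEarly(false));
        System.out.println("flag-first order exits early: " + exitsEarly(true));
    }
}
```

In the original order the stale counters (0 and 0) are combined with a fresh flag, so the reader exits early; in the flag-first order the flag is at least as old as the counters, so the same schedule does not trigger an exit.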
> 
> 
> On Monday, September 8, 2014 10:23 PM, Martin Thompson <mjpt777 at gmail.com> wrote:
> 
> 
> The version below fails for me every time but at different iteration values.
> 
> Regards,
> Martin...
> 
> import sun.misc.Unsafe;
> 
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
> 
> class IpcBase implements AutoCloseable
> {
>     private static final long PADDING = 64;
>     static final long READER_COUNTER_OFFSET = PADDING;
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET + PADDING;
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;
> 
>     private static final Unsafe UNSAFE;
> 
>     static
>     {
>         try
>         {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe)field.get(null);
>         }
>         catch (final Exception ex)
>         {
>             throw new RuntimeException(ex);
>         }
>     }
> 
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer; // hold reference to prevent GC
>     private final long baseAddress;
> 
>     public IpcBase(final File file) throws IOException
>     {
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer)buffer).address();
>     }
> 
>     final void putLongVolatile(final long offset, final long value)
>     {
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value);
>     }
> 
>     final long getLongVolatile(final long offset)
>     {
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
> 
>     @Override
>     public void close() throws Exception
>     {
>         channel.close();
>     }
> }
> 
> class Writer extends IpcBase
> {
>     long writerCounter;
> 
>     public Writer(final File file) throws IOException
>     {
>         super(file);
>     }
> 
>     @Override
>     public void close() throws Exception
>     {
>         putLongVolatile(FLAG_OFFSET, 1L);
>         super.close();
>     }
> 
>     public long write()
>     {
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
>         return writerCounter;
>     }
> }
> 
> class Reader extends IpcBase
> {
>     long readCounter;
>     long writeCounter;
> 
>     public Reader(final File file) throws IOException
>     {
>         super(file);
>     }
> 
>     public boolean read()
>     {
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet())
>         {
>             return false;
>         }
>         else if (readCounter < writeCounter)
>         {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
>         }
> 
>         return true;
>     }
> 
>     private boolean isFlagSet()
>     {
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
> 
> public class IpcBugTest
> {
>     public static int iteration = 0;
> 
>     public static void main(String[] args) throws Exception
>     {
>         final File file = createFile();
> 
>         for (int i = 0; i < 1_000_000; i++)
>         {
>             iteration = i;
>             testIpc(file);
>         }
> 
>         System.out.println("NO BUG HERE! :-)");
>     }
> 
>     private static File createFile() throws IOException
>     {
>         final File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
> 
>         return file;
>     }
> 
>     private static void testIpc(final File file) throws Exception
>     {
>         writeZeros(file);
> 
>         final Reader reader = new Reader(file);
>         final Thread readerThread = new Thread("READER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (reader.read())
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         reader.close();
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start();
> 
>         final Writer writer = new Writer(file);
>         final Thread writerThread = new Thread("WRITER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (writer.write() < 42)
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         writer.close();
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start();
> 
>         writerThread.join();
>         readerThread.join();
> 
>         if (reader.readCounter != writer.writerCounter)
>         {
>             throw new Error(
>                 "Reader exited prematurely: Reader.readCounter=" + reader.readCounter +
>                 ", Reader.writeCounter=" + reader.writeCounter +
>                 ", Writer.writeCounter=" + writer.writerCounter +
>                 ", iteration=" + iteration);
>         }
>     }
> 
>     private static void writeZeros(final File file) throws IOException
>     {
>         Files.write(file.toPath(), new byte[(int)IpcBase.TOTAL_LENGTH], StandardOpenOption.WRITE);
>     }
> }
> 
> Date: Mon, 8 Sep 2014 08:08:29 -0700
> From: Dmitry Vyazelenko <vyazelenko at yahoo.com>
> To: "concurrency-interest at cs.oswego.edu"
>         <concurrency-interest at cs.oswego.edu>
> Subject: [concurrency-interest] Unsafe + DirectBuffer concurrency bug?
> Message-ID:
>         <1410188909.78423.YahooMailNeo at web162206.mail.bf1.yahoo.com>
> Content-Type: text/plain; charset="utf-8"
> 
> Hi all,
> 
> Please find a standalone test that points to a potential Unsafe/DirectBuffer issue. The test uses a memory-mapped file (DirectBuffer) and Unsafe to communicate between a reader and a writer.
> The data layout within the buffer:
> <64 bytes padding><reader counter><64 bytes padding><writer counter><64 bytes padding><end of data flag><64 bytes padding>
> Total number of bytes is 256.
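
As a quick check of the layout, the offsets can be computed from the same constants that IpcBase declares. The sketch below (class name LayoutSketch and method describe() are hypothetical) suggests that each 8-byte field sits at the start of its own 64-byte slot, which is how the total comes to 256 bytes.

```java
public class LayoutSketch
{
    // Same constants as in IpcBase.
    static final long PADDING = 64;
    static final long READER_COUNTER_OFFSET = PADDING;
    static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET + PADDING;
    static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
    static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;

    // Describes the byte range occupied by one 8-byte long field.
    static String describe(final String name, final long offset)
    {
        return name + ": bytes " + offset + ".." + (offset + Long.BYTES - 1);
    }

    public static void main(final String[] args)
    {
        System.out.println(describe("reader counter", READER_COUNTER_OFFSET));
        System.out.println(describe("writer counter", WRITER_COUNTER_OFFSET));
        System.out.println(describe("end of data flag", FLAG_OFFSET));
        System.out.println("total length: " + TOTAL_LENGTH);
    }
}
```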
> 
> The Writer thread keeps incrementing writerCounter until it reaches the value 42, at which point it also writes endOfDataFlag and closes the FileChannel.
> 
> The Reader thread reads both readerCounter and writerCounter. If both have the same value and endOfDataFlag is set, it exits. Otherwise, if readerCounter is behind writerCounter, it increments readerCounter.
> 
> Below is the complete test:
> 
> package ipc_bug;
> 
> import sun.misc.Unsafe;
> 
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
> 
> class IpcBase implements AutoCloseable {
>     private static final long PADDING = 64;
>     static final long READER_COUNTER_OFFSET = PADDING;
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET + PADDING;
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;
> 
>     private static final Unsafe UNSAFE;
>     static {
>         try {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe) field.get(null);
>         } catch (final Exception ex) {
>             throw new RuntimeException(ex);
>         }
>     }
> 
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer;
>     private final long baseAddress;
> 
>     public IpcBase(File file) throws IOException {
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer) buffer).address();
>     }
> 
>     final void putLongVolatile(long offset, long value) {
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value);
>     }
> 
>     final long getLongVolatile(long offset) {
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
> 
>     @Override
>     public void close() throws Exception {
>         channel.close();
>     }
> }
> 
> class Writer extends IpcBase {
>     public Writer(File file) throws IOException {
>         super(file);
>     }
> 
>     @Override
>     public void close() throws Exception {
>         putLongVolatile(FLAG_OFFSET, 1L);
>         super.close();
>     }
> 
>     long writerCounter;
> 
>     public long write() {
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
>         return writerCounter;
>     }
> }
> 
> class Reader extends IpcBase {
>     public Reader(File file) throws IOException {
>         super(file);
>     }
> 
>     long readCounter;
>     long writeCounter;
> 
>     public boolean read() {
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet()) {
>             return false;
>         } else if (readCounter < writeCounter) {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
>         }
>         return true;
>     }
> 
>     private boolean isFlagSet() {
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
> 
> public class IpcBugTest {
>     public static void main(String[] args) throws Exception {
>         File file = createFile();
>         for (int i = 0; i < 50_000; i++) {
>             testIPC(file);
>         }
>         System.out.println("NO BUG EXISTS! :-)");
>     }
> 
>     private static File createFile() throws IOException {
>         File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
>         return file;
>     }
> 
>     private static void testIPC(final File file) throws Exception {
>         writeZeros(file);
> 
>         final Writer writer = new Writer(file);
>         Thread writerThread = new Thread("WRITER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (writer.write() < 42) ;
>                     } finally {
>                         writer.close();
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start();
> 
>         final Reader reader = new Reader(file);
>         Thread readerThread = new Thread("READER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (reader.read()) ;
>                     } finally {
>                         reader.close();
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start();
> 
>         writerThread.join();
>         readerThread.join();
> 
>         if (reader.readCounter != writer.writerCounter) {
>             throw new Error("Reader exited prematurely: Reader.readCounter=" + reader.readCounter
>                     + ", Reader.writeCounter=" + reader.writeCounter
>                     + ", Writer.writeCounter=" + writer.writerCounter);
>         }
>     }
> 
>     private static void writeZeros(File file) throws IOException {
>         Files.write(file.toPath(), new byte[(int) IpcBase.TOTAL_LENGTH], StandardOpenOption.WRITE);
>     }
> }
> 
> This test fails on my Windows 7 machine using JDK 7u67 and JDK 8u20 (older Intel i5). It also fails on an Intel i7 Ivy Bridge machine running Ubuntu 14.04 with the same JDKs. The failure is random: I've observed it fail on the first iteration, after 600 iterations, after 10K iterations, etc., but it eventually fails on every run I tried.
> 
> The failure message shown is:
>     java.lang.Error: Reader exited prematurely: Reader.readCounter=0, Reader.writeCounter=0, Writer.writeCounter=42
> 
> In this case the reader thread observed both counters to have the same value 0 while endOfDataFlag was set. However, the writer thread writes endOfDataFlag only after the final writerCounter value (i.e. 42) has been written.
> 
> I hope the bug is in my code. So please tell me how wrong I am! ;)
> 
> Best regards,
> Dmitry Vyazelenko
> 
