[concurrency-interest] Unsafe + DirectBuffer concurrency bug?

Martin Thompson mjpt777 at gmail.com
Mon Sep 8 17:17:47 EDT 2014


Thanks Nitsan, I think you are correct here. I'll have another look tomorrow
morning after coffee :-)

On 8 September 2014 21:53, Nitsan Wakart <nitsanw at yahoo.com> wrote:

> 1    readCounter = getLongVolatile(READER_COUNTER_OFFSET);
> 2      writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
> 3      if (readCounter == writeCounter && isFlagSet()) {
> 4           return false;
> 5       } else if (readCounter < writeCounter) {
> 6           putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
> 7       }
>
> It is possible for 1,2 to execute and see 0, then have the writer thread
> blaze through its loop and set the flag to 1 before you hit 3.
> The happens-before (HB) assumption is in the wrong order: you should read the
> flag first, because flag visibility implies counter visibility.
>
>
>   On Monday, September 8, 2014 10:23 PM, Martin Thompson <
> mjpt777 at gmail.com> wrote:
>
>
> The version below fails for me every time but at different iteration
> values.
>
> Regards,
> Martin...
>
> import sun.misc.Unsafe;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
>
> class IpcBase implements AutoCloseable // Shared base for Reader/Writer: maps the file and exposes volatile long access at fixed offsets
> {
>     private static final long PADDING = 64; // byte spacing between fields; presumably one cache line - TODO confirm
>     static final long READER_COUNTER_OFFSET = PADDING; // = 64
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
> PADDING; // = 128
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING; // = 192
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING; // = 256, the number of bytes mapped
>
>     private static final Unsafe UNSAFE;
>
>     static
>     { // fetch the Unsafe instance reflectively from the static "theUnsafe" field
>         try
>         {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe)field.get(null);
>         }
>         catch (final Exception ex)
>         {
>             throw new RuntimeException(ex); // any reflection failure aborts class initialisation
>         }
>     }
>
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer; // never read again; kept so the mapping behind baseAddress is not garbage collected
>     private final long baseAddress; // native address of the start of the mapping
>
>     public IpcBase(final File file) throws IOException // opens file read/write and maps its first TOTAL_LENGTH bytes
>     {
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
> TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer)buffer).address();
>     }
>
>     final void putLongVolatile(final long offset, final long value) // volatile store of value at baseAddress + offset
>     {
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value); // null object => the second argument is an absolute address
>     }
>
>     final long getLongVolatile(final long offset) // volatile load of the long at baseAddress + offset
>     {
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
>
>     @Override
>     public void close() throws Exception
>     {
>         channel.close(); // only the channel is closed here; the mapping is not explicitly released
>     }
> }
>
> class Writer extends IpcBase // Producer side: bumps the writer counter and raises the end-of-data flag on close
> {
>     long writerCounter; // last value written by write(); read by the test after the thread is joined
>
>     public Writer(final File file) throws IOException
>     {
>         super(file);
>     }
>
>     @Override
>     public void close() throws Exception
>     {
>         putLongVolatile(FLAG_OFFSET, 1L); // publish end-of-data before closing the channel
>         super.close();
>     }
>
>     public long write() // increments the shared writer counter by one and returns the new value
>     {
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter); // separate load and store, not an atomic increment
>         return writerCounter;
>     }
> }
>
> class Reader extends IpcBase // Consumer side: advances the reader counter towards the writer counter until end-of-data
> {
>     long readCounter; // reader counter as last sampled/advanced by read()
>     long writeCounter; // writer counter as last sampled by read()
>
>     public Reader(final File file) throws IOException
>     {
>         super(file);
>     }
>
>     public boolean read() // returns false once the counters are equal and the flag is set, true otherwise
>     {
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet()) // flag is read AFTER both counters: the writer can finish in between, so stale equal counters (e.g. 0 == 0) plus a set flag end the loop early
>         {
>             return false;
>         }
>         else if (readCounter < writeCounter)
>         {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter); // advance by one per call
>         }
>
>         return true;
>     }
>
>     private boolean isFlagSet() // true when the long at FLAG_OFFSET equals 1
>     {
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
>
> public class IpcBugTest // Driver: repeatedly races one Reader against one Writer over the same mapped file
> {
>     public static int iteration = 0; // current loop index, included in the failure message
>
>     public static void main(String[] args) throws Exception
>     {
>         final File file = createFile();
>
>         for (int i = 0; i < 1_000_000; i++) // the same file is reused for every iteration
>         {
>             iteration = i;
>             testIpc(file);
>         }
>
>         System.out.println("NO BUG HERE! :-)"); // reached only if no iteration threw
>     }
>
>     private static File createFile() throws IOException // creates a temp file that is deleted on JVM exit
>     {
>         final File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
>
>         return file;
>     }
>
>     private static void testIpc(final File file) throws Exception // one round: zero the file, run both threads, compare counters
>     {
>         writeZeros(file);
>
>         final Reader reader = new Reader(file);
>         final Thread readerThread = new Thread("READER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (reader.read())
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         reader.close();
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start(); // in this version the reader is started before the writer is created
>
>         final Writer writer = new Writer(file);
>         final Thread writerThread = new Thread("WRITER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (writer.write() < 42) // stop once the writer counter reaches 42
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         writer.close(); // sets the end-of-data flag, then closes the channel
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start();
>
>         writerThread.join();
>         readerThread.join();
>
>         if (reader.readCounter != writer.writerCounter) // reader stopped before catching up with the writer
>         {
>             throw new Error(
>                 "Reader exited prematurely: Reader.readCounter=" +
> reader.readCounter +
>                 ", Reader.writeCounter=" + reader.writeCounter +
>                 ", Writer.writeCounter=" + writer.writerCounter +
>                 ", iteration=" + iteration);
>         }
>     }
>
>     private static void writeZeros(final File file) throws IOException // resets counters and flag by writing TOTAL_LENGTH zero bytes
>     {
>         Files.write(file.toPath(), new byte[(int)IpcBase.TOTAL_LENGTH],
> StandardOpenOption.WRITE);
>     }
> }
>
> Date: Mon, 8 Sep 2014 08:08:29 -0700
> From: Dmitry Vyazelenko <vyazelenko at yahoo.com>
> To: "concurrency-interest at cs.oswego.edu"
>         <concurrency-interest at cs.oswego.edu>
> Subject: [concurrency-interest] Unsafe + DirectBuffer concurrency bug?
> Message-ID:
>         <1410188909.78423.YahooMailNeo at web162206.mail.bf1.yahoo.com>
> Content-Type: text/plain; charset="utf-8"
>
> Hi all,
>
> Please find a standalone test that points to a potential Unsafe/DirectBuffer
> issue. The test uses a memory-mapped file (DirectBuffer) and Unsafe to
> communicate between reader and writer.
> The data layout within the buffer:
> <64 bytes padding><reader counter><64 bytes padding><writer counter><64
> bytes padding><end of data flag><64 bytes padding>
> Total number of bytes is 256.
>
> The Writer thread keeps incrementing writerCounter until it reaches the value
> 42, at which point it also writes endOfDataFlag and closes the FileChannel.
>
> The Reader thread reads both readerCounter and writerCounter. If both
> have the same value and endOfDataFlag is set then it exits. Otherwise it
> increments readerCounter.
>
> Below is the complete test:
>
> package ipc_bug;
>
> import sun.misc.Unsafe;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
>
> class IpcBase implements AutoCloseable { // Shared base for Reader/Writer: maps the file and exposes volatile long access at fixed offsets
>     private static final long PADDING = 64; // byte spacing between fields; presumably one cache line - TODO confirm
>     static final long READER_COUNTER_OFFSET = PADDING; // = 64
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
> PADDING; // = 128
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING; // = 192
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING; // = 256, the number of bytes mapped
>
>     private static final Unsafe UNSAFE;
>     static { // fetch the Unsafe instance reflectively from the static "theUnsafe" field
>         try {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe) field.get(null);
>         } catch (final Exception ex) {
>             throw new RuntimeException(ex); // any reflection failure aborts class initialisation
>         }
>     }
>
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer; // never read again after the constructor; holds the mapping object
>     private final long baseAddress; // native address of the start of the mapping
>
>     public IpcBase(File file) throws IOException { // opens file read/write and maps its first TOTAL_LENGTH bytes
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
> TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer) buffer).address();
>     }
>
>     final void putLongVolatile(long offset, long value) { // volatile store of value at baseAddress + offset
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value); // null object => the second argument is an absolute address
>     }
>
>     final long getLongVolatile(long offset) { // volatile load of the long at baseAddress + offset
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
>
>     @Override
>     public void close() throws Exception {
>         channel.close(); // only the channel is closed here; the mapping is not explicitly released
>     }
> }
>
> class Writer extends IpcBase { // Producer side: bumps the writer counter and raises the end-of-data flag on close
>     public Writer(File file) throws IOException {
>         super(file);
>     }
>
>     @Override
>     public void close() throws Exception {
>         putLongVolatile(FLAG_OFFSET, 1L); // publish end-of-data before closing the channel
>         super.close();
>     }
>
>     long writerCounter; // last value written by write(); read by the test after the thread is joined
>
>     public long write() { // increments the shared writer counter by one and returns the new value
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter); // separate load and store, not an atomic increment
>         return writerCounter;
>     }
> }
>
> class Reader extends IpcBase { // Consumer side: advances the reader counter towards the writer counter until end-of-data
>     public Reader(File file) throws IOException {
>         super(file);
>     }
>
>     long readCounter; // reader counter as last sampled/advanced by read()
>     long writeCounter; // writer counter as last sampled by read()
>
>     public boolean read() { // returns false once the counters are equal and the flag is set, true otherwise
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet()) { // flag is read AFTER both counters: the writer can finish in between, so stale equal counters plus a set flag end the loop early
>             return false;
>         } else if (readCounter < writeCounter) {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter); // advance by one per call
>         }
>         return true;
>     }
>
>     private boolean isFlagSet() { // true when the long at FLAG_OFFSET equals 1
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
>
> public class IpcBugTest { // Driver: repeatedly races one Writer against one Reader over the same mapped file
>     public static void main(String[] args) throws Exception {
>         File file = createFile();
>         for (int i = 0; i < 50_000; i++) { // the same file is reused for every iteration
>             testIPC(file);
>         }
>         System.out.println("NO BUG EXISTS! :-)"); // reached only if no iteration threw
>     }
>
>     private static File createFile() throws IOException { // creates a temp file that is deleted on JVM exit
>         File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
>         return file;
>     }
>
>     private static void testIPC(final File file) throws Exception { // one round: zero the file, run both threads, compare counters
>         writeZeros(file);
>
>         final Writer writer = new Writer(file);
>         Thread writerThread = new Thread("WRITER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (writer.write() < 42) ; // spin until the writer counter reaches 42
>                     } finally {
>                         writer.close(); // sets the end-of-data flag, then closes the channel
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start(); // in this version the writer is started before the reader is created
>
>         final Reader reader = new Reader(file);
>         Thread readerThread = new Thread("READER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (reader.read()) ; // spin until read() reports end-of-data
>                     } finally {
>                         reader.close();
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start();
>
>         writerThread.join();
>         readerThread.join();
>
>         if (reader.readCounter != writer.writerCounter) { // reader stopped before catching up with the writer
>             throw new Error("Reader exited prematurely:
> Reader.readCounter=" + reader.readCounter
>                     + ", Reader.writeCounter=" + reader.writeCounter
>                     + ", Writer.writeCounter=" + writer.writerCounter);
>         }
>     }
>
>     private static void writeZeros(File file) throws IOException { // resets counters and flag by writing TOTAL_LENGTH zero bytes
>         Files.write(file.toPath(), new byte[(int) IpcBase.TOTAL_LENGTH],
> StandardOpenOption.WRITE);
>     }
> }
>
> This test fails on my Windows 7 machine using JDK 7u67 and JDK 8u20 (older
> Intel i5). It also fails on Intel i7 IvyBridge machine that is running
> Ubuntu 14.04 on same JDKs. The failure is random, i.e. I've observed it
> fail on first iteration, 600, after 10K iterations etc. but it fails
> eventually on each run I tried.
>
> The failure message shown is:
>     java.lang.Error: Reader exited prematurely: Reader.readCounter=0,
> Reader.writeCounter=0, Writer.writeCounter=42
>
> In this case the reader thread observed both counters to have the same
> value 0 and endOfDataFlag to be set. However, the writer thread writes
> endOfDataFlag only after the final writerCounter (i.e. 42) has been written.
>
> I hope the bug is in my code. So please tell me how wrong I am! ;)
>
> Best regards,
> Dmitry Vyazelenko
>
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20140908/b7e86f58/attachment-0001.html>


More information about the Concurrency-interest mailing list