[concurrency-interest] Unsafe + DirectBuffer concurrency bug?

Vitaly Davidovich vitalyd at gmail.com
Mon Sep 8 17:10:53 EDT 2014


Just to rule some things out, what happens if you fsync the file as part of
writeZeros?

On Mon, Sep 8, 2014 at 4:16 PM, Martin Thompson <mjpt777 at gmail.com> wrote:

> The version below fails for me every time but at different iteration
> values.
>
> Regards,
> Martin...
>
> import sun.misc.Unsafe;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
>
> class IpcBase implements AutoCloseable
> {
>     private static final long PADDING = 64;
>     static final long READER_COUNTER_OFFSET = PADDING;
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
> PADDING;
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;
>
>     private static final Unsafe UNSAFE;
>
>     static
>     {
>         try
>         {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe)field.get(null);
>         }
>         catch (final Exception ex)
>         {
>             throw new RuntimeException(ex);
>         }
>     }
>
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer; // hold reference to prevent GC
>     private final long baseAddress;
>
>     public IpcBase(final File file) throws IOException
>     {
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
> TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer)buffer).address();
>     }
>
>     final void putLongVolatile(final long offset, final long value)
>     {
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value);
>     }
>
>     final long getLongVolatile(final long offset)
>     {
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
>
>     @Override
>     public void close() throws Exception
>     {
>         channel.close();
>     }
> }
>
> class Writer extends IpcBase
> {
>     long writerCounter;
>
>     public Writer(final File file) throws IOException
>     {
>         super(file);
>     }
>
>     @Override
>     public void close() throws Exception
>     {
>         putLongVolatile(FLAG_OFFSET, 1L);
>         super.close();
>     }
>
>     public long write()
>     {
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
>         return writerCounter;
>     }
> }
>
> class Reader extends IpcBase
> {
>     long readCounter;
>     long writeCounter;
>
>     public Reader(final File file) throws IOException
>     {
>         super(file);
>     }
>
>     public boolean read()
>     {
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet())
>         {
>             return false;
>         }
>         else if (readCounter < writeCounter)
>         {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
>         }
>
>         return true;
>     }
>
>     private boolean isFlagSet()
>     {
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
>
> public class IpcBugTest
> {
>     public static int iteration = 0;
>
>     public static void main(String[] args) throws Exception
>     {
>         final File file = createFile();
>
>         for (int i = 0; i < 1_000_000; i++)
>         {
>             iteration = i;
>             testIpc(file);
>         }
>
>         System.out.println("NO BUG HERE! :-)");
>     }
>
>     private static File createFile() throws IOException
>     {
>         final File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
>
>         return file;
>     }
>
>     private static void testIpc(final File file) throws Exception
>     {
>         writeZeros(file);
>
>         final Reader reader = new Reader(file);
>         final Thread readerThread = new Thread("READER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (reader.read())
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         reader.close();
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start();
>
>         final Writer writer = new Writer(file);
>         final Thread writerThread = new Thread("WRITER")
>         {
>             @Override
>             public void run()
>             {
>                 try
>                 {
>                     try
>                     {
>                         while (writer.write() < 42)
>                         {
>                             // busy spin
>                         }
>                     }
>                     finally
>                     {
>                         writer.close();
>                     }
>                 }
>                 catch (Exception ex)
>                 {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start();
>
>         writerThread.join();
>         readerThread.join();
>
>         if (reader.readCounter != writer.writerCounter)
>         {
>             throw new Error(
>                 "Reader exited prematurely: Reader.readCounter=" +
> reader.readCounter +
>                 ", Reader.writeCounter=" + reader.writeCounter +
>                 ", Writer.writeCounter=" + writer.writerCounter +
>                 ", iteration=" + iteration);
>         }
>     }
>
>     private static void writeZeros(final File file) throws IOException
>     {
>         Files.write(file.toPath(), new byte[(int)IpcBase.TOTAL_LENGTH],
> StandardOpenOption.WRITE);
>     }
> }
>
> Date: Mon, 8 Sep 2014 08:08:29 -0700
>> From: Dmitry Vyazelenko <vyazelenko at yahoo.com>
>> To: "concurrency-interest at cs.oswego.edu"
>>         <concurrency-interest at cs.oswego.edu>
>> Subject: [concurrency-interest] Unsafe + DirectBuffer concurrency bug?
>> Message-ID:
>>         <1410188909.78423.YahooMailNeo at web162206.mail.bf1.yahoo.com>
>> Content-Type: text/plain; charset="utf-8"
>>
>> Hi all,
>>
>> Please find standalone test that points to potential Unsafe/DirectBuffer
>> issue. The test uses memory mapped file (DirectBuffer) and Unsafe to
>> communicate between reader and writer.
>> The data layout within the buffer:
>> <64 bytes padding><reader counter><64 bytes padding><writer counter><64
>> bytes padding><end of data flag><64 bytes padding>
>> Total number of bytes is 256.
>>
>> The Writer thread keeps incrementing writerCounter until it reaches value
>> 42. At which point it also writes endOfDataFlag and closes the FileChannel.
>>
>> The Reader threads reads both readerCounter and writerCounter. If both
>> have the same value and endOfDataFlag is set then it exits. Otherwise it
>> increments readerCounter.
>>
>> Below is the complete test:
>>
>> package ipc_bug;
>>
>> import sun.misc.Unsafe;
>>
>> import java.io.File;
>> import java.io.IOException;
>> import java.io.RandomAccessFile;
>> import java.lang.reflect.Field;
>> import java.nio.MappedByteBuffer;
>> import java.nio.channels.FileChannel;
>> import java.nio.file.Files;
>> import java.nio.file.StandardOpenOption;
>>
>> class IpcBase implements AutoCloseable {
>>     private static final long PADDING = 64;
>>     static final long READER_COUNTER_OFFSET = PADDING;
>>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
>> PADDING;
>>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
>>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;
>>
>>     private static final Unsafe UNSAFE;
>>     static {
>>         try {
>>             final Field field =
>> Unsafe.class.getDeclaredField("theUnsafe");
>>             field.setAccessible(true);
>>             UNSAFE = (Unsafe) field.get(null);
>>         } catch (final Exception ex) {
>>             throw new RuntimeException(ex);
>>         }
>>     }
>>
>>     private final FileChannel channel;
>>     private final MappedByteBuffer buffer;
>>     private final long baseAddress;
>>
>>     public IpcBase(File file) throws IOException {
>>         channel = new RandomAccessFile(file, "rw").getChannel();
>>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
>> TOTAL_LENGTH);
>>         baseAddress = ((sun.nio.ch.DirectBuffer) buffer).address();
>>     }
>>
>>     final void putLongVolatile(long offset, long value) {
>>         UNSAFE.putLongVolatile(null, baseAddress + offset, value);
>>     }
>>
>>     final long getLongVolatile(long offset) {
>>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>>     }
>>
>>     @Override
>>     public void close() throws Exception {
>>         channel.close();
>>     }
>> }
>>
>> class Writer extends IpcBase {
>>     public Writer(File file) throws IOException {
>>         super(file);
>>     }
>>
>>     @Override
>>     public void close() throws Exception {
>>         putLongVolatile(FLAG_OFFSET, 1L);
>>         super.close();
>>     }
>>
>>     long writerCounter;
>>
>>     public long write() {
>>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
>>         return writerCounter;
>>     }
>> }
>>
>> class Reader extends IpcBase {
>>     public Reader(File file) throws IOException {
>>         super(file);
>>     }
>>
>>     long readCounter;
>>     long writeCounter;
>>
>>     public boolean read() {
>>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>>         if (readCounter == writeCounter && isFlagSet()) {
>>             return false;
>>         } else if (readCounter < writeCounter) {
>>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
>>         }
>>         return true;
>>     }
>>
>>     private boolean isFlagSet() {
>>         return 1L == getLongVolatile(FLAG_OFFSET);
>>     }
>> }
>>
>> public class IpcBugTest {
>>     public static void main(String[] args) throws Exception {
>>         File file = createFile();
>>         for (int i = 0; i < 50_000; i++) {
>>             testIPC(file);
>>         }
>>         System.out.println("NO BUG EXISTS! :-)");
>>     }
>>
>>     private static File createFile() throws IOException {
>>         File file = File.createTempFile("ipc-", ".dat");
>>         file.deleteOnExit();
>>         return file;
>>     }
>>
>>     private static void testIPC(final File file) throws Exception {
>>         writeZeros(file);
>>
>>         final Writer writer = new Writer(file);
>>         Thread writerThread = new Thread("WRITER") {
>>             @Override
>>             public void run() {
>>                 try {
>>                     try {
>>                         while (writer.write() < 42) ;
>>                     } finally {
>>                         writer.close();
>>                     }
>>                 } catch (Exception ex) {
>>                     ex.printStackTrace();
>>                 }
>>             }
>>         };
>>         writerThread.start();
>>
>>         final Reader reader = new Reader(file);
>>         Thread readerThread = new Thread("READER") {
>>             @Override
>>             public void run() {
>>                 try {
>>                     try {
>>                         while (reader.read()) ;
>>                     } finally {
>>                         reader.close();
>>                     }
>>                 } catch (Exception ex) {
>>                     ex.printStackTrace();
>>                 }
>>             }
>>         };
>>         readerThread.start();
>>
>>         writerThread.join();
>>         readerThread.join();
>>
>>         if (reader.readCounter != writer.writerCounter) {
>>             throw new Error("Reader exited prematurely:
>> Reader.readCounter=" + reader.readCounter
>>                     + ", Reader.writeCounter=" + reader.writeCounter
>>                     + ", Writer.writeCounter=" + writer.writerCounter);
>>         }
>>     }
>>
>>     private static void writeZeros(File file) throws IOException {
>>         Files.write(file.toPath(), new byte[(int) IpcBase.TOTAL_LENGTH],
>> StandardOpenOption.WRITE);
>>     }
>> }
>>
>> This test fails on my Windows 7 machine using JDK 7u67 and JDK 8u20
>> (older Intel i5). It also fails on Intel i7 IvyBridge machine that is
>> running Ubuntu 14.04 on same JDKs. The failure is random, i.e. I've
>> observed it fail on first iteration, 600, after 10K iterations etc. but it
>> fails eventually on each run I tried.
>>
>> The failure message shown is:
>>     java.lang.Error: Reader exited prematurely: Reader.readCounter=0,
>> Reader.writeCounter=0, Writer.writeCounter=42
>>
>> In this case the reader thread observed both counters to have the same
>> value 0 and endOfDataFlag is set. However writer thread writes
>> endOfDataFlag after final writerCounter (i.e. 42) is being written.
>>
>> I hope the bug is in my code. So please tell me how wrong I am! ;)
>>
>> Best regards,
>> Dmitry Vyazelenko
>>
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20140908/98e77cba/attachment-0001.html>


More information about the Concurrency-interest mailing list