[concurrency-interest] Unsafe + DirectBuffer concurrency bug?

Martin Thompson mjpt777 at gmail.com
Mon Sep 8 16:16:04 EDT 2014


The version below fails for me every time but at different iteration values.

Regards,
Martin...

import sun.misc.Unsafe;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

class IpcBase implements AutoCloseable
{
    private static final long PADDING = 64;
    static final long READER_COUNTER_OFFSET = PADDING;
    static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
PADDING;
    static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
    static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;

    private static final Unsafe UNSAFE;

    static
    {
        try
        {
            final Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe)field.get(null);
        }
        catch (final Exception ex)
        {
            throw new RuntimeException(ex);
        }
    }

    private final FileChannel channel;
    private final MappedByteBuffer buffer; // hold reference to prevent GC
    private final long baseAddress;

    public IpcBase(final File file) throws IOException
    {
        channel = new RandomAccessFile(file, "rw").getChannel();
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
TOTAL_LENGTH);
        baseAddress = ((sun.nio.ch.DirectBuffer)buffer).address();
    }

    final void putLongVolatile(final long offset, final long value)
    {
        UNSAFE.putLongVolatile(null, baseAddress + offset, value);
    }

    final long getLongVolatile(final long offset)
    {
        return UNSAFE.getLongVolatile(null, baseAddress + offset);
    }

    @Override
    public void close() throws Exception
    {
        channel.close();
    }
}

class Writer extends IpcBase
{
    long writerCounter;

    public Writer(final File file) throws IOException
    {
        super(file);
    }

    @Override
    public void close() throws Exception
    {
        putLongVolatile(FLAG_OFFSET, 1L);
        super.close();
    }

    public long write()
    {
        writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
        putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
        return writerCounter;
    }
}

class Reader extends IpcBase
{
    long readCounter;
    long writeCounter;

    public Reader(final File file) throws IOException
    {
        super(file);
    }

    public boolean read()
    {
        readCounter = getLongVolatile(READER_COUNTER_OFFSET);
        writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
        if (readCounter == writeCounter && isFlagSet())
        {
            return false;
        }
        else if (readCounter < writeCounter)
        {
            putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
        }

        return true;
    }

    private boolean isFlagSet()
    {
        return 1L == getLongVolatile(FLAG_OFFSET);
    }
}

public class IpcBugTest
{
    public static int iteration = 0;

    public static void main(String[] args) throws Exception
    {
        final File file = createFile();

        for (int i = 0; i < 1_000_000; i++)
        {
            iteration = i;
            testIpc(file);
        }

        System.out.println("NO BUG HERE! :-)");
    }

    private static File createFile() throws IOException
    {
        final File file = File.createTempFile("ipc-", ".dat");
        file.deleteOnExit();

        return file;
    }

    private static void testIpc(final File file) throws Exception
    {
        writeZeros(file);

        final Reader reader = new Reader(file);
        final Thread readerThread = new Thread("READER")
        {
            @Override
            public void run()
            {
                try
                {
                    try
                    {
                        while (reader.read())
                        {
                            // busy spin
                        }
                    }
                    finally
                    {
                        reader.close();
                    }
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        };
        readerThread.start();

        final Writer writer = new Writer(file);
        final Thread writerThread = new Thread("WRITER")
        {
            @Override
            public void run()
            {
                try
                {
                    try
                    {
                        while (writer.write() < 42)
                        {
                            // busy spin
                        }
                    }
                    finally
                    {
                        writer.close();
                    }
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        };
        writerThread.start();

        writerThread.join();
        readerThread.join();

        if (reader.readCounter != writer.writerCounter)
        {
            throw new Error(
                "Reader exited prematurely: Reader.readCounter=" +
reader.readCounter +
                ", Reader.writeCounter=" + reader.writeCounter +
                ", Writer.writeCounter=" + writer.writerCounter +
                ", iteration=" + iteration);
        }
    }

    private static void writeZeros(final File file) throws IOException
    {
        Files.write(file.toPath(), new byte[(int)IpcBase.TOTAL_LENGTH],
StandardOpenOption.WRITE);
    }
}

Date: Mon, 8 Sep 2014 08:08:29 -0700
> From: Dmitry Vyazelenko <vyazelenko at yahoo.com>
> To: "concurrency-interest at cs.oswego.edu"
>         <concurrency-interest at cs.oswego.edu>
> Subject: [concurrency-interest] Unsafe + DirectBuffer concurrency bug?
> Message-ID:
>         <1410188909.78423.YahooMailNeo at web162206.mail.bf1.yahoo.com>
> Content-Type: text/plain; charset="utf-8"
>
> Hi all,
>
> Please find standalone test that points to potential Unsafe/DirectBuffer
> issue. The test uses memory mapped file (DirectBuffer) and Unsafe to
> communicate between reader and writer.
> The data layout within the buffer:
> <64 bytes padding><reader counter><64 bytes padding><writer counter><64
> bytes padding><end of data flag><64 bytes padding>
> Total number of bytes is 256.
>
> The Writer thread keeps incrementing writerCounter until it reaches value
> 42. At which point it also writes endOfDataFlag and closes the FileChannel.
>
> The Reader threads reads both readerCounter and writerCounter. If both
> have the same value and endOfDataFlag is set then it exits. Otherwise it
> increments readerCounter.
>
> Below is the complete test:
>
> package ipc_bug;
>
> import sun.misc.Unsafe;
>
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import java.lang.reflect.Field;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Files;
> import java.nio.file.StandardOpenOption;
>
> class IpcBase implements AutoCloseable {
>     private static final long PADDING = 64;
>     static final long READER_COUNTER_OFFSET = PADDING;
>     static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET +
> PADDING;
>     static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
>     static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;
>
>     private static final Unsafe UNSAFE;
>     static {
>         try {
>             final Field field = Unsafe.class.getDeclaredField("theUnsafe");
>             field.setAccessible(true);
>             UNSAFE = (Unsafe) field.get(null);
>         } catch (final Exception ex) {
>             throw new RuntimeException(ex);
>         }
>     }
>
>     private final FileChannel channel;
>     private final MappedByteBuffer buffer;
>     private final long baseAddress;
>
>     public IpcBase(File file) throws IOException {
>         channel = new RandomAccessFile(file, "rw").getChannel();
>         buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0,
> TOTAL_LENGTH);
>         baseAddress = ((sun.nio.ch.DirectBuffer) buffer).address();
>     }
>
>     final void putLongVolatile(long offset, long value) {
>         UNSAFE.putLongVolatile(null, baseAddress + offset, value);
>     }
>
>     final long getLongVolatile(long offset) {
>         return UNSAFE.getLongVolatile(null, baseAddress + offset);
>     }
>
>     @Override
>     public void close() throws Exception {
>         channel.close();
>     }
> }
>
> class Writer extends IpcBase {
>     public Writer(File file) throws IOException {
>         super(file);
>     }
>
>     @Override
>     public void close() throws Exception {
>         putLongVolatile(FLAG_OFFSET, 1L);
>         super.close();
>     }
>
>     long writerCounter;
>
>     public long write() {
>         writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
>         return writerCounter;
>     }
> }
>
> class Reader extends IpcBase {
>     public Reader(File file) throws IOException {
>         super(file);
>     }
>
>     long readCounter;
>     long writeCounter;
>
>     public boolean read() {
>         readCounter = getLongVolatile(READER_COUNTER_OFFSET);
>         writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
>         if (readCounter == writeCounter && isFlagSet()) {
>             return false;
>         } else if (readCounter < writeCounter) {
>             putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
>         }
>         return true;
>     }
>
>     private boolean isFlagSet() {
>         return 1L == getLongVolatile(FLAG_OFFSET);
>     }
> }
>
> public class IpcBugTest {
>     public static void main(String[] args) throws Exception {
>         File file = createFile();
>         for (int i = 0; i < 50_000; i++) {
>             testIPC(file);
>         }
>         System.out.println("NO BUG EXISTS! :-)");
>     }
>
>     private static File createFile() throws IOException {
>         File file = File.createTempFile("ipc-", ".dat");
>         file.deleteOnExit();
>         return file;
>     }
>
>     private static void testIPC(final File file) throws Exception {
>         writeZeros(file);
>
>         final Writer writer = new Writer(file);
>         Thread writerThread = new Thread("WRITER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (writer.write() < 42) ;
>                     } finally {
>                         writer.close();
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         writerThread.start();
>
>         final Reader reader = new Reader(file);
>         Thread readerThread = new Thread("READER") {
>             @Override
>             public void run() {
>                 try {
>                     try {
>                         while (reader.read()) ;
>                     } finally {
>                         reader.close();
>                     }
>                 } catch (Exception ex) {
>                     ex.printStackTrace();
>                 }
>             }
>         };
>         readerThread.start();
>
>         writerThread.join();
>         readerThread.join();
>
>         if (reader.readCounter != writer.writerCounter) {
>             throw new Error("Reader exited prematurely:
> Reader.readCounter=" + reader.readCounter
>                     + ", Reader.writeCounter=" + reader.writeCounter
>                     + ", Writer.writeCounter=" + writer.writerCounter);
>         }
>     }
>
>     private static void writeZeros(File file) throws IOException {
>         Files.write(file.toPath(), new byte[(int) IpcBase.TOTAL_LENGTH],
> StandardOpenOption.WRITE);
>     }
> }
>
> This test fails on my Windows 7 machine using JDK 7u67 and JDK 8u20 (older
> Intel i5). It also fails on Intel i7 IvyBridge machine that is running
> Ubuntu 14.04 on same JDKs. The failure is random, i.e. I've observed it
> fail on first iteration, 600, after 10K iterations etc. but it fails
> eventually on each run I tried.
>
> The failure message shown is:
>     java.lang.Error: Reader exited prematurely: Reader.readCounter=0,
> Reader.writeCounter=0, Writer.writeCounter=42
>
> In this case the reader thread observed both counters to have the same
> value 0 and endOfDataFlag is set. However writer thread writes
> endOfDataFlag after final writerCounter (i.e. 42) is being written.
>
> I hope the bug is in my code. So please tell me how wrong I am! ;)
>
> Best regards,
> Dmitry Vyazelenko
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20140908/e6a41d82/attachment-0001.html>


More information about the Concurrency-interest mailing list