[concurrency-interest] Unsafe + DirectBuffer concurrency bug?

Dmitry Vyazelenko vyazelenko at yahoo.com
Mon Sep 8 11:08:29 EDT 2014


Hi all,

Please find standalone test that points to potential Unsafe/DirectBuffer issue. The test uses memory mapped file (DirectBuffer) and Unsafe to communicate between reader and writer. 
The data layout within the buffer:
<64 bytes padding><reader counter><64 bytes padding><writer counter><64 bytes padding><end of data flag><64 bytes padding>
Total number of bytes is 256.

The Writer thread keeps incrementing writerCounter until it reaches value 42. At which point it also writes endOfDataFlag and closes the FileChannel.

The Reader threads reads both readerCounter and writerCounter. If both have the same value and endOfDataFlag is set then it exits. Otherwise it increments readerCounter.

Below is the complete test:

package ipc_bug;

import sun.misc.Unsafe;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Field;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

class IpcBase implements AutoCloseable {
    private static final long PADDING = 64;
    static final long READER_COUNTER_OFFSET = PADDING;
    static final long WRITER_COUNTER_OFFSET = READER_COUNTER_OFFSET + PADDING;
    static final long FLAG_OFFSET = WRITER_COUNTER_OFFSET + PADDING;
    static final long TOTAL_LENGTH = FLAG_OFFSET + PADDING;

    private static final Unsafe UNSAFE;
    static {
        try {
            final Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            UNSAFE = (Unsafe) field.get(null);
        } catch (final Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private final FileChannel channel;
    private final MappedByteBuffer buffer;
    private final long baseAddress;

    public IpcBase(File file) throws IOException {
        channel = new RandomAccessFile(file, "rw").getChannel();
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, TOTAL_LENGTH);
        baseAddress = ((sun.nio.ch.DirectBuffer) buffer).address();
    }

    final void putLongVolatile(long offset, long value) {
        UNSAFE.putLongVolatile(null, baseAddress + offset, value);
    }

    final long getLongVolatile(long offset) {
        return UNSAFE.getLongVolatile(null, baseAddress + offset);
    }

    @Override
    public void close() throws Exception {
        channel.close();
    }
}

class Writer extends IpcBase {
    public Writer(File file) throws IOException {
        super(file);
    }

    @Override
    public void close() throws Exception {
        putLongVolatile(FLAG_OFFSET, 1L);
        super.close();
    }

    long writerCounter;

    public long write() {
        writerCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
        putLongVolatile(WRITER_COUNTER_OFFSET, ++writerCounter);
        return writerCounter;
    }
}

class Reader extends IpcBase {
    public Reader(File file) throws IOException {
        super(file);
    }

    long readCounter;
    long writeCounter;

    public boolean read() {
        readCounter = getLongVolatile(READER_COUNTER_OFFSET);
        writeCounter = getLongVolatile(WRITER_COUNTER_OFFSET);
        if (readCounter == writeCounter && isFlagSet()) {
            return false;
        } else if (readCounter < writeCounter) {
            putLongVolatile(READER_COUNTER_OFFSET, ++readCounter);
        }
        return true;
    }

    private boolean isFlagSet() {
        return 1L == getLongVolatile(FLAG_OFFSET);
    }
}

public class IpcBugTest {
    public static void main(String[] args) throws Exception {
        File file = createFile();
        for (int i = 0; i < 50_000; i++) {
            testIPC(file);
        }
        System.out.println("NO BUG EXISTS! :-)");
    }

    private static File createFile() throws IOException {
        File file = File.createTempFile("ipc-", ".dat");
        file.deleteOnExit();
        return file;
    }

    private static void testIPC(final File file) throws Exception {
        writeZeros(file);

        final Writer writer = new Writer(file);
        Thread writerThread = new Thread("WRITER") {
            @Override
            public void run() {
                try {
                    try {
                        while (writer.write() < 42) ;
                    } finally {
                        writer.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        writerThread.start();

        final Reader reader = new Reader(file);
        Thread readerThread = new Thread("READER") {
            @Override
            public void run() {
                try {
                    try {
                        while (reader.read()) ;
                    } finally {
                        reader.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        readerThread.start();

        writerThread.join();
        readerThread.join();

        if (reader.readCounter != writer.writerCounter) {
            throw new Error("Reader exited prematurely: Reader.readCounter=" + reader.readCounter
                    + ", Reader.writeCounter=" + reader.writeCounter
                    + ", Writer.writeCounter=" + writer.writerCounter);
        }
    }

    private static void writeZeros(File file) throws IOException {
        Files.write(file.toPath(), new byte[(int) IpcBase.TOTAL_LENGTH], StandardOpenOption.WRITE);
    }
}

This test fails on my Windows 7 machine using JDK 7u67 and JDK 8u20 (older Intel i5). It also fails on Intel i7 IvyBridge machine that is running Ubuntu 14.04 on same JDKs. The failure is random, i.e. I've observed it fail on first iteration, 600, after 10K iterations etc. but it fails eventually on each run I tried. 

The failure message shown is:
    java.lang.Error: Reader exited prematurely: Reader.readCounter=0, Reader.writeCounter=0, Writer.writeCounter=42

In this case the reader thread observed both counters to have the same value 0 and endOfDataFlag is set. However writer thread writes endOfDataFlag after final writerCounter (i.e. 42) is being written.

I hope the bug is in my code. So please tell me how wrong I am! ;)
 
Best regards,
Dmitry Vyazelenko
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20140908/bb93daf8/attachment-0001.html>


More information about the Concurrency-interest mailing list