[concurrency-interest] ConcurrentHashMapV8

Zhong Yu zhong.j.yu at gmail.com
Mon Aug 29 19:25:20 EDT 2011


This is great, a very much needed feature.

About recursive computeIfAbsent() calls (on the same key): the
behavior can be modeled on nested transactions. The thread that
locks the key can update the entry, and a subsequent read in the
same thread will see that value; however, the value is only committed,
i.e. made visible to other threads, upon the outermost unlock. All
threads can read the latest committed value of an entry without
locking.
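A minimal single-key sketch of these nested-transaction semantics, assuming one `ReentrantLock` per entry as in the code attached below. `NestedTxCell` and `NestedTxDemo` are hypothetical names used only for illustration. The working value is visible to the lock holder at any nesting depth; the volatile committed value is published only when the hold count drops from 1 to 0.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Hypothetical single-entry cell illustrating "commit on outermost unlock".
final class NestedTxCell<V> {
    private final ReentrantLock lock = new ReentrantLock();
    private V working;             // only read/written by the lock holder
    private volatile V committed;  // published on the outermost unlock

    void lock() { lock.lock(); }   // reentrant: nested lock() calls are allowed

    // Only the lock holder may call this; the value stays uncommitted for now.
    void set(V value) { working = value; }

    // Never blocks: the holder sees its own working value,
    // every other thread sees the latest committed value.
    V read() {
        return lock.isHeldByCurrentThread() ? working : committed;
    }

    void unlock() {
        if (lock.getHoldCount() == 1) { // outermost unlock: commit
            committed = working;
        }
        lock.unlock(); // throws IllegalMonitorStateException if not held
    }
}

final class NestedTxDemo {
    // Runs the reader on a fresh thread and waits for its result.
    static <T> T readInOtherThread(Supplier<T> reader) {
        AtomicReference<T> result = new AtomicReference<>();
        Thread t = new Thread(() -> result.set(reader.get()));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }
}
```

For example, after `lock(); lock(); set("v1"); unlock();` the locking thread reads "v1" while another thread still reads null; only the second `unlock()` publishes "v1".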

I have just implemented something similar, which is attached below.
The code is very straightforward, given a facility to lock, update,
and unlock an entry. Actually, maybe ConcurrentHashMap should expose
this facility; value-based locking ( lock(v)..unlock(v) ) has been
desired by some people, but there are no well-known implementations.

If ConcurrentHashMap exposed a per-entry lock-update-unlock facility,
a computeIfAbsent() implementation would be so trivial that it wouldn't
need to be provided by the JDK; anybody could write it.
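To illustrate the claim, here is a sketch of computeIfAbsent written on top of a plain per-key lock facility. `KeyLockedMap` is a hypothetical class, not a JDK API: it keeps one `ReentrantLock` per key (never evicted, for brevity) next to a `ConcurrentHashMap` of values. Unlike the nested-transaction model above, a value stored by a recursive inner call becomes visible to other threads immediately.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

// Hypothetical map with a per-key lock facility (locks are never evicted here).
final class KeyLockedMap<K, V> {
    private final ConcurrentHashMap<K, ReentrantLock> locks = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, V> values = new ConcurrentHashMap<>();

    // Returns the unique lock for the key, creating it on first use.
    private ReentrantLock lockFor(K key) {
        ReentrantLock lock = locks.get(key);
        if (lock == null) {
            ReentrantLock fresh = new ReentrantLock();
            lock = locks.putIfAbsent(key, fresh);
            if (lock == null) {
                lock = fresh; // we won the race
            }
        }
        return lock;
    }

    V get(K key) { return values.get(key); } // lock-free read

    // computeIfAbsent built from lock-update-unlock on a single key.
    V computeIfAbsent(K key, Function<? super K, ? extends V> fn) {
        V v = values.get(key); // fast path, no locking
        if (v != null) {
            return v;
        }
        ReentrantLock lock = lockFor(key);
        lock.lock(); // reentrant, so a recursive call on the same key does not deadlock
        try {
            v = values.get(key); // re-check under the key's lock
            if (v == null) {
                v = fn.apply(key); // alien code: may block, throw, or recurse
                if (v != null) {
                    values.put(key, v);
                }
            }
            return v;
        } finally {
            lock.unlock();
        }
    }
}
```

Because every writer for a key re-checks under that key's lock, concurrent callers run the function at most once per key; in a recursive call on the same key, the outer result overwrites the inner one, as in ConcurrentScope below.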


Zhong Yu

---------------------------------

// code below is released to public domain by the author

public interface Provider<T>
{
    T get();
}

public class ConcurrentScope extends AtomicHashMap<Object,Object>
{
    @SuppressWarnings("unchecked") // callers must use a consistent value type per key
    public <T> T computeIfAbsent(Object key, Provider<T> provider)
    {
        Object val = super.get(key);
        if(val==null)
        {
            if(provider==null)
                return null;

            try( Entry<Object> entry = super.lock(key) )
            {
                val = entry.getValue();
                if(val==null)
                {
                    val = provider.get();  // alien code: may block, throw, or recurse
                    entry.setValue(val);
                }
            } //auto unlock
        }
        return (T)val;
    }
}

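// AtomicHashMap.java ---------------------------

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;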

/**
 * A concurrent hash map where an entry can be individually locked to implement atomic operations.
 * Example usage:
 * <pre>
 * import ohm.infra.util.AtomicHashMap;
 * import ohm.infra.util.AtomicHashMap.Entry;
 *
 *     AtomicHashMap&lt;K,V> map = new AtomicHashMap&lt;>();
 *     Entry&lt;V> entry = map.lock(key);
 *     try
 *     {
 *         v1 = entry.getValue();
 *         ...
 *         entry.setValue(v2);
 *     }
 *     finally
 *     {   entry.unlock();    }
 * </pre>
 * <tt>AtomicHashMap.Entry</tt> is <tt>AutoCloseable</tt>, so the try-with-resources syntax is simpler to use:
 * <pre>
 *
 *     try( Entry&lt;V> entry = map.lock(key) ) // auto unlock
 *     {
 *        v1 = entry.getValue();
 *        ...
 *        entry.setValue(v2);
 *     }
 * </pre>
 * <p>
 *     The only way to get an entry for a <tt>key</tt> is through {@link #lock lock(key)}.
 *     The entry must be used in the same thread that locked it; once it's unlocked,
 *     the object representing the entry must not be reused. To get the entry again,
 *     {@link #lock lock(key)} must be invoked again. Nested lock-unlock on the same key is allowed.
 *     Upon the outermost unlock, the value set by the last <tt>setValue()</tt> is "committed".
 * </p>
 * <p>
 *     {@link #get get(key)} reads the latest committed value without locking; it's like
 *     a volatile read, and its performance is on par with {@link java.util.concurrent.ConcurrentHashMap#get}.
 *     (A thread that currently holds the entry's lock sees its own uncommitted value instead.)
 * </p>
 * <p>
 *     Implementation note: an AtomicHashMap is internally backed by a {@link java.util.concurrent.ConcurrentHashMap}.
 *     Every entry is backed by a {@link java.util.concurrent.locks.ReentrantLock}.
 *     To save space, an entry is evicted from the internal map when the entry is unlocked
 *     and its committed value is null. If the entry is needed again, {@link #lock lock(key)} will create a new one.
 *     The "active entries" of the map, as referred to by {@link #size()} and {@link #keySet()},
 *     are entries that are either currently locked or have non-null committed values.
 * </p>
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
public class AtomicHashMap<K,V>
{
    /**
     * An entry in an AtomicHashMap.
     *
     * @param <V> type of value
     * @see ohm.infra.util.to_lea.AtomicHashMap
     */
    public interface Entry<V> extends AutoCloseable
    {
        V getValue();

        void setValue(V value);

        /**
         * @return number of times the current thread has locked the entry, minus the number of unlocks.
         */
        int getLockCount();

        /**
         * Unlocks the entry; if this is the outermost unlock (the lock count drops to 0), commits the value.
         */
        void unlock();

        /**
         * Same as {@link #unlock() }
         */
        void close();
    }

    static class EntryImpl<V> extends ReentrantLock implements Entry<V>
    {
        final Object key;
        final ConcurrentHashMap<?,?> map;
        EntryImpl(boolean fair, Object key, ConcurrentHashMap<?,?> map)
        {
            super(fair);
            this.key = key;
            this.map = map;
        }

        public int getLockCount(){ return super.getHoldCount(); }


        V workingValue;
        volatile V committedValue;
        boolean retired;

        // get/set must be called by the owner thread. We have given plenty of warnings,
        // so we skip the `super.isHeldByCurrentThread()` check here.
        // If a client violates that, `workingValue` is messed up, but our structure is not harmed.

        public V getValue()
        {
            return workingValue;
        }
        public void setValue(V value)
        {
            workingValue = value;
        }

        // returns false if the entry has been retired; the caller must retry with a fresh entry
        boolean acquire()
        {
            super.lock();
            if(!retired)
                return true;
            // retired
            super.unlock(); // another thread may be blocked in acquire() on this entry
            return false;
        }

        public void unlock()
        {
            if(super.getHoldCount()==1) // outermost unlock
            {
                committedValue = workingValue;

                if(workingValue==null)
                {
                    retired=true;
                    map.remove(key);
                }
            }

            super.unlock();
        }

        public void close()
        {
            unlock();
        }
    }

    final ConcurrentHashMap<K,EntryImpl<V>> map;
    final boolean fair;

    /**
     * Creates a map with default {@link java.util.concurrent.ConcurrentHashMap}
     * constructor parameters and unfair locking.
     */
    public AtomicHashMap()
    {
        this.map = new ConcurrentHashMap<>();
        this.fair = false;
    }

    /**
     * See {@link java.util.concurrent.ConcurrentHashMap#ConcurrentHashMap(int, float, int)
     *            ConcurrentHashMap(initialCapacity, loadFactor, concurrencyLevel)}
     * for the meaning of the first three parameters.
     *
     * @param fair whether locking should be fair
     */
    public AtomicHashMap(int initialCapacity, float loadFactor, int concurrencyLevel, boolean fair)
    {
        this.map = new ConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel);
        this.fair = fair;
    }

    /**
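     * Exclusively locks the entry for the key, blocking while another thread holds it.
     * Locking is reentrant; every call must be paired with an unlock (or close) of the returned entry.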
     */
    public Entry<V> lock(K key)
    {
        while(true)
        {
            EntryImpl<V> entry = map.get(key); // in most cases the entry already exists
            if(entry==null)
            {
                entry = new EntryImpl<>(fair, key, map);
                EntryImpl<V> prev = map.putIfAbsent(key, entry);
                if(prev!=null)
                    entry = prev;
            }

            if(entry.acquire())
                return entry;
            // else entry retired, re-try.
        }
    }

    /**
     * Returns the latest committed value for the key; if the current thread holds the entry's lock,
     * returns its uncommitted working value instead. This call never blocks.
     */
    public V get(Object key)
    {
        EntryImpl<V> entry = map.get(key);
        if(entry==null)
            return null;
        if(entry.isHeldByCurrentThread())
            return entry.workingValue;
        // entry could be retired, doesn't matter.
        return entry.committedValue;
    }

    /**
     * Atomically sets the entry for the key to <tt>newValue</tt>:
     * lock the entry, set the value, unlock the entry.
     * If <tt>newValue</tt> is null, the entry may be evicted.
     * This call may block if the entry is locked by another thread.
     *
     * @return the old value
     */
    public V set(K key, V newValue)
    {
        try( Entry<V> entry = lock(key) )
        {
            V oldValue = entry.getValue();
            entry.setValue(newValue);
            return oldValue;
        }
    }

    /**
     * @return the number of active entries; performs like {@link java.util.concurrent.ConcurrentHashMap#size()}
     */
    public int size() { return map.size(); }

    /**
     * @return set of keys of active entries, read only.
     */
    public Set<K> keySet()
    {
        Set<K> keySet = map.keySet();
        // That set is mutable, and removing a key from it would remove an entry from the map,
        // bypassing our locking protocol. We must forbid that, so the client gets a read-only view.
        return Collections.unmodifiableSet(keySet);
    }

    /**
     * Set all entries' values to null. Every entry may be evicted.
     * This call may block if an entry is being locked by another thread.
     */
    public void clear()
    {
        // we can't simply call map.clear(). must lock entry before removal.
        for(Map.Entry<K,EntryImpl<V>> e : map.entrySet())
        {
            EntryImpl<V> entry = e.getValue(); // can be null due to concurrent modification
            if(entry!=null && entry.acquire())
            {
                entry.setValue(null);
                entry.unlock();
            }
        }
    }

}
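The try-with-resources form shown in the AtomicHashMap javadoc compiles without a catch clause because `Entry` redeclares `close()` without `throws Exception`. Below is a minimal sketch of that idiom using a hypothetical `LockHandle` class around a plain `ReentrantLock` (it is not part of AtomicHashMap); as with `EntryImpl.close()`, the lock is released even if the body throws.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical handle: close() declares no checked exception,
// so try-with-resources needs no catch clause.
final class LockHandle implements AutoCloseable {
    private final ReentrantLock lock;

    private LockHandle(ReentrantLock lock) { this.lock = lock; }

    // Acquires the lock and returns a handle whose close() releases it.
    static LockHandle acquire(ReentrantLock lock) {
        lock.lock();
        return new LockHandle(lock);
    }

    @Override
    public void close() { lock.unlock(); } // no "throws Exception"
}

final class LockHandleDemo {
    // Returns true if the lock was released even though the body threw.
    static boolean releasedAfterException(ReentrantLock lock) {
        try (LockHandle h = LockHandle.acquire(lock)) {
            throw new IllegalStateException("body failed");
        } catch (IllegalStateException expected) {
            // resources are closed before a catch clause of try-with-resources runs
            return !lock.isHeldByCurrentThread();
        }
    }
}
```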

