[concurrency-interest] ConcurrentHashMapV8
Zhong Yu
zhong.j.yu at gmail.com
Mon Aug 29 19:25:20 EDT 2011
This is great, a very much needed feature.
About recursive computeIfAbsent() calls (on the same key): the
behavior can be modeled the same way as nested transactions. The thread
that locks the key can update the entry, and a subsequent read in the
same thread will see that value; however, the value is only committed,
i.e. made visible to other threads, upon the outermost unlock action. All
threads can read the latest committed value of an entry without
locking.
I have just implemented something similar, which I attach below.
The code is very straightforward, given a facility to lock, update,
and unlock an entry. Actually, maybe ConcurrentHashMap should
expose this facility: value-based locking (lock(v) ... unlock(v)) has
been desired by some people, but there are no well-known
implementations.
If ConcurrentHashMap exposed a per-entry lock-update-unlock facility,
a computeIfAbsent() implementation would be so trivial that it wouldn't
need to be provided by the JDK; everybody could write it.
Zhong Yu
---------------------------------
// code below is released to public domain by the author
/**
 * Supplies a value on demand, for example to lazily compute a missing entry.
 *
 * @param <T> type of the supplied value
 */
public interface Provider<T> {
    /**
     * Produces the value.
     *
     * @return the supplied value
     */
    T get();
}
public class ConcurrentScope extends AtomicHashMap<Object,Object>
{
public <T> T computeIfAbsent(Object key, Provider<T> provider)
{
Object val = super.get(key);
if(val==null)
{
if(provider==null)
return null;
try( Entry<Object> entry = super.lock(key) )
{
val = entry.getValue();
if(val==null)
{
val = provider.get(); // alien code. may block,
throw, recurse
entry.setValue(val);
}
} //auto unlock
}
return (T)val;
}
}
// AtomicHashMap.java ---------------------------
/**
 * A concurrent hash map where an entry can be individually locked to implement
 * atomic operations.
 * Example usages:
 * <pre>
 * import ohm.infra.util.AtomicHashMap;
 * import ohm.infra.util.AtomicHashMap.Entry;
 *
 * AtomicHashMap&lt;K,V&gt; map = new AtomicHashMap&lt;&gt;();
 * Entry&lt;V&gt; entry = map.lock(key);
 * try
 * {
 *     v1 = entry.getValue();
 *     ...
 *     entry.setValue(v2);
 * }
 * finally
 * { entry.unlock(); }
 * </pre>
 * <tt>AtomicHashMap.Entry</tt> is <tt>AutoCloseable</tt>, so the
 * try-with-resources syntax is simpler to use:
 * <pre>
 * try( Entry&lt;V&gt; entry = map.lock(key) ) // auto unlock
 * {
 *     v1 = entry.getValue();
 *     ...
 *     entry.setValue(v2);
 * }
 * </pre>
 * <p>
 * The only way to get an entry for a <tt>key</tt> is through {@link #lock lock(key)}.
 * The entry must be used in the same thread that locked it; once it's unlocked,
 * the object representing the entry shall not be reused. To get the entry again,
 * {@link #lock lock(key)} must be invoked again. Nested lock-unlock on the same
 * key is allowed. Upon the outermost unlock action, the value set by the last
 * <tt>setValue()</tt> is "committed".
 * </p>
 * <p>
 * {@link #get get(key)} reads the latest committed value without locking. It's like
 * a volatile read; its performance is on par with
 * {@link java.util.concurrent.ConcurrentHashMap#get}. A thread that currently holds
 * the entry sees its own uncommitted working value instead.
 * </p>
 * <p>
 * Implementation note: an AtomicHashMap is internally backed by a
 * {@link java.util.concurrent.ConcurrentHashMap}. Every entry is backed by a
 * {@link java.util.concurrent.locks.ReentrantLock}. To save space, an entry is
 * evicted from the internal map when the entry is unlocked and its committed value
 * is null. If the entry is needed again, {@link #lock lock(key)} will create a new one.
 * The "active entries" of the map, as referred to by {@link #size()} and
 * {@link #keySet()}, are entries that are either locked or have non-null
 * committed values.
 * </p>
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
public class AtomicHashMap<K,V>
{
    /**
     * An entry in an AtomicHashMap, obtained only from {@link AtomicHashMap#lock}.
     *
     * @param <V> type of value
     * @see AtomicHashMap
     */
    public interface Entry<V> extends AutoCloseable
    {
        /**
         * @return the working value, as seen by the thread holding the entry
         */
        V getValue();

        /**
         * Sets the working value; it is committed at the outermost unlock.
         */
        void setValue(V value);

        /**
         * @return number of times the current thread has locked the entry (minus unlocks).
         */
        int getLockCount();

        /**
         * Unlocks the entry; if the lock count drops to 0, commits the value.
         */
        void unlock();

        /**
         * Same as {@link #unlock()}.
         */
        @Override
        void close();
    }

    /**
     * Lock-backed entry. The lock guards {@code workingValue} and {@code retired};
     * {@code committedValue} is volatile so {@link AtomicHashMap#get} can read it
     * without locking.
     */
    static class EntryImpl<V> extends ReentrantLock implements Entry<V>
    {
        final Object key;
        // The map holding this entry; used to evict the entry when it retires.
        final ConcurrentHashMap<?,?> map;

        // Written and read by the lock owner only.
        V workingValue;
        // Value published at the last outermost unlock; read lock-free by other threads.
        volatile V committedValue;
        // Set once the entry has been evicted; a retired entry is never handed out again.
        boolean retired;

        EntryImpl(boolean fair, Object key, ConcurrentHashMap<?,?> map)
        {
            super(fair);
            this.key = key;
            this.map = map;
        }

        @Override
        public int getLockCount(){ return super.getHoldCount(); }

        // get/set must be called by the owner thread. We have given plenty of warnings,
        // so we skip the check of `super.isHeldByCurrentThread()` here.
        // If a client violates that, `workingValue` is messed up; no harm to our structure.
        @Override
        public V getValue()
        {
            return workingValue;
        }

        @Override
        public void setValue(V value)
        {
            workingValue = value;
        }

        /**
         * Locks this entry.
         *
         * @return true if the lock is now held; false if the entry was retired, in
         *         which case the lock has been released again and the caller must
         *         look up (or create) a fresh entry.
         */
        boolean acquire()
        {
            super.lock();
            if(!retired)
                return true;
            // Retired: release so that other threads blocked in acquire() on this
            // entry can also observe the retirement and retry.
            super.unlock();
            return false;
        }

        @Override
        public void unlock()
        {
            if(super.getHoldCount()==1) // outermost unlock
            {
                committedValue = workingValue;
                if(workingValue==null)
                {
                    retired=true;
                    // Remove only if this very entry is still mapped, so a
                    // replacement entry for the same key can never be evicted.
                    map.remove(key, this);
                }
            }
            super.unlock();
        }

        @Override
        public void close()
        {
            unlock();
        }
    }

    // The current (non-retired) entry for each active key.
    final ConcurrentHashMap<K,EntryImpl<V>> map;
    // Fairness policy passed to every entry's ReentrantLock.
    final boolean fair;

    /**
     * With default {@link java.util.concurrent.ConcurrentHashMap} constructor
     * parameters, and unfair locking.
     */
    public AtomicHashMap()
    {
        this.map = new ConcurrentHashMap<>();
        this.fair = false;
    }

    /**
     * See {@link java.util.concurrent.ConcurrentHashMap#ConcurrentHashMap(int, float, int)
     * ConcurrentHashMap(initialCapacity, loadFactor, concurrencyLevel)}
     * for the meaning of the first 3 parameters.
     *
     * @param fair whether locking should be fair
     */
    public AtomicHashMap(int initialCapacity, float loadFactor, int concurrencyLevel, boolean fair)
    {
        this.map = new ConcurrentHashMap<>(initialCapacity, loadFactor, concurrencyLevel);
        this.fair = fair;
    }

    /**
     * Exclusively locks the entry of the key, creating the entry if needed.
     * Blocks while another thread holds the entry. The returned entry must be
     * unlocked (or closed) by the calling thread.
     *
     * @param key the key; a null key causes a NullPointerException from the
     *        backing ConcurrentHashMap
     * @return the locked entry
     */
    public Entry<V> lock(K key)
    {
        while(true)
        {
            EntryImpl<V> entry = map.get(key); // in most cases the entry already exists.
            if(entry==null)
            {
                EntryImpl<V> created = new EntryImpl<>(fair, key, map);
                EntryImpl<V> prev = map.putIfAbsent(key, created);
                entry = (prev!=null) ? prev : created;
            }
            if(entry.acquire())
                return entry;
            // else the entry was retired after we looked it up; retry.
        }
    }

    /**
     * Gets the latest committed value for the key. This call will not block.
     * If the calling thread currently holds the entry, its working value is
     * returned instead.
     *
     * @return the value, or null if the key has no active entry
     */
    public V get(Object key)
    {
        EntryImpl<V> entry = map.get(key);
        if(entry==null)
            return null;
        if(entry.isHeldByCurrentThread())
            return entry.workingValue;
        // The entry may have been retired concurrently; its committed value is then null.
        return entry.committedValue;
    }

    /**
     * Atomically sets <tt>newValue</tt> on the entry of the key:
     * lock the entry, set the value, unlock the entry.
     * If <tt>newValue</tt> is null, the entry may be evicted.
     * This call may block if the entry is being locked by other threads.
     *
     * @return the old value
     */
    public V set(K key, V newValue)
    {
        try( Entry<V> entry = lock(key) )
        {
            V oldValue = entry.getValue();
            entry.setValue(newValue);
            return oldValue;
        }
    }

    /**
     * @return the number of active entries; performs like
     *         {@link java.util.concurrent.ConcurrentHashMap#size()}
     */
    public int size() { return map.size(); }

    /**
     * @return read-only view of the keys of active entries.
     */
    public Set<K> keySet()
    {
        // The backing key set is mutable, and removing a key there would remove an
        // entry from the map, bypassing our locking protocol; so expose it read-only.
        return Collections.unmodifiableSet(map.keySet());
    }

    /**
     * Sets all entries' values to null. Every entry may be evicted.
     * This call may block if an entry is being locked by another thread.
     * If the calling thread itself holds an entry, that entry's working value is set
     * to null and is committed at the caller's outermost unlock.
     */
    public void clear()
    {
        // We can't simply call map.clear(): an entry must be locked before removal.
        // ConcurrentHashMap iteration is weakly consistent and never yields null values.
        for(EntryImpl<V> entry : map.values())
        {
            if(entry.acquire())
            {
                entry.setValue(null);
                entry.unlock();
            }
        }
    }
}
More information about the Concurrency-interest
mailing list