[concurrency-interest] Problem with using an unbounded map of Mutex lock objects

Greg Luck gluck at gregluck.com
Tue Jun 6 04:20:13 EDT 2006


Brian et al

Thanks so much for everyone's contributions.

I tested out Brian's lock striping with my suite of concurrency tests  
(BlockingCacheTest in ehcache) which hammer BlockingCache with 100  
threads. It seems to work very well.

Mutex objects use 24 bytes each, so I opted for 100 locks. That is 2400  
bytes in total, way down from the tens of MB that is common now in  
ehcache for large systems.

I also looked into the allocation of locks to different keys. In  
the tests I did for lock selection, locks were chosen evenly, within +- 20%.
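For anyone who wants to reproduce that distribution check, here is a rough  
sketch (the class and key names are mine, not from the ehcache test suite):  
it hashes a batch of sample keys into 100 stripes using the same index  
calculation as getLockForKey below, and reports the spread.

```java
// Illustrative sketch of the lock-selection distribution test.
// Hashes sample keys into LOCK_NUMBER stripes and reports min/max counts.
public class StripeDistributionCheck {
    private static final int LOCK_NUMBER = 100;

    // Same index calculation as getLockForKey in BlockingCache below.
    static int indexFor(Object key) {
        int h = key.hashCode() % LOCK_NUMBER;
        if (h < 0) {
            h += LOCK_NUMBER;
        }
        return h;
    }

    public static void main(String[] args) {
        int[] counts = new int[LOCK_NUMBER];
        for (int i = 0; i < 100000; i++) {
            counts[indexFor("key-" + i)]++;
        }
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (int c : counts) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        // An even distribution keeps min and max close to 100000 / LOCK_NUMBER.
        System.out.println("min=" + min + " max=" + max);
    }
}
```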

All in all, this looks to be an excellent solution with minimal change to  
the BlockingCache. The newly revised blocking cache is reproduced  
below. I would appreciate anyone letting me know of any  
shortcomings in it.

One point to note. In Brian's implementation, getLockForKey 
(Serializable key) was synchronized. I think this is unnecessary and  
potentially harmful to scalability. Mine is not synchronized.

Finally, thanks to Tim Peierls and others for alternate solutions. I  
don't want to bring in another library at this time. One of the attractions  
of ehcache, which I would like to preserve, is its minimal  
dependencies.

Brian's Recommendation
===================


Hashtable / synchronizedMap uses the "one big fat lock" approach to  
guard the mutable state of the map.  That works, but is a big  
concurrency bottleneck, as you've observed.  You went to the opposite  
extreme, one lock per key.  That works (as long as you've got  
sufficient synchronization in the cache itself to protect its own  
data structures.)

Lock striping is a middle ground, partitioning keys into a fixed  
number of subsets, like the trick used at large theaters for will- 
call ticket pickup -- there are separate lines for "A-F, G-M, N-R,  
and S-Z".  This way, there are a fixed number of locks, each guarding  
(hopefully) 1/Nth of the keys.

You could use striping in your example:

   Lock[] locks = new Lock[N];
   /* initializer block */ {
     for (int i = 0; i < N; i++)
         locks[i] = new ReentrantLock();
   }

and

private synchronized Lock getLockForKey(final Serializable key) {
     int h = key.hashCode() % N;
     if (h < 0)
         h += N;
     return locks[h];
}

This way, 1/Nth of the keys are guarded by locks[0], 1/Nth by  
locks[1], etc.  Concurrency can be tuned by choosing N; we found 16 works  
well on systems with fewer than a few dozen processors.
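For what it's worth, the JDK itself ships this idea: ConcurrentHashMap  
(since Java 5) stripes its table into independently locked segments, and  
its default concurrencyLevel is the same 16. A minimal illustration using  
its three-argument constructor:

```java
import java.util.concurrent.ConcurrentHashMap;

// ConcurrentHashMap partitions its table into independently locked
// segments; the third constructor argument sets the number of stripes.
public class StripedMapExample {
    public static void main(String[] args) {
        // initial capacity, load factor, concurrency level (stripe count)
        ConcurrentHashMap<String, Integer> map =
                new ConcurrentHashMap<String, Integer>(16, 0.75f, 16);
        map.put("a", 1);
        System.out.println(map.get("a")); // prints 1
    }
}
```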



The completed BlockingCache
========================



public class BlockingCache {

     /**
      * The number of locks to use. Brian Goetz recommends 16; 100 locks
      * are used here for finer striping, at a cost of about 2.4KB.
      */
     protected static final int LOCK_NUMBER = 100;

     private static final Log LOG = LogFactory.getLog(BlockingCache.class.getName());


     /**
      * Based on the lock striping concept from Brian Goetz. See Java Concurrency in Practice 11.4.3
      */
     private final Mutex[] locks = new Mutex[LOCK_NUMBER];

     {
         for (int i = 0; i < LOCK_NUMBER; i++) {
             locks[i] = new Mutex();
         }
     }

     /**
      * The backing Cache
      */
     private final Ehcache cache;


     private final int timeoutMillis;

     /**
      * Creates a BlockingCache with the given name.
      *
      * @param name the name to give the cache
      * @throws CacheException
      */
     public BlockingCache(final String name) throws CacheException {
         this(name, 0);
     }

     /**
      * Creates a BlockingCache with the given name.
      *
      * @param name          the name to give the cache
      * @param timeoutMillis the amount of time, in milliseconds, to  
block for
      * @throws CacheException
      * @since 1.2
      */
     public BlockingCache(final String name, int timeoutMillis) throws CacheException {
         CacheManager manager = null;
         try {
             manager = CacheManager.create();
         } catch (net.sf.ehcache.CacheException e) {
              LOG.fatal("CacheManager cannot be created. Cause was: " + e.getMessage(), e);
              throw new CacheException("CacheManager cannot be created", e);
          }
         cache = manager.getCache(name);
         if (cache == null || !cache.getName().equals(name)) {
              throw new CacheException("Cache " + name + " cannot be retrieved. Please check ehcache.xml");
         }
         this.timeoutMillis = timeoutMillis;
     }

     /**
      * Creates a BlockingCache with the given name and
      * uses the given cache manager to create the cache
      *
      * @param name    the name to give the cache
      * @param manager the EHCache CacheManager used to create the  
backing cache
      * @throws CacheException
      */
     public BlockingCache(final String name, final CacheManager manager) throws CacheException {
         this(name, manager, 0);
     }

     /**
      * Creates a BlockingCache with the given name and
      * uses the given cache manager to create the cache
      *
      * @param name          the name to give the cache
      * @param manager       the EHCache CacheManager used to create  
the backing cache
      * @param timeoutMillis the amount of time, in milliseconds, to  
block for
      * @throws CacheException
      * @since 1.2
      */
     public BlockingCache(final String name, final CacheManager manager, int timeoutMillis) throws CacheException {
         if (manager == null) {
             throw new CacheException("CacheManager cannot be null");
         }
         cache = manager.getCache(name);
         if (cache == null || !cache.getName().equals(name)) {
              throw new CacheException("Cache " + name + " cannot be retrieved. Please check ehcache.xml");
         }
         this.timeoutMillis = timeoutMillis;
     }

     /**
      * Retrieve the EHCache backing cache
      */
     protected net.sf.ehcache.Ehcache getCache() {
         return cache;
     }

     /**
      * Returns this cache's name
      */
     public String getName() {
         return cache.getName();
     }

     /**
      * Looks up an entry. Blocks if the entry is null.
      * Relies on the first thread subsequently putting an entry in, which releases the lock.
      * If a put is not done, the lock is never released.
      */
     public Serializable get(final Serializable key) throws BlockingCacheException {
         Mutex lock = getLockForKey(key);
         try {
             if (timeoutMillis == 0) {
                 lock.acquire();
             } else {
                 boolean acquired = lock.attempt(timeoutMillis);
                 if (!acquired) {
                     StringBuffer message = new StringBuffer("lock timeout attempting to acquire lock for key ")
                             .append(key).append(" on cache ").append(cache.getName());
                     throw new BlockingCacheException(message.toString());
                  }
             }
             final Element element = cache.get(key);
             if (element != null) {
                 //ok let the other threads in
                 lock.release();
                 return element.getValue();
             } else {
                 //don't release the read lock until we write
                 return null;
             }
          } catch (InterruptedException e) {
              //restore the interrupt status for callers further up the stack
              Thread.currentThread().interrupt();
              throw new CacheException("Interrupted. Message was: " + e.getMessage());
          }
     }

     private Mutex getLockForKey(final Serializable key) {
         if (key == null) {
             return locks[0];
         }
         int h = key.hashCode() % LOCK_NUMBER;
         if (h < 0) {
             h += LOCK_NUMBER;
         }
         return locks[h];
     }

     /**
      * Adds an entry and unlocks it
      */
     public void put(final Serializable key, final Serializable value) {
         Mutex lock = getLockForKey(key);
         try {
             if (value != null) {
                 final Element element = new Element(key, value);
                 cache.put(element);
             } else {
                 cache.remove(key);
             }
          } finally {
              //Release the lock here. It will have been acquired in get(), where the element was null
              lock.release();
          }
     }

     /**
      * Returns the keys for this cache.
      *
      * @return The keys of this cache.  This is not a live set, so  
it will not track changes to the key set.
      */
     public Collection getKeys() throws CacheException {
         return cache.getKeys();
     }

     /**
      * Drops the contents of this cache.
      */
     public void clear() throws CacheException {
         if (LOG.isDebugEnabled()) {
              LOG.debug("Cache " + cache.getName() + ": removing all entries");
         }
         cache.removeAll();
     }

     /**
      * Synchronized version of getName to test liveness of the  
object lock.
      * <p/>
      * The time taken for this method to return is a useful measure  
of runtime contention on the cache.
      */
     public synchronized String liveness() {
         return getName();
     }

     /**
      * Gets all entries from a blocking cache. Cache is not  
serializable. This
      * method provides a way of accessing the keys and values of a  
Cache in a Serializable way e.g.
      * to return from a Remote call.
      * <p/>
      * This method may take a long time to return. It does not lock  
the cache. The list of entries is based
      * on a copy. The actual cache may have changed in the time  
between getting the list and gathering the
      * KeyValuePairs.
      * <p/>
      * This method can potentially return an extremely large object,  
roughly matching the memory taken by the cache
      * itself. Care should be taken before using this method.
      * <p/>
      * By getting all of the entries at once this method can  
transfer a whole cache with a single method call, which
      * is important for Remote calls across a network.
      *
      * @return a Serializable {@link java.util.List} of {@link  
KeyValuePair}s, which implement the Map.Entry interface
      * @throws CacheException where there is an error in the  
underlying cache
      */
     public List getEntries() throws CacheException {
         if (LOG.isDebugEnabled()) {
              LOG.debug("Getting entries for the " + cache.getName() + " cache");
         }
         Collection keys = cache.getKeys();
         List keyValuePairs = new ArrayList(keys.size());
          for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
              Serializable key = (Serializable) iterator.next();
              Element element = cache.get(key);
              //the element may have been evicted since getKeys() was called
              if (element != null) {
                  keyValuePairs.add(new KeyValuePair(key, element.getValue()));
              }
          }
         return keyValuePairs;
     }
}
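For anyone reading the class cold, here is a self-contained sketch of the  
blocking protocol it implements, with no ehcache dependency. All names are  
mine, and java.util.concurrent.Semaphore stands in for Doug Lea's Mutex:  
a miss returns null while still holding the stripe lock, and the  
subsequent put stores the value and releases it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

// Illustrative stand-in for BlockingCache: same striped get/put protocol,
// with Semaphore playing the role of the Doug Lea Mutex.
public class BlockingProtocolDemo {
    private static final int LOCK_NUMBER = 100;
    private final Semaphore[] locks = new Semaphore[LOCK_NUMBER];
    private final Map<Object, Object> store = new ConcurrentHashMap<Object, Object>();

    public BlockingProtocolDemo() {
        for (int i = 0; i < LOCK_NUMBER; i++) {
            locks[i] = new Semaphore(1);
        }
    }

    private Semaphore lockFor(Object key) {
        int h = key.hashCode() % LOCK_NUMBER;
        if (h < 0) {
            h += LOCK_NUMBER;
        }
        return locks[h];
    }

    public Object get(Object key) throws InterruptedException {
        Semaphore lock = lockFor(key);
        lock.acquire();
        Object value = store.get(key);
        if (value != null) {
            lock.release();       // hit: let other threads in
        }
        return value;             // miss: keep the lock until put()
    }

    public void put(Object key, Object value) {
        store.put(key, value);
        lockFor(key).release();   // releases the lock held since the missing get()
    }

    public static void main(String[] args) throws Exception {
        BlockingProtocolDemo cache = new BlockingProtocolDemo();
        if (cache.get("k") == null) {       // miss: stripe lock now held
            cache.put("k", "computed");     // fill the entry and release
        }
        System.out.println(cache.get("k")); // prints "computed"
    }
}
```

A second thread calling get("k") during the miss window would block on the  
same stripe until the first thread's put released the lock.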


Regards

Greg Luck

web: http://gregluck.com
skype: gregrluck
yahoo: gregrluck
mobile: +61 408 061 622


