[concurrency-interest] Reference Collections

Peter Firmstone peter.firmstone at zeus.net.au
Wed Jan 4 07:02:50 EST 2012


Hi,

Over at river.apache.org, we've got a reference collections library,
utilised for concurrent caches.  It's straightforward, clean and simple
code, and easy to use, although the Generics are a little verbose.  RC is
a class with a simple public API, containing static methods used to wrap
any of the Java collections interfaces, allowing you to use any
reference type to refer to objects contained in your collections.

All implementation is package private.  Sync / locking strategies are
left to the underlying collection.  Removal / cleaning is performed by
any thread that obtains the lock on a private ReferenceQueue (each
collection has its own), which is guarded by ReentrantLock.tryLock();
threads that don't obtain the lock continue without performing any
cleanup.  Do you think that all calling threads attempting tryLock()
(once only) will cause any performance issues?  Garbage
collection occurs concurrently with other operations.
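
To make the question concrete, here is a minimal sketch of the tryLock
guard described above.  It is not the River code: TryLockCleaner,
referenced() and the returned count are hypothetical names used only for
illustration, and it uses a plain WeakReference rather than our Referrer
types.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration only; not the River ReferenceProcessor. */
class TryLockCleaner<T> {
    private final Collection<Reference<T>> col;
    private final ReferenceQueue<T> queue = new ReferenceQueue<T>();
    private final Lock queueLock = new ReentrantLock();

    TryLockCleaner(Collection<Reference<T>> col) {
        this.col = col;
    }

    /** Wraps t in a WeakReference registered with this cleaner's queue. */
    Reference<T> referenced(T t) {
        return new WeakReference<T>(t, queue);
    }

    /**
     * Drains the queue if the lock is free, otherwise returns at once.
     * Returns the number of stale references removed by this call.
     */
    int processQueue() {
        if (!queueLock.tryLock()) return 0; // another thread is cleaning
        try {
            int removed = 0;
            Reference<? extends T> stale;
            while ((stale = queue.poll()) != null) {
                // WeakReference uses identity equality, so this removes
                // exactly the enqueued reference object.
                if (col.remove(stale)) removed++;
            }
            return removed;
        } finally {
            queueLock.unlock();
        }
    }
}
```

In this sketch a thread that loses the tryLock() race pays for one failed
lock attempt and then carries on with its own operation; whether that
attempt is measurable under heavy contention is exactly what I'm asking.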

Reference types available are Weak Identity, Weak, Soft Identity, Soft
and Strong.  It's relatively simple to add additional reference types
such as a timed reference, however this isn't implemented at present.
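
For anyone unfamiliar with the distinction, the identity variants compare
referents with == rather than equals().  A minimal sketch of the idea
follows; WeakIdentityRef is a hypothetical class written for this mail,
not one of the Referrer implementations in the library.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

/** Hypothetical sketch of a weak reference compared by referent identity. */
class WeakIdentityRef<T> extends WeakReference<T> {
    // Cached at construction, since the referent may be cleared later.
    private final int hash;

    WeakIdentityRef(T referent, ReferenceQueue<? super T> queue) {
        super(referent, queue);
        this.hash = System.identityHashCode(referent);
    }

    @Override
    public int hashCode() {
        return hash;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) return true;
        if (!(o instanceof WeakIdentityRef)) return false;
        Object mine = get();
        // A cleared reference is equal only to itself.
        return mine != null && mine == ((WeakIdentityRef<?>) o).get();
    }
}
```

Caching the hash code keeps a cleared reference findable in a hash based
collection, so it can still be removed after its referent is gone.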

Here's a usage example, straight from our SecurityManager code:

<SNIP>
private final ConcurrentMap<AccessControlContext, NavigableSet<Permission>> checked;
private final Comparator<Referrer<Permission>> permCompare;
</SNIP>

<SNIP-FROM-CONSTRUCTOR>
ConcurrentMap<Referrer<AccessControlContext>, Referrer<NavigableSet<Permission>>> refmap
    = new ConcurrentHashMap<Referrer<AccessControlContext>, Referrer<NavigableSet<Permission>>>(100);

checked = RC.concurrentMap(refmap, Ref.SOFT, Ref.STRONG);
permCompare = RC.comparator(new PermissionComparator());

</SNIP-FROM-CONSTRUCTOR>

<SNIP-METHOD>
    @Override
    public void checkPermission(Permission perm, Object context)
            throws SecurityException {
        if (!(context instanceof AccessControlContext)) throw new SecurityException();
        if (perm == null) throw new NullPointerException("Permission Collection null");
        /* The next line speeds up permission checks related to this
         * SecurityManager.  It also prevents an endless loop in debug. */
        if (SMPrivilegedContext.equals(context)
                || SMConstructorContext.equals(context)) return;
        AccessControlContext executionContext = (AccessControlContext) context;
        // Checks if Permission has already been checked for this context.
        NavigableSet<Permission> checkedPerms = checked.get(executionContext);
        if (checkedPerms == null){
            /* A ConcurrentSkipListSet is used to avoid blocking during
             * removal operations that occur while the garbage collector
             * recovers softly reachable memory.  Since this happens while
             * the JVM is under stress, it's important that permission
             * checks continue to perform well.
             *
             * Although I considered a multi read, single write Set, I
             * wanted to avoid blocking under stress, caused as a result
             * of garbage collection.
             *
             * The Reference Collection that encapsulates the
             * ConcurrentSkipListSet uses ReentrantLock.tryLock() to guard
             * the ReferenceQueue used to remove objects from the Set.
             * This allows other threads to proceed during object removal.
             * Only one thread is given access to the ReferenceQueue; the
             * unlucky caller thread performs garbage removal from the Set
             * before accessing the Set for its original purpose.
             */
            NavigableSet<Referrer<Permission>> internal =
                    new ConcurrentSkipListSet<Referrer<Permission>>(permCompare);
            checkedPerms = RC.navigableSet(internal, Ref.SOFT);
            NavigableSet<Permission> existed =
                    checked.putIfAbsent(executionContext, checkedPerms);
            if (existed != null) checkedPerms = existed;
        }
        if (checkedPerms.contains(perm)) return; // don't need to check again.

</SNIP-METHOD>

Serialization is implemented so that implementations can be replaced and
upgraded; the serial form is a separate concern.  See the Serialization
Builder pattern for details: http://wiki.apache.org/river/Serialization
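
As a rough illustration of the general idea (the wiki page is the
authority on our actual pattern), the sketch below serializes a separate
builder object in place of the real class.  Names, its nested Builder and
the roundTrip helper are all hypothetical and exist only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical class whose serial form is a separate builder object. */
class Names implements Serializable {
    private static final long serialVersionUID = 1L;
    private final List<String> names;

    Names(List<String> names) {
        this.names = new ArrayList<String>(names);
    }

    List<String> names() {
        return names;
    }

    // Serialize the builder in place of this object.
    private Object writeReplace() {
        return new Builder(names);
    }

    // Reject any stream that claims to contain a Names directly.
    private void readObject(ObjectInputStream in) throws InvalidObjectException {
        throw new InvalidObjectException("Builder required");
    }

    /** The serial form; it can stay stable while Names' fields change. */
    private static final class Builder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String[] data;

        Builder(List<String> names) {
            data = names.toArray(new String[0]);
        }

        // Rebuild a Names from the serial form during deserialization.
        private Object readResolve() {
            return new Names(Arrays.asList(data));
        }
    }

    /** Demo helper: writes n to a byte array and reads it back. */
    static Names roundTrip(Names n) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(n);
            out.close();
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            return (Names) in.readObject();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because only the builder ever appears in the stream, the class behind it
can be swapped or restructured without touching the serial form.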

This is a copy of some implementation code:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.river.impl.util;


import java.lang.ref.Reference;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A referenced hash map that encapsulates and utilises any ConcurrentMap
 * implementation passed in at construction.
 *
 * Based on any ConcurrentMap implementation, it doesn't accept null keys
 * or values.
 *
 * It is recommended, although not mandatory, to use identity based
 * References for keys.  Unexpected results occur when relying on equal
 * keys if one key is no longer strongly reachable and has been garbage
 * collected and removed from the Map.
 *
 * If either a key or value is no longer strongly reachable, their mapping
 * will be queued for removal and garbage collection, in compliance with
 * the Reference implementation selected.
 *
 * @param <K> key type
 * @param <V> value type
 * @see Ref
 * @author Peter Firmstone.
 *
 * @since 2.3
 */
class ReferenceConcurrentMap<K, V> extends ReferenceMap<K, V>
        implements ConcurrentMap<K, V> {

    // ConcurrentMap must be protected from null values?  It changes
    // its behaviour, is that a problem?
    private final ConcurrentMap<Referrer<K>, Referrer<V>> map;
    
    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>, Referrer<V>> map,
            Ref key, Ref val){
        super(map, key, val);
        this.map = map;
    }

    ReferenceConcurrentMap(ConcurrentMap<Referrer<K>, Referrer<V>> map,
            ReferenceQueuingFactory<K, Referrer<K>> krqf,
            ReferenceQueuingFactory<V, Referrer<V>> vrqf, Ref key, Ref val){
        super(map, krqf, vrqf, key, val);
        this.map = map;
    }
    
    public V putIfAbsent(K key, V value) {
        processQueue();  // may be a slight delay before atomic putIfAbsent
        Referrer<K> k = wrapKey(key, true, false);
        Referrer<V> v = wrapVal(value, true, false);
        Referrer<V> val = map.putIfAbsent(k, v);
        while ( val != null ) {
            V existed = val.get();
            // We hold a strong reference to value, so only the existing
            // entry's referent can have been cleared.
            if ( existed == null ){
                // Stale reference must be replaced: it has been garbage
                // collected but hasn't been removed, so we must treat it
                // as if the entry doesn't exist.
                if ( map.replace(k, val, v)){
                    // replace successful
                    return null; // Because officially there was no record.
                } else {
                    // Another thread may have replaced it.
                    val = map.putIfAbsent(k, v);
                }
            } else {
                return existed;
            }
        }
        return null;
    }

    @SuppressWarnings("unchecked")
    public boolean remove(Object key, Object value) {
        processQueue();
        return map.remove(wrapKey((K) key, false, true),
                wrapVal((V) value, false, true));
    }

    public boolean replace(K key, V oldValue, V newValue) {
        processQueue();
        return map.replace(wrapKey(key, false, true),
                wrapVal(oldValue, false, true),
                wrapVal(newValue, true, false));
    }

    public V replace(K key, V value) {
        processQueue();
        Referrer<V> val = map.replace(wrapKey(key, false, true),
                wrapVal(value, true, false));
        if ( val != null ) return val.get();
        return null;
    }
}
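
The stale entry handling in putIfAbsent above is the subtle part, so here
is a stand-alone sketch of the same retry loop using only JDK types.
WeakValueCache and its simulateCollection test hook are hypothetical; the
real class works through Referrer and wrapKey/wrapVal instead.

```java
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-alone version of the stale-entry retry loop. */
class WeakValueCache<K, V> {
    private final ConcurrentMap<K, WeakReference<V>> map =
            new ConcurrentHashMap<K, WeakReference<V>>();

    /** Returns the live value already mapped to key, or null if value was stored. */
    V putIfAbsent(K key, V value) {
        WeakReference<V> fresh = new WeakReference<V>(value);
        WeakReference<V> prev = map.putIfAbsent(key, fresh);
        while (prev != null) {
            V existed = prev.get();
            if (existed != null) return existed; // a live entry wins
            // prev was cleared but not yet removed: swap it atomically and
            // report "no previous mapping" to the caller.
            if (map.replace(key, prev, fresh)) return null;
            // Another thread changed the entry first; start again.
            prev = map.putIfAbsent(key, fresh);
        }
        return null;
    }

    /** Returns the current live value for key, or null. */
    V get(K key) {
        WeakReference<V> ref = map.get(key);
        return ref == null ? null : ref.get();
    }

    /** Test hook (an assumption of this sketch): clears the stored reference. */
    void simulateCollection(K key) {
        WeakReference<V> ref = map.get(key);
        if (ref != null) ref.clear();
    }
}
```

The three argument replace() only succeeds if the map still holds the
same cleared reference, so a concurrent writer is never overwritten.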

package org.apache.river.impl.util;

import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.Serializable;
import java.io.WriteAbortedException;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * A Collection of Reference Objects.  The developer may choose any
 * Collection implementation to store the References, which is passed in
 * at runtime.
 *
 * The underlying Collection implementation governs the specific behaviour
 * of this Collection.
 *
 * Synchronisation must be implemented by the underlying Collection and
 * cannot be performed externally to this class.  The underlying
 * Collection must also be mutable.  Objects will be removed automatically
 * from the underlying Collection when they are eligible for garbage
 * collection.
 *
 * Weak, Weak Identity, Soft, Soft Identity or Strong references may be
 * used.  This Collection may be used as an Object pool cache or any other
 * purpose that requires unique memory handling.
 *
 * For concurrent threads, it is recommended to encapsulate the underlying
 * collection in a multi read, single write collection for scalability.
 *
 * @see Ref
 * @see ConcurrentCollections#multiReadCollection(java.util.Collection)
 * @author Peter Firmstone.
 */
class ReferenceCollection<T> extends AbstractCollection<T> 
                                implements Collection<T>, Serializable {
    private static final long serialVersionUID = 1L;
    private final Collection<Referrer<T>> col;
    private final ReferenceQueuingFactory<T, Referrer<T>> rqf;
    private final Ref type;
    
    ReferenceCollection(Collection<Referrer<T>> col, Ref type){
        this(col, new ReferenceProcessor<T>(col, type,
                type == Ref.STRONG ? null : new ReferenceQueue<T>()), type);
    }
    
    ReferenceCollection(Collection<Referrer<T>> col, 
            ReferenceQueuingFactory<T, Referrer<T>> rqf, Ref type){
        this.col = col;
        this.rqf = rqf;
        this.type = type;
    }
    
    void processQueue(){
        rqf.processQueue();
    }
    
    ReferenceQueuingFactory<T, Referrer<T>> getRQF(){
        return rqf;
    }
    
    Ref getRef(){
        return type;
    }
    
    Referrer<T> wrapObj(T t, boolean enqueue, boolean temporary){
        return rqf.referenced(t, enqueue, temporary);
    }
    
    public int size() {
        processQueue();
        return col.size();
    }

    public boolean isEmpty() {
        processQueue();
        return col.isEmpty();
    }

    public boolean contains(Object o) {
        processQueue();
        return col.contains(wrapObj((T) o, false, true));
    }
    
    /**
     * This Iterator may return null values if garbage collection
     * runs during iteration.
     * 
     * Always check for null values.
     * 
     * @return T - possibly null.
     */
    public Iterator<T> iterator() {
        processQueue();
        return new ReferenceIterator<T>(col.iterator());
    }

    public boolean add(T e) {
        processQueue();
        return col.add(wrapObj(e, true, false));
    }

    public boolean remove(Object o) {
        processQueue();
        return col.remove(wrapObj((T) o, false, true));
    }

 
    @SuppressWarnings("unchecked")
    public boolean containsAll(Collection<?> c) {
        processQueue();
        return col.containsAll(
                new CollectionWrapper<T>((Collection<T>) c, getRQF(), false, true));
    }

    @SuppressWarnings("unchecked")
    public boolean addAll(Collection<? extends T> c) {
        processQueue();
        return col.addAll(
                new CollectionWrapper<T>((Collection<T>) c, getRQF(), true, false));
    }

    public void clear() {
        col.clear();
    }
    
    /*
     * The next three methods are suitable implementations for subclasses
     * also.
     */
    public String toString(){
        return col.toString();
    }

    @Override
    public int hashCode() {
        if ( col instanceof List || col instanceof Set ){
            return col.hashCode();
        }
        return System.identityHashCode(this);
    }
    
    /**
     * Because equals and hashCode are not defined for collections, we
     * cannot guarantee consistent behaviour by implementing equals and
     * hashCode.  A collection could be a list, set, queue or deque.
     * So a List != Queue and a Set != List; therefore equals for
     * collections is not defined.
     *
     * However, since two collections may both also be Lists, while
     * abstracted from the client, two lists may still be equal.
     * @see Collection#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object o){
        if ( o == this ) return true;
        if ( col instanceof List || col instanceof Set ){
            return col.equals(o);
        }
        return false;
    }
    
    final Object writeReplace() throws ObjectStreamException {
        try {
            // returns a Builder instead of this class.
            return SerializationOfReferenceCollection.create(getClass(), col, type);
        } catch (InstantiationException ex) {
            throw new WriteAbortedException("Unable to create serialization proxy", ex);
        } catch (IllegalAccessException ex) {
            throw new WriteAbortedException("Unable to create serialization proxy", ex);
        }
    }
    
    private void readObject(ObjectInputStream stream) 
            throws InvalidObjectException{
        throw new InvalidObjectException("Builder required");
    }

}



