[concurrency-interest] Reference Collections

Peter Firmstone peter.firmstone at zeus.net.au
Thu Jan 5 23:09:55 EST 2012


Ok, based on the comments so far:

Regarding the possibility that one thread may not be able to keep up
with a ReferenceQueue: bestsss mentioned that ReferenceQueue is a stack,
not a queue, and that while the garbage collector thread holds its lock,
the cleaning thread blocks and won't be able to poll(). However, I
suspect the JVM only does this at scheduled intervals.

Oracle might decide to improve ReferenceQueue in the future to use a
lock-free queue.

If the underlying collection is causing a backlog, then the only way to
increase concurrency is to change the underlying collection; that's up
to the user.

If ReferenceQueue is causing a concurrency issue, then we could
potentially have multiple queues for the collection, but I think we
need to see that happen before increasing complexity.

Anyone have access to some big iron?

It's probably also worth noting that the performance of asynchronous
garbage cleaning only indirectly affects the performance of client
threads using the collection. If the cleaning thread is blocked, it
doesn't consume CPU; it should eventually succeed, at the risk of
increased memory consumption.

Here's the current implementation based on everyone's input:

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.river.impl.util;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * ReferenceProcessor is responsible for creation and collection of References
 * on behalf of Reference Collection implementations.
 *
 * <p>References that have been cleared and enqueued are removed from the
 * underlying collection in one of two ways:
 * <ul>
 * <li>if {@code gcThreads} is false, by client threads calling
 *     {@link #processQueue()};</li>
 * <li>if {@code gcThreads} is true, by a periodic task on a shared background
 *     scheduler, registered by {@link #start()}.</li>
 * </ul>
 *
 * @param <T> the referent type
 * @author peter
 */
class ReferenceProcessor<T> implements ReferenceQueuingFactory<T, Referrer<T>> {

    /*
     * Single-thread scheduler shared by all instances. Its thread is a daemon,
     * so the periodic tasks scheduled on it do not prevent JVM shutdown.
     */
    private final static ScheduledExecutorService garbageCleaner =
            Executors.newScheduledThreadPool(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "ReferenceProcessor garbage cleaner");
                    t.setDaemon(true);
                    return t;
                }
            });
    // Map from the PhantomReference registered for each started processor to
    // its cleaner task. Holding the PhantomReference here keeps the reference
    // object itself strongly reachable, which it must be in order to be enqueued.
    private final static Map<Reference<?>, ScheduledFuture<?>> finalizerTasks =
            new ConcurrentHashMap<Reference<?>, ScheduledFuture<?>>();
    // Finalizer queue used to trigger cancellation of ScheduledFutures
    // once their ReferenceProcessor has been collected.
    private final static ReferenceQueue<Object> phantomQueue =
            new ReferenceQueue<Object>();
    static {
        // Finalizer task that cancels cleaner tasks which are no longer needed.
        garbageCleaner.scheduleAtFixedRate(
                new FinalizerTask(phantomQueue, finalizerTasks),
                5L, 5L, TimeUnit.MINUTES
                );
    }

    private final Collection<Referrer<T>> col;
    private final ReferenceQueue<T> queue;
    private final Ref type;
    private final Lock queueLock;
    private final boolean gcThreads;
    private volatile boolean started = false;

    /**
     * Creates a processor for the given collection.
     *
     * @param col       the collection holding the references; must not be null
     * @param type      the reference type to create; must not be null
     * @param queue     queue that cleared references are enqueued on; ignored
     *                  (stored as null) when {@code type} is {@code Ref.STRONG}
     * @param gcThreads true to clean the queue from the shared background
     *                  scheduler (after {@link #start()}), false to clean it
     *                  from client threads via {@link #processQueue()}
     * @throws NullPointerException if {@code col} or {@code type} is null
     */
    ReferenceProcessor(Collection<Referrer<T>> col, Ref type,
            ReferenceQueue<T> queue, boolean gcThreads){
        if (col == null || type == null ) throw new
                NullPointerException("collection or reference type cannot be null");
        this.col = col;
        this.type = type;
        this.queue = type == Ref.STRONG ? null : queue;
        this.gcThreads = gcThreads;
        queueLock = new ReentrantLock();
    }

    /**
     * Registers with the executor service and finaliser for cleanup.
     *
     * <p>Only the first invocation has any effect, even when several threads
     * call this concurrently. A periodic cleaner task is scheduled only when
     * {@code gcThreads} is true and a non-null queue is in use.
     */
    public void start(){
        if (started) return; // Fast path: already started.
        synchronized (this){
            // Re-check under the lock so that concurrent callers cannot
            // both get past this point and schedule duplicate cleaner tasks.
            if (started) return;
            started = true;
        }
        if (!gcThreads || queue == null) return;
        ScheduledFuture<?> task = garbageCleaner.scheduleAtFixedRate(
                new CleanerTask(col, queue), 10L, 10L, TimeUnit.SECONDS);
        // Register with finaliser: once this processor is phantom reachable,
        // FinalizerTask removes this entry and cancels the cleaner task.
        Reference<?> r = new PhantomReference<Object>(this, phantomQueue);
        finalizerTasks.put(r, task);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public T pseudoReferent(Referrer<T> u) {
        throw new UnsupportedOperationException("Not supported.");
    }

    /**
     * Creates a reference wrapping {@code w}.
     *
     * @param w         the referent; null yields null
     * @param enque     true to register the reference with this processor's queue
     * @param temporary true to create a single-use reference for lookups
     * @return the new reference, or null if {@code w} is null
     */
    @Override
    public Referrer<T> referenced(T w, boolean enque, boolean temporary) {
        if (w == null) return null;
        if (temporary) return ReferenceFactory.singleUseForLookup(w, type);
        return ReferenceFactory.create(w, enque ? queue : null, type);
    }

    /**
     * Removes enqueued references from the collection on the calling thread.
     * Does nothing if there is no queue, or if cleaning is delegated to the
     * background scheduler ({@code gcThreads} is true).
     */
    @Override
    public void processQueue() {
        if (queue == null || gcThreads) return;
        /*
         * The reason for using an explicit lock is that if another thread is
         * already removing the garbage, we don't want to prevent all other
         * threads from accessing the underlying collection; tryLock means at
         * most one caller drains the queue and the rest return immediately.
         * This means that some client threads will receive null values
         * on occasion, but this is a small price to pay.
         * Might have to employ the null object pattern.
         */
        if (queueLock.tryLock()){
            try {
                Reference<? extends T> t;
                while ( (t = queue.poll()) != null){
                    col.remove(t);
                }
            } finally {
                queueLock.unlock();
            }
        }
    }

    /**
     * Periodic task that drains a reference queue, removing each enqueued
     * reference from the associated collection.
     */
    private static class CleanerTask implements Runnable {

        private final Collection<?> col;
        private final ReferenceQueue<?> queue;

        private CleanerTask(Collection<?> c, ReferenceQueue<?> queue){
            col = c;
            this.queue = queue;
        }

        @Override
        public void run() {
            Reference<?> t;
            while ( (t = queue.poll()) != null ){
                try {
                    col.remove(t);
                } catch (RuntimeException e) {
                    // An exception escaping run() would permanently suppress
                    // all further executions of this periodic task, so report
                    // it through the thread's handler and keep draining.
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            }
        }

    }

    /**
     * Periodic task that cancels the cleaner task of every ReferenceProcessor
     * whose PhantomReference has been enqueued.
     */
    private static class FinalizerTask implements Runnable {

        private final ReferenceQueue<?> phantomQueue;
        private final Map<Reference<?>, ScheduledFuture<?>> finalizerTasks;

        private FinalizerTask(ReferenceQueue<?> queue,
                Map<Reference<?>, ScheduledFuture<?>> tasks){
            phantomQueue = queue;
            finalizerTasks = tasks;
        }

        @Override
        public void run() {
            Reference<?> p;
            while ( (p = phantomQueue.poll()) != null){
                ScheduledFuture<?> sf = finalizerTasks.remove(p);
                if (sf != null) sf.cancel(true);
            }
        }

    }

}




More information about the Concurrency-interest mailing list