[concurrency-interest] Scalable object cache

Peter Firmstone peter.firmstone at zeus.net.au
Wed Feb 29 20:38:39 EST 2012


I've made some minor modifications to TimedReferrer, to support
cancellation of Future tasks in Queue's, this may act as a throttling
mechanism.

This may provide an alternative throttling mechanism to using a
BlockingQueue, to obtain similar but non blocking behaviour for
ConcurrentLinkedQueue, or multiple ConcurrentLinkedQueue's.

Example usage:

Queue<Referrer<Future>> internal = new
ConcurrentLinkedQueue<Referrer<Future>>();
Queue<Future> taskQue = RC.queue(internal, Ref.TIME, 10000L);

In this case the cycle time will be ten seconds, so the task won't be
cancelled until 20 seconds or more has elapsed, the Queue will be
visited once every ten seconds, this could be set to less than a second,
say 100ms, however it's important that the cycle period is sensible,
rapid cycle times may reduce scalability or prune too many tasks, while
long cycle times may cause OME.  Tuning is left up to the user.  I
haven't experimented with dynamically adjustable cycle times, it's set
at construction.

The Future is first cancelled, by visiting each TimedReferrer in the
queue using an Iterator,  TimedReferrers enqueue themselves for removal
after timeout and finally are removed from the queue by another thread.

Is it right to assume a linked queue for example, will be easier to
iterate for a background thread?  If all other threads are accessing the
head and tail of the queue, the middle should be quiet and not compete
for access?

TimedReferrer, guarantees that a Future will be cancelled prior to
removal from a Queue, so an Executor may poll cancelled tasks.

N.B. On that topic, is there a possibility that ForkJoinPool, will allow
the use of different Queue implementations, eg via a QueueFactory?

Regards,

Peter.


/*
 * Copyright 2012 Zeus Project Services Pty Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package au.net.zeus.collection;

import java.util.concurrent.Future;

/**
 * 
 * @author Peter Firmstone.
 */
class TimedReferrer<T> implements UntouchableReferrer<T>, TimeBomb {
    
    private volatile long clock;
    private volatile long read;
    private final TimedRefQueue queue;
    private volatile T referent;
    private volatile boolean enqued;
    private final Object lock;
    private final int hash;
    
    TimedReferrer(T k, TimedRefQueue q){
        long time = System.nanoTime();
        clock = time;
        read = time;
        referent = k;
        queue = q;
        enqued = false;
        lock = new Object();
        int hash = 7;
        hash = 29 * hash + k.hashCode();
        hash = 29 * hash + k.getClass().hashCode();
        this.hash = hash;
    }

    public T get() {
        // Doesn't need to be atomic.
        if (read < clock) read = clock; //Avoid unnecessary volatile
write.
        return referent;
    }

    public void clear() {
        referent = null;
    }

    public boolean isEnqueued() {
        return enqued;
    }

    public boolean enqueue() {
        if (enqued) return false;
        if (referent == null) return false;
        if (queue == null) return false;
        synchronized (lock){ // Sync for atomic write of enqued.
            if (enqued) return false;
            enqued = queue.offer(this);
        }
        return enqued;
    }
    
    @Override
    public void updateClock(long time){
        if (read < clock) { // only write volatile if necessary.
            if (referent instanceof Future)
((Future)referent).cancel(false);
            enqueue();
            // Don't clear, it will be removed soon anyway, prevents 
            // non empty Queue.poll() returning null.
            //clear();
        } else {
            clock = time;
        }
    }
    
    @Override
    public boolean equals(Object o) {
        if (this == o)  return true; // Same reference.
        if (!(o instanceof Referrer))  return false;
        Object k1 = get(); //call get(), so equals updates clock for
key's in a hash map.
        Object k2 =((Referrer) o).get();
        if ( k1 != null && k1.equals(k2)) return true;
        return ( k1 == null && k2 == null && hashCode() ==
o.hashCode()); // Both objects were collected.
    }

    @Override
    public int hashCode() {
        Object k = referent; //don't call get(), avoid read update.
        int hash = 7;
        if (k != null) {
            hash = 29 * hash + k.hashCode();
            hash = 29 * hash + k.getClass().hashCode();
        } else {
            hash = this.hash;
        }
        return hash;
    }
    
    @Override
    public String toString(){
        Object s = get();
        if (s != null) return s.toString();
        return super.toString();
    }

    public T lookDontTouch() {
        return referent;
    }
    
}



More information about the Concurrency-interest mailing list