[concurrency-interest] ParallelArray Extension

Kasper Nielsen kasper at kav.dk
Wed Dec 19 05:50:42 EST 2007


Hi,

I've run into two scenarios where I haven't been able to find a 
satisfying solution with the current ParallelArray API.

1.
Selecting distinct elements. This comes up fairly often; for example, 
you want to find the set of currently open trades, or all the groups 
some students belong to. The best solution I've been able to come up 
with goes something along these lines:

ParallelArray<Integer> pa = ...
final ConcurrentMap<Integer, Integer> m = new ConcurrentHashMap<Integer, Integer>();
pa.apply(new Procedure<Integer>() {
     public void apply(Integer t) {
         m.putIfAbsent(t, t); // atomic, unlike a separate containsKey/put pair
     }
});
Set<Integer> result = m.keySet();
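
The same workaround reads a bit more directly as a concurrent set view 
(java.util.Collections.newSetFromMap, Java 6), but it is still just the 
apply-into-a-concurrent-collection trick rather than a real 
ParallelArray solution:

final Set<Integer> distinct =
     Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
pa.apply(new Procedure<Integer>() {
     public void apply(Integer t) {
         distinct.add(t); // Set.add already behaves like put-if-absent
     }
});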

2.

I have a data set that changes rarely, but I need to make a lot of 
read-only calculations on it. So internally I've encapsulated an 
instance of ParallelArray<Integer> behind a ReadWriteLock and expose 
only wrapper methods:
public Integer sum() {
    readLock.lock();
    try { return pa.sum(); } finally { readLock.unlock(); }
}
public int size() {
    readLock.lock();
    try { return pa.size(); } finally { readLock.unlock(); }
}
public void set(int i, Integer x) {
    writeLock.lock();
    try { pa.set(i, x); } finally { writeLock.unlock(); }
}
etc.

I don't want to expose the ReadWriteLock, because someday I might want 
to change the implementation. Now, say a client wants to calculate the 
average of all the integers in the ParallelArray. Normally that would 
just be pa.sum() / pa.size(). However, since pa.sum() and pa.size() are 
two separate methods, somebody might have changed the dataset in the 
meantime (I've not included the operations for changing the size). So I 
need some way to calculate the sum and the size in one 'atomic' operation.
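
For this particular case I could of course add a dedicated method to the 
wrapper that holds the read lock across both calls, roughly like the 
sketch below (reusing pa and readLock from above), but then I'd need a 
new wrapper method for every derived statistic a client might come up 
with:

public double average() {
    readLock.lock();
    try {
        long sum = pa.sum();   // both reads happen under the same lock
        int size = pa.size();  // acquisition, so they see one snapshot
        return (double) sum / size;
    } finally {
        readLock.unlock();
    }
}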

One solution I've thought of is to add something resembling this method:
<U> U combineReduce(Combiner<T, U, U> c, Reducer<U> reducer);

The idea is that the individual threads first fold their elements with 
the combiner (each starting its partial result from null), and when they 
are done, the per-thread results are merged using the reducer.
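
To make the intended semantics concrete, here is a purely sequential 
sketch (ignoring how the work would actually be split across threads, 
and just iterating the elements in order):

<U> U combineReduce(Combiner<T, U, U> combiner, Reducer<U> reducer) {
    U partial = null;
    for (T element : this.asList()) {   // each worker would fold its own slice
        partial = combiner.combine(element, partial);
    }
    // with several workers, the non-null partial results would then be
    // merged pairwise with reducer.combine(...)
    return partial;
}

And here is how I would use it for both problems: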

1:
HashSet<Integer> result = pa.combineReduce(
     new Combiner<Integer, HashSet<Integer>, HashSet<Integer>>() {
         public HashSet<Integer> combine(Integer t, HashSet<Integer> u) {
             if (u == null) {
                 u = new HashSet<Integer>();
             }
             u.add(t);
             return u;
         }
     },
     new Reducer<HashSet<Integer>>() {
         public HashSet<Integer> combine(HashSet<Integer> t, HashSet<Integer> v) {
             t.addAll(v);
             return t;
         }
     });

2:
static class Aggregate {
     public int count;
     public long sum;
}

Aggregate a = pa.combineReduce(
     new Combiner<Integer, Aggregate, Aggregate>() {
         public Aggregate combine(Integer t, Aggregate u) {
             if (u == null) {
                 u = new Aggregate();
             }
             u.count++;
             u.sum += t;
             return u;
         }
     },
     new Reducer<Aggregate>() {
         public Aggregate combine(Aggregate t, Aggregate v) {
             t.count += v.count;
             t.sum += v.sum;
             return t;
         }
     });
System.out.println("Average : " + ((double) a.sum / a.count));

Thoughts, or any other way to solve these problems?

cheers
   Kasper

