[concurrency-interest] ParallelArray Extension
Kasper Nielsen
kasper at kav.dk
Wed Dec 19 05:50:42 EST 2007
Hi,
I've run into two scenarios where I haven't been able to find a
satisfying solution with the current ParallelArray API.
1.
Selecting distinct elements. This comes up fairly often, for example,
you want to find a set of currently open trades or all the groups some
students belong to. The best solution I've been able to come up with
goes something along the lines of:
ParallelArray<Integer> pa = ...
final Map<Integer, Integer> m = new ConcurrentHashMap<Integer, Integer>();
pa.apply(new Procedure<Integer>() {
    public void apply(Integer t) {
        m.putIfAbsent(t, t); // atomic, so no separate containsKey check is needed
    }
});
Set<Integer> result = m.keySet();
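For what it's worth, the same idea can be expressed without carrying map
values around by wrapping the ConcurrentHashMap in a Set view
(Collections.newSetFromMap, available since Java 6). A minimal standalone
sketch, using a plain ExecutorService as a stand-in for ParallelArray's
worker threads; the class and method names here are made up for
illustration:

```java
import java.util.*;
import java.util.concurrent.*;

public class DistinctDemo {
    // Collect the distinct elements of data into a concurrent set view
    // backed by a ConcurrentHashMap; Set.add is atomic, duplicates are dropped.
    public static Set<Integer> computeDistinct(final Integer[] data)
            throws InterruptedException {
        final Set<Integer> distinct =
            Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
        int nThreads = 2; // stand-in for ParallelArray's fork/join workers
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            final int from = i * data.length / nThreads;
            final int to = (i + 1) * data.length / nThreads;
            pool.execute(new Runnable() {
                public void run() {
                    for (int j = from; j < to; j++)
                        distinct.add(data[j]);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return distinct;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(computeDistinct(new Integer[]{1, 2, 2, 3, 3, 3, 4}));
    }
}
```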
2.
I have a data set that changes rarely, but I need to run a lot of
read-only calculations on it. So internally I've encapsulated an
instance of ParallelArray<Integer> behind a ReadWriteLock and exposed
wrapped methods:
public Integer sum() {
    readLock.lock();
    try {
        return pa.sum();
    } finally {
        readLock.unlock();
    }
}
public int size() {
    readLock.lock();
    try {
        return pa.size();
    } finally {
        readLock.unlock();
    }
}
public void set(int i, Integer x) {
    writeLock.lock();
    try {
        pa.set(i, x);
    } finally {
        writeLock.unlock();
    }
}
etc.
I don't want to expose the ReadWriteLock, because someday I might want
to change the implementation. Now, say a client wants to calculate the
average of all integers in the ParallelArray. Normally that would just be
pa.sum() / pa.size(). However, since pa.sum() and pa.size() are two
separate methods, somebody might have changed the dataset in the
meantime (I've not included the operations for changing the size). So I
need some way to calculate the sum and size in one 'atomic' operation.
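(For this particular query there is of course a narrower workaround: a
dedicated wrapper method that holds the read lock across both
computations, so sum and size come from the same snapshot. A minimal
standalone sketch, with a plain int[] and a ReentrantReadWriteLock
standing in for the wrapped ParallelArray; the class and method names
are hypothetical:

```java
import java.util.concurrent.locks.*;

public class LockedDataset {
    private final ReadWriteLock rw = new ReentrantReadWriteLock();
    private final Lock readLock = rw.readLock();
    private final Lock writeLock = rw.writeLock();
    private final int[] data; // stand-in for the wrapped ParallelArray<Integer>

    public LockedDataset(int[] initial) {
        this.data = initial.clone();
    }

    public void set(int i, int x) {
        writeLock.lock();
        try {
            data[i] = x;
        } finally {
            writeLock.unlock();
        }
    }

    // Computing the average inside ONE read-lock acquisition guarantees
    // that the sum and the size refer to the same state of the dataset.
    public double average() {
        readLock.lock();
        try {
            long sum = 0;
            for (int v : data)
                sum += v;
            return (double) sum / data.length;
        } finally {
            readLock.unlock();
        }
    }
}
```

This works, but it needs a new wrapper method for every combined query,
which is why a general mechanism seems attractive.)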
One solution I've thought of is to add something resembling this method:
<U> U combineReduce(Combiner<T, U, U> c, Reducer<U> reducer);
The idea is that individual threads first fold their elements with the
combiner; when they are done, all the per-thread results are merged
using the reducer. Here is the code for both problems:
1:
HashSet<Integer> result = pa.combineReduce(
        new Combiner<Integer, HashSet<Integer>, HashSet<Integer>>() {
    public HashSet<Integer> combine(Integer t, HashSet<Integer> u) {
        if (u == null) {
            u = new HashSet<Integer>();
        }
        u.add(t);
        return u;
    }
}, new Reducer<HashSet<Integer>>() {
    public HashSet<Integer> combine(HashSet<Integer> t, HashSet<Integer> v) {
        t.addAll(v);
        return t;
    }
});
2:
static class Aggregate {
public int count;
public long sum;
}
Aggregate a = pa.combineReduce(new Combiner<Integer, Aggregate, Aggregate>() {
    public Aggregate combine(Integer t, Aggregate u) {
        if (u == null) {
            u = new Aggregate();
        }
        u.count++;
        u.sum += t;
        return u;
    }
}, new Reducer<Aggregate>() {
    public Aggregate combine(Aggregate t, Aggregate v) {
        t.count += v.count;
        t.sum += v.sum;
        return t;
    }
});
System.out.println("Average : " + (a.sum / a.count));
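To make the intended semantics concrete, here is one way the proposal
could be emulated today on top of a plain ExecutorService. The Combiner
and Reducer interfaces here are hypothetical, just mirroring the
signature proposed above: each task folds its slice with the combiner,
then the per-task partial results are merged with the reducer (a null
partial result from an empty slice is skipped):

```java
import java.util.*;
import java.util.concurrent.*;

public class CombineReduceDemo {
    // Hypothetical interfaces matching the proposed signature.
    interface Combiner<T, U, V> { V combine(T t, U u); }
    interface Reducer<U> { U combine(U t, U v); }

    // Emulation of the proposed combineReduce: split data into nTasks
    // slices, fold each slice with the combiner, merge with the reducer.
    static <T, U> U combineReduce(final T[] data, final Combiner<T, U, U> c,
                                  Reducer<U> r, int nTasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nTasks);
        List<Future<U>> parts = new ArrayList<Future<U>>();
        for (int i = 0; i < nTasks; i++) {
            final int from = i * data.length / nTasks;
            final int to = (i + 1) * data.length / nTasks;
            parts.add(pool.submit(new Callable<U>() {
                public U call() {
                    U acc = null; // the combiner lazily creates the accumulator
                    for (int j = from; j < to; j++)
                        acc = c.combine(data[j], acc);
                    return acc;
                }
            }));
        }
        pool.shutdown();
        U result = null;
        for (Future<U> f : parts) {
            U part = f.get();
            if (part == null)
                continue; // empty slice, nothing to merge
            result = (result == null) ? part : r.combine(result, part);
        }
        return result;
    }
}
```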
Thoughts, or any other way to solve these problems?
cheers
Kasper