[concurrency-interest] ParallelArray Extension

Kasper Nielsen kasper at kav.dk
Wed Dec 19 10:13:09 EST 2007


Tim Peierls wrote:
> These sound like applications for map-reduce:
> 
> 1. w.withMapping(elementsToSingletonSets).reduce(combineSets);
> 2. w.withMapping(elementsToAggregates).reduce(combineAggregates);
> 
> I don't know whether these will perform as well in practice as the 
> solutions you've come up with, but they seem essentially equivalent to 
> your proposed combineReduce.

From a functional standpoint this works fine, but it doesn't perform
well: if I create the new object in withMapping, I need to create a new
HashSet/Aggregate for every element in the ParallelArray.
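The allocation difference can be illustrated with Java 8 streams. Streams postdate this thread and are used here only as an analogy, not as the ParallelArray API. `map` + `reduce` builds one singleton set per element, as in the map-reduce suggestion. The three-argument `Stream.collect(Supplier, BiConsumer, BiConsumer)` has the same shape as the proposed combineReduce: each parallel split fills its own container, and the containers are merged at the end. The class and method names below are hypothetical; this is a sketch, not a benchmark.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DistinctSketch {

    // Map-reduce shape: a new singleton set per element, plus a new set per merge.
    static Set<Integer> distinctByMapReduce(List<Integer> data) {
        return data.parallelStream()
                .map(x -> {
                    Set<Integer> s = new HashSet<>();
                    s.add(x);
                    return s;
                })
                .reduce(new HashSet<>(), (a, b) -> {
                    // Copy instead of mutating, so the shared identity set stays empty
                    Set<Integer> merged = new HashSet<>(a);
                    merged.addAll(b);
                    return merged;
                });
    }

    // combineReduce shape: each parallel split fills one HashSet; the sets are then merged.
    static Set<Integer> distinctByCollect(List<Integer> data) {
        return data.parallelStream().collect(HashSet::new, HashSet::add, HashSet::addAll);
    }

    // Mutable per-split accumulator mirroring the Aggregate class in the quoted message.
    static final class Aggregate {
        long count;
        long sum;

        void add(Integer x) { count++; sum += x; }
        void merge(Aggregate other) { count += other.count; sum += other.sum; }
    }

    static Aggregate aggregate(List<Integer> data) {
        return data.parallelStream().collect(Aggregate::new, Aggregate::add, Aggregate::merge);
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5);
        System.out.println(distinctByMapReduce(data).equals(distinctByCollect(data))); // true
        Aggregate a = aggregate(data);
        System.out.println("Average : " + (double) a.sum / a.count); // Average : 4.0
    }
}
```

With `collect`, the number of containers created depends on how the stream is split across threads, not on the number of elements.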

> Incidentally, wouldn't you want to use ParallelIntArray in your examples?

I've just used Integers to keep things simple in the examples.

- Kasper

> 
> --tim
> 
> On Dec 19, 2007 5:50 AM, Kasper Nielsen <kasper at kav.dk> wrote:
> 
>     Hi,
> 
>     I've run into two scenarios where I haven't been able to find a
>     satisfying solution with the current ParallelArray api.
> 
>     1.
>     Selecting distinct elements. This comes up fairly often: for example,
>     you want to find the set of currently open trades, or all the groups
>     some students belong to. The best solution I've been able to come up
>     with goes something along the lines of:
> 
>     ParallelArray<Integer> pa = ...
>     final ConcurrentMap<Integer, Integer> m =
>         new ConcurrentHashMap<Integer, Integer>();
>     w.apply(new Procedure<Integer>() {
>         public void apply(Integer t) {
>             // putIfAbsent is atomic; a separate containsKey/put pair races
>             m.putIfAbsent(t, t);
>         }
>     });
>     Set<Integer> result = m.keySet();
> 
>     2.
> 
>     I have a data set that changes rarely, but I need to make a lot of
>     read-only calculations on it. So internally I've encapsulated an
>     instance of ParallelArray<Integer> with a ReadWriteLock and exposed
>     the wrapped methods:
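>     public Integer sum() {
>         readLock.lock();
>         try {
>             return pa.sum();
>         } finally {
>             readLock.unlock();
>         }
>     }
>     public int size() {
>         readLock.lock();
>         try {
>             return pa.size();
>         } finally {
>             readLock.unlock();
>         }
>     }
>     public void set(int i, Integer x) {
>         writeLock.lock();
>         try {
>             pa.set(i, x);
>         } finally {
>             writeLock.unlock();
>         }
>     }
>     etc.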
> 
>     I don't want to expose the ReadWriteLock, because someday I might want
>     to change the implementation. Now, say a client wants to calculate the
>     average of all integers in the ParallelArray. Normally it would just be
>     pa.sum() / pa.size(). However, since pa.sum() and pa.size() are two
>     separate method calls, somebody might have changed the dataset in the
>     meantime (I've not included the operations for changing the size). So I
>     need some way to calculate the sum and size in one 'atomic' operation.
> 
>     One solution I've thought of is to add something resembling this method:
>     <U> U combineReduce(Combiner<T, U, U> c, Reducer<U> reducer);
> 
>     The idea is that each thread first folds its share of the elements
>     into its own result using the combiner (which receives null for the
>     first element). When the threads are done, all the individual results
>     are combined using the reducer. Here is the code for both problems:
> 
>     1:
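>     HashSet<Integer> result = w.combineReduce(
>         new Combiner<Integer, HashSet<Integer>, HashSet<Integer>>() {
>             // Called per element; u is null for a thread's first element
>             public HashSet<Integer> combine(Integer t, HashSet<Integer> u) {
>                 if (u == null) {
>                     u = new HashSet<Integer>();
>                 }
>                 u.add(t);
>                 return u;
>             }
>         },
>         new Reducer<HashSet<Integer>>() {
>             // Merges two per-thread sets
>             public HashSet<Integer> combine(HashSet<Integer> t, HashSet<Integer> v) {
>                 t.addAll(v);
>                 return t;
>             }
>         });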
> 
>     2:
>     static class Aggregate {
>         public int count;
>         public long sum;
>     }
> 
>     Aggregate a = w.combineReduce(
>         new Combiner<Integer, Aggregate, Aggregate>() {
>             public Aggregate combine(Integer t, Aggregate u) {
>                 if (u == null) {
>                     u = new Aggregate();
>                 }
>                 u.count++;
>                 u.sum += t;
>                 return u;
>             }
>         },
>         new Reducer<Aggregate>() {
>             public Aggregate combine(Aggregate t, Aggregate v) {
>                 t.count += v.count;
>                 t.sum += v.sum;
>                 return t;
>             }
>         });
>     // Cast to double to avoid truncating integer division
>     System.out.println("Average : " + ((double) a.sum / a.count));
> 
>     Thoughts? Or is there any other way to solve these problems?
> 
>     cheers
>       Kasper
>     _______________________________________________
>     Concurrency-interest mailing list
>     Concurrency-interest at altair.cs.oswego.edu
>     http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
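For the second scenario, the atomicity problem goes away if the wrapper performs the compound query itself, reading sum and size under a single read-lock acquisition instead of exposing sum() and size() as separate calls. The lock stays private, so the implementation can still change later. A minimal sketch, assuming a plain int[] stands in for the wrapped ParallelArray; the class name LockedDataset and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedDataset {
    // Private lock: clients never see it.
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private int[] values; // hypothetical stand-in for the wrapped ParallelArray

    public LockedDataset(int[] initial) {
        values = initial.clone();
    }

    public void set(int i, int x) {
        lock.writeLock().lock();
        try {
            values[i] = x;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // A size-changing write: the reason calling sum() and size() separately is unsafe.
    public void append(int x) {
        lock.writeLock().lock();
        try {
            values = Arrays.copyOf(values, values.length + 1);
            values[values.length - 1] = x;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Sum and size are read under one read-lock acquisition,
    // so no writer can change the data between the two.
    public double average() {
        lock.readLock().lock();
        try {
            if (values.length == 0) {
                throw new IllegalStateException("empty dataset");
            }
            long sum = 0;
            for (int v : values) {
                sum += v;
            }
            return (double) sum / values.length;
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockedDataset d = new LockedDataset(new int[] {2, 4, 6});
        System.out.println(d.average()); // 4.0
        d.append(8);
        System.out.println(d.average()); // 5.0
    }
}
```

The same idea extends to any compound read: the wrapper runs it under one lock acquisition rather than leaving clients to combine separately locked calls.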


