[concurrency-interest] Re: Help, how to design a pool of threads

Andrej Navodnik andrejnavodnik at yahoo.com
Sat Nov 5 09:58:48 EST 2005


Hi all,

let's say that I did some homework ;-) Here is my
code:

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
  public static void main(String... args) {
    int N = 100;
    FindOptimalElement s = new FindOptimalElement(
        prepareTestData(N));
    System.out.println(
        "Waiting for optimal element...");
    Result r = s.get();
    System.out.println("Result: " + r);
    System.exit(0);
  }
  
  static List<Pair> prepareTestData(int n) {
    Random r = new Random();
    List<Pair> testData = new ArrayList<Pair>();
    for (int i = 0; i < n; i++) {
      double a = r.nextDouble();
      double b = r.nextDouble();
      testData.add(new Pair(a, b));
    }
    return testData;
  }
}

interface OptimalElement<E> {
  boolean isOptimalElement(E e);
}

class FindOptimalElement {
  private static final int NUMBER_WORKERS = 10;
  private final AtomicBoolean optimalElementFound;
  private final AtomicInteger testedData;
  private final int numberOfData;
  private final BlockingQueue<Future<Result>> workers;
  private final ReentrantLock dataLock;
  private volatile Result result;
  private volatile Condition resultPrepared;
  private boolean resultDefined;
  private final ThreadPoolExecutor service;
  
  FindOptimalElement(List<Pair> data) {
    this.numberOfData = data.size();
    this.optimalElementFound = 
        new AtomicBoolean(false);
    this.testedData = new AtomicInteger(0);
    this.dataLock = new ReentrantLock();
    this.resultPrepared = dataLock.newCondition();
    this.workers = 
        new ArrayBlockingQueue<Future<Result>>(
        NUMBER_WORKERS);
    
    int corePoolSize = NUMBER_WORKERS;
    int maximumPoolSize = NUMBER_WORKERS;
    long keepAliveTime = 1;
    BlockingQueue<Runnable> workQueue =
        new LinkedBlockingQueue<Runnable>();
    service = new ThreadPoolExecutor(corePoolSize,
        maximumPoolSize, keepAliveTime,
        TimeUnit.SECONDS, workQueue);
    service.prestartAllCoreThreads();
    
    service.execute(new Producer(data));
    service.execute(new Consumer());
  }
  
  // how should I cancel/stop other threads, 
  // is this possible??
  private void cancelRunnables(
      Runnable excludeRunnable) {
    BlockingQueue<Runnable> queue=service.getQueue();
    Runnable[] runnables = (Runnable[]) Array
        .newInstance(Runnable.class, NUMBER_WORKERS);
    queue.toArray(runnables);
    for (Runnable r : runnables) {
      System.out.println("Task: " + r);
      if (r != null && !r.equals(excludeRunnable)) {
        System.out.println("Removed task: " + r);
        service.remove(r);
      }
    }
    service.purge();
    System.out.println("Other threads in the pool " +
        "should be canceled/removed, are they?");
  }
  
  class Producer implements Runnable {
    private final List<Pair> testData;
    Producer(List<Pair> testData) {
      this.testData = testData;
    }
    public void run() {
      try {
        for (Pair p : testData) {
          Callable<Result> c = new Worker(p);
          Future<Result> task = service.submit(c);
          workers.put(task);
        }
      } catch (InterruptedException ie) {
        ie.printStackTrace();
      }
    }
  }
  
  class Consumer implements Runnable {
    public void run() {
      for (;;) {
        try {
          final Future<Result> f = workers.take();
          Runnable task = new PrepareResult(f);
          service.execute(task);
        } catch (InterruptedException ie) {
          ie.printStackTrace();
          return ;
        }
      }
    }
    
    class PrepareResult implements Runnable {
      private final Future<Result> f;
      PrepareResult(Future<Result> f) {
        this.f = f;
      }
      public void run() {
        try {
          Result r = f.get();
          dataLock.lock();
          try {
            if (r != null) {
              result = r;
              resultDefined = true;
              resultPrepared.signal();
            }
            if (!resultDefined) {
              int n = testedData.incrementAndGet();
              if (n==numberOfData) {
                System.out.println(
                    "All data tested, " +
                    "no optimal element found...");
                resultDefined = true;
                resultPrepared.signal();
              }
            }
          } finally {
            dataLock.unlock();
          }
          
        } catch (ExecutionException ee) {
          ee.printStackTrace();
        } catch (InterruptedException ie) {
          ie.printStackTrace();
        }
      }
    }
  }
  
  public Result get() {
    dataLock.lock();
    try {
      while (resultDefined == false)
        resultPrepared.await();
    } catch (InterruptedException ex) {
      ex.printStackTrace();
    } finally {
      dataLock.unlock();
    }
    return result;
  }
  
  private class Worker implements Callable<Result>,
      OptimalElement<Pair> {
    private final Pair p;
    
    Worker(Pair p) {
      this.p = p;
    }
    
    public Result call() {
      if (optimalElementFound.get()) {
        return null;
      }
      System.out.println(
        "Seaching for optimal element...");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        ex.printStackTrace();
      }
      
      // check if the element satisfies intermediate
      // condition
      if (isOptimalElement(p)) {
        System.out.println(
          "Optimal element is found...");
        for(;;) {
          if (!optimalElementFound.get()) {
            optimalElementFound.compareAndSet(
              false, true);
            if (optimalElementFound.get()) {
              break;
            }
          } else
            return null;
        }
        
        // if proper element is found then 
        // cancel other tasks, 
        // actually this does not work
        System.out.println("Cancel other threads...");
        Thread currentThread = Thread.currentThread();
        cancelRunnables(currentThread);
        
        try {
          // simulate hard work, to find solution
          Thread.sleep(10000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        
        // final phase, prepare result
        System.out.println(Thread.currentThread() +
            ": Returning optimal element...");
        return new Result(p, p.a+p.b);
      } else {
        try {
          // simulate useless job that should
          // not be executed
          Thread.sleep(2000);
        } catch (InterruptedException ex) {
          ex.printStackTrace();
        }
        if (optimalElementFound.get()) {
          System.out.println(Thread.currentThread() +
              ": Task is still doing useless " + 
              "things...");
        }
        return null;
      }
    }
    
    public boolean isOptimalElement(Pair p) {
      return  p.a > 0.8 && p.b > 0.8;
      // simulate hard to find optimal element, result

      // should be null;
      //return  p.a > 0.99 && p.b > 0.99;
    }
  }
}


class Result {
  public final double sum;
  public final Pair p;
  Result(Pair p, double sum) {
    this.p = p;
    this.sum = sum;
  }
  public String toString() {
    return "Input: " + p + " ==> result: " + sum;
  }
}

class Pair {
  public final double a;
  public final double b;
  Pair(double a, double b) {
    this.a = a;
    this.b = b;
  }
  public String toString() {
    return "[a: " + a + ", b: " + b + "]";
  }
}


If the input data set contains optimal data then the 
output of the program is the following:

Waiting for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Seaching for optimal element...
Optimal element is found...
Cancel other threads...
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Task: null
Other threads in the pool should be canceled/removed,
are they?
Thread[pool-1-thread-6,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-5,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-1,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-3,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-4,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-10,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-9,5,main]: Task is still doing   
             useless things...
Thread[pool-1-thread-7,5,main]: Task is still doing
useless things...
Thread[pool-1-thread-8,5,main]: Returning optimal
element...
Result: Input: [a: 0.9554837437185378, b:
0.8581452336165145] ==> result: 1.8136289773350525

My questions are:
- how should I design my application so that the tasks

in the pool would be canceled/stopped after the 
optimal element is found except for the task 
that has found that “optimal element” - I wouldn't 
like to see the message “Task is still doing useless
things” because it's actually using resources that
should be used fior preparing the final result;
- should I concern about this problem or not?
- are there any other constructs that I could 
use and achieve the same result?

Best regards,
Andrei

P.S.: Please don't laugh at my code, I'm still 
beginner as far as concurrent programming is
concerned...


		




		
__________________________________ 
Yahoo! FareChase: Search multiple travel sites in one click.
http://farechase.yahoo.com


More information about the Concurrency-interest mailing list