[concurrency-interest] Re: Help, how to design a pool of threads

Joe Bowbeer joe.bowbeer at gmail.com
Sat Nov 5 10:55:15 EST 2005


First, I'm trying to understand the threading scheme.  It looks like there is a producer that submits worker tasks to the pool and a consumer that processes the results.  I suggest you move the producer and consumer into the same thread.  Perhaps this could be the main thread.  That is, after you create the service, do the producer thing in the main thread next, submitting the worker tasks to the service; then do the consumer thing, processing the results.  If the main thread won't do, move the producer and consumer to a single background thread.
Moving the producer and consumer into a single thread leaves the thread pool dedicated to the worker tasks.  Easier to manage that way...
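A minimal sketch of the single-thread arrangement described above, not a drop-in replacement for the posted code.  The class name SingleThreadDriver, the worker() helper and the "greater than 0.8" test are all made up for illustration; the caller's thread submits every task and then reads the results in submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SingleThreadDriver {

    // Hypothetical worker: returns the candidate if it is "optimal"
    // (here simply greater than 0.8), otherwise null.
    static Callable<Double> worker(final double candidate) {
        return new Callable<Double>() {
            public Double call() {
                return candidate > 0.8 ? Double.valueOf(candidate) : null;
            }
        };
    }

    static Double findFirstOptimal(List<Double> data, int poolSize) {
        ExecutorService service = Executors.newFixedThreadPool(poolSize);
        try {
            // Producer step: submit every worker task and keep its Future.
            List<Future<Double>> futures = new ArrayList<Future<Double>>();
            for (Double d : data) {
                futures.add(service.submit(worker(d)));
            }
            // Consumer step, same thread: read results in submission order.
            for (Future<Double> f : futures) {
                Double r = f.get();
                if (r != null) {
                    return r;
                }
            }
            return null; // all data tested, nothing optimal found
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // The pool runs nothing but workers, so it is safe to stop them all.
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: "
                + findFirstOptimal(Arrays.asList(0.1, 0.95, 0.3), 2));
    }
}
```

Because no producer or consumer task occupies a pool thread, every thread in the pool is available to the workers.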
> My questions are:
> - how should I design my application so that the tasks
> in the pool would be canceled/stopped after the
> optimal element is found except for the task
> that has found that "optimal element" - I wouldn't
> like to see the message "Task is still doing useless
> things" because it's actually using resources that
> should be used for preparing the final result;
FutureTasks are cancelled by calling task.cancel(true);
If the task is still in the queue, this will prevent the task from ever running.  If the task is already actively running, this will also interrupt the thread that's running the task.
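Here is a small sketch of both cases, using a one-thread executor so that the second task is guaranteed to be waiting in the queue.  The class name CancelDemo and the two flags are invented for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelDemo {

    static String demo() {
        final AtomicBoolean queuedRan = new AtomicBoolean(false);
        final AtomicBoolean runningInterrupted = new AtomicBoolean(false);
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);

        // One thread only, so the second task has to wait in the queue.
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<?> running = service.submit(new Runnable() {
            public void run() {
                started.countDown();
                try {
                    Thread.sleep(60000); // stands in for long-running work
                } catch (InterruptedException e) {
                    runningInterrupted.set(true); // cancel(true) interrupted us
                } finally {
                    finished.countDown();
                }
            }
        });
        Future<?> queued = service.submit(new Runnable() {
            public void run() {
                queuedRan.set(true);
            }
        });

        try {
            started.await();      // the first task is now actively running
            queued.cancel(true);  // still in the queue: it will never run
            running.cancel(true); // already running: its thread is interrupted
            finished.await(5, TimeUnit.SECONDS);
            service.shutdown();
            service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "queued task ran: " + queuedRan.get()
                + ", running task interrupted: " + runningInterrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```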
But the task must be responsive to interrupts.  That is, when interrupted, the task should promptly quit.  If the task could be waiting on a lock when it is interrupted, I recommend using lockInterruptibly().  If the task sleeps, then don't catch the InterruptedException, just let the task (i.e., Callable.call) throw InterruptedException.  Finally, if the task is looping without blocking then it can check isCancelled() to see if it's been cancelled.
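The three cases can be sketched roughly as follows.  All class and method names here are hypothetical.  One assumption to flag: the looping task polls the thread's interrupt status, which cancel(true) sets, because a plain Callable holds no reference to its own Future and so cannot call isCancelled() on it directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleTasks {
    static final ReentrantLock lock = new ReentrantLock();

    // Sleeping task: no catch block, InterruptedException leaves call().
    static class SleepingTask implements Callable<String> {
        public String call() throws InterruptedException {
            Thread.sleep(60000);
            return "slept";
        }
    }

    // Lock-waiting task: lockInterruptibly() gives up when interrupted,
    // whereas plain lock() would keep waiting.
    static class LockingTask implements Callable<String> {
        public String call() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                return "locked";
            } finally {
                lock.unlock();
            }
        }
    }

    // Non-blocking loop: polls the interrupt status on every iteration.
    static class SpinningTask implements Callable<Long> {
        public Long call() {
            long iterations = 0;
            while (!Thread.currentThread().isInterrupted()) {
                iterations++;
            }
            return Long.valueOf(iterations);
        }
    }

    // Hypothetical helper: start the task, cancel it, and report whether
    // the pool thread was released within five seconds.
    static boolean stopsPromptly(Callable<?> task) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            Future<?> f = service.submit(task);
            Thread.sleep(200); // crude: give the task time to start
            f.cancel(true);
            service.shutdown();
            return service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdownNow();
        }
    }

    // The caller holds the lock, so the task blocks until it is cancelled.
    static boolean lockingTaskStopsPromptly() {
        lock.lock();
        try {
            return stopsPromptly(new LockingTask());
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("sleeping: " + stopsPromptly(new SleepingTask()));
        System.out.println("locking:  " + lockingTaskStopsPromptly());
        System.out.println("spinning: " + stopsPromptly(new SpinningTask()));
    }
}
```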
Your cancelRunnables method removes tasks from the service's queue but does not attempt to cancel running tasks.  Would it work better to cancel all the tasks in the workers list?  If the service is dedicated to workers, though, then service.shutdownNow() will do this for you.  It attempts to stop all actively executing tasks by interrupting their threads, and it returns the queued tasks without running them.
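A rough sketch of what shutdownNow() does to a pool that runs nothing but workers.  The class name and the pool and task counts are arbitrary choices for the example: two tasks are running when shutdownNow() is called, and three are still queued.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownNowDemo {

    // Returns { number of running tasks that were interrupted,
    //           number of queued tasks that never started }.
    static int[] demo() {
        final int threads = 2;
        final int tasks = 5;
        final CountDownLatch started = new CountDownLatch(threads);
        final AtomicInteger interrupted = new AtomicInteger(0);

        ExecutorService service = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            service.execute(new Runnable() {
                public void run() {
                    started.countDown();
                    try {
                        Thread.sleep(60000); // stands in for the real work
                    } catch (InterruptedException e) {
                        interrupted.incrementAndGet();
                    }
                }
            });
        }
        try {
            started.await(); // both pool threads are now busy
            List<Runnable> neverStarted = service.shutdownNow();
            service.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] { interrupted.get(), neverStarted.size() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] { -1, -1 };
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("interrupted: " + r[0] + ", never started: " + r[1]);
    }
}
```

One thing to watch if the tasks were submitted with submit() rather than execute(): the Futures of the queued tasks are handed back unfinished, so anything waiting in get() on one of them should cancel it as well.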
> - should I concern about this problem or not?
Yes.
> - are there any other constructs that I could
> use and achieve the same result?
>
Would a completion service work for the consumer thing?
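As a sketch of that idea, assuming java.util.concurrent.ExecutorCompletionService and a made-up worker that treats any value above 0.8 as optimal: results are taken in the order the tasks finish, and everything still outstanding is cancelled once an optimal result turns up.  The class name and the 50 ms sleep are placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceDemo {

    static Double findOptimal(List<Double> data, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CompletionService<Double> completed =
                new ExecutorCompletionService<Double>(pool);
        List<Future<Double>> futures = new ArrayList<Future<Double>>();
        try {
            for (final Double d : data) {
                futures.add(completed.submit(new Callable<Double>() {
                    public Double call() throws InterruptedException {
                        Thread.sleep(50); // simulated search, interruptible
                        return d.doubleValue() > 0.8 ? d : null;
                    }
                }));
            }
            for (int i = 0; i < data.size(); i++) {
                Double r = completed.take().get(); // next task to finish
                if (r != null) {
                    return r;
                }
            }
            return null; // all data tested, nothing optimal found
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            // Cancel whatever is still queued or running, then release the pool.
            for (Future<Double> f : futures) {
                f.cancel(true);
            }
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: "
                + findOptimal(Arrays.asList(0.1, 0.2, 0.95, 0.3), 2));
    }
}
```

This replaces the hand-rolled workers queue, the Consumer and the PrepareResult tasks with a single loop in the calling thread.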
Joe.

On 11/5/05, Andrej Navodnik <andrejnavodnik at yahoo.com> wrote:
> Hi all,
>
> let's say that I did some homework ;-) Here is my
> code:
>
> import java.lang.reflect.Array;
> import java.util.ArrayList;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Random;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.Callable;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
> import java.util.concurrent.locks.Condition;
> import java.util.concurrent.locks.ReentrantLock;
>
> public class Test {
>   public static void main(String... args) {
>     int N = 100;
>     FindOptimalElement s = new FindOptimalElement(
>         prepareTestData(N));
>     System.out.println(
>         "Waiting for optimal element...");
>     Result r = s.get();
>     System.out.println("Result: " + r);
>     System.exit(0);
>   }
>
>   static List<Pair> prepareTestData(int n) {
>     Random r = new Random();
>     List<Pair> testData = new ArrayList<Pair>();
>     for (int i = 0; i < n; i++) {
>       double a = r.nextDouble();
>       double b = r.nextDouble();
>       testData.add(new Pair(a, b));
>     }
>     return testData;
>   }
> }
>
> interface OptimalElement<E> {
>   boolean isOptimalElement(E e);
> }
>
> class FindOptimalElement {
>   private static final int NUMBER_WORKERS = 10;
>   private final AtomicBoolean optimalElementFound;
>   private final AtomicInteger testedData;
>   private final int numberOfData;
>   private final BlockingQueue<Future<Result>> workers;
>   private final ReentrantLock dataLock;
>   private volatile Result result;
>   private volatile Condition resultPrepared;
>   private boolean resultDefined;
>   private final ThreadPoolExecutor service;
>
>   FindOptimalElement(List<Pair> data) {
>     this.numberOfData = data.size();
>     this.optimalElementFound =
>         new AtomicBoolean(false);
>     this.testedData = new AtomicInteger(0);
>     this.dataLock = new ReentrantLock();
>     this.resultPrepared = dataLock.newCondition();
>     this.workers =
>         new ArrayBlockingQueue<Future<Result>>(
>         NUMBER_WORKERS);
>
>     int corePoolSize = NUMBER_WORKERS;
>     int maximumPoolSize = NUMBER_WORKERS;
>     long keepAliveTime = 1;
>     BlockingQueue<Runnable> workQueue =
>         new LinkedBlockingQueue<Runnable>();
>     service = new ThreadPoolExecutor(corePoolSize,
>         maximumPoolSize, keepAliveTime,
>         TimeUnit.SECONDS, workQueue);
>     service.prestartAllCoreThreads();
>
>     service.execute(new Producer(data));
>     service.execute(new Consumer());
>   }
>
>   // how should I cancel/stop other threads,
>   // is this possible??
>   private void cancelRunnables(
>       Runnable excludeRunnable) {
>     BlockingQueue<Runnable> queue=service.getQueue();
>     Runnable[] runnables = (Runnable[]) Array
>         .newInstance(Runnable.class, NUMBER_WORKERS);
>     queue.toArray(runnables);
>     for (Runnable r : runnables) {
>       System.out.println("Task: " + r);
>       if (r != null && !r.equals(excludeRunnable)) {
>         System.out.println("Removed task: " + r);
>         service.remove(r);
>       }
>     }
>     service.purge();
>     System.out.println("Other threads in the pool " +
>         "should be canceled/removed, are they?");
>   }
>
>   class Producer implements Runnable {
>     private final List<Pair> testData;
>     Producer(List<Pair> testData) {
>       this.testData = testData;
>     }
>     public void run() {
>       try {
>         for (Pair p : testData) {
>           Callable<Result> c = new Worker(p);
>           Future<Result> task = service.submit(c);
>           workers.put(task);
>         }
>       } catch (InterruptedException ie) {
>         ie.printStackTrace();
>       }
>     }
>   }
>
>   class Consumer implements Runnable {
>     public void run() {
>       for (;;) {
>         try {
>           final Future<Result> f = workers.take();
>           Runnable task = new PrepareResult(f);
>           service.execute(task);
>         } catch (InterruptedException ie) {
>           ie.printStackTrace();
>           return ;
>         }
>       }
>     }
>
>     class PrepareResult implements Runnable {
>       private final Future<Result> f;
>       PrepareResult(Future<Result> f) {
>         this.f = f;
>       }
>       public void run() {
>         try {
>           Result r = f.get();
>           dataLock.lock();
>           try {
>             if (r != null) {
>               result = r;
>               resultDefined = true;
>               resultPrepared.signal();
>             }
>             if (!resultDefined) {
>               int n = testedData.incrementAndGet();
>               if (n==numberOfData) {
>                 System.out.println(
>                     "All data tested, " +
>                     "no optimal element found...");
>                 resultDefined = true;
>                 resultPrepared.signal();
>               }
>             }
>           } finally {
>             dataLock.unlock();
>           }
>
>         } catch (ExecutionException ee) {
>           ee.printStackTrace();
>         } catch (InterruptedException ie) {
>           ie.printStackTrace();
>         }
>       }
>     }
>   }
>
>   public Result get() {
>     dataLock.lock();
>     try {
>       while (resultDefined == false)
>         resultPrepared.await();
>     } catch (InterruptedException ex) {
>       ex.printStackTrace();
>     } finally {
>       dataLock.unlock();
>     }
>     return result;
>   }
>
>   private class Worker implements Callable<Result>,
>       OptimalElement<Pair> {
>     private final Pair p;
>
>     Worker(Pair p) {
>       this.p = p;
>     }
>
>     public Result call() {
>       if (optimalElementFound.get()) {
>         return null;
>       }
>       System.out.println(
>         "Seaching for optimal element...");
>       try {
>         Thread.sleep(1000);
>       } catch (InterruptedException ex) {
>         ex.printStackTrace();
>       }
>
>       // check if the element satisfies intermediate
>       // condition
>       if (isOptimalElement(p)) {
>         System.out.println(
>           "Optimal element is found...");
>         for(;;) {
>           if (!optimalElementFound.get()) {
>             optimalElementFound.compareAndSet(
>               false, true);
>             if (optimalElementFound.get()) {
>               break;
>             }
>           } else
>             return null;
>         }
>
>         // if proper element is found then
>         // cancel other tasks,
>         // actually this does not work
>         System.out.println("Cancel other threads...");
>         Thread currentThread = Thread.currentThread();
>         cancelRunnables(currentThread);
>
>         try {
>           // simulate hard work, to find solution
>           Thread.sleep(10000);
>         } catch (InterruptedException ex) {
>           ex.printStackTrace();
>         }
>
>         // final phase, prepare result
>         System.out.println(Thread.currentThread() +
>             ": Returning optimal element...");
>         return new Result(p, p.a+p.b);
>       } else {
>         try {
>           // simulate useless job that should
>           // not be executed
>           Thread.sleep(2000);
>         } catch (InterruptedException ex) {
>           ex.printStackTrace();
>         }
>         if (optimalElementFound.get()) {
>           System.out.println(Thread.currentThread() +
>               ": Task is still doing useless " +
>               "things...");
>         }
>         return null;
>       }
>     }
>
>     public boolean isOptimalElement(Pair p) {
>       return  p.a > 0.8 && p.b > 0.8;
>       // simulate hard to find optimal element, result
>       // should be null;
>       //return  p.a > 0.99 && p.b > 0.99;
>     }
>   }
> }
>
>
> class Result {
>   public final double sum;
>   public final Pair p;
>   Result(Pair p, double sum) {
>     this.p = p;
>     this.sum = sum;
>   }
>   public String toString() {
>     return "Input: " + p + " ==> result: " + sum;
>   }
> }
>
> class Pair {
>   public final double a;
>   public final double b;
>   Pair(double a, double b) {
>     this.a = a;
>     this.b = b;
>   }
>   public String toString() {
>     return "[a: " + a + ", b: " + b + "]";
>   }
> }
>
>
> If the input data set contains optimal data then the
> output of the program is the following:
>
> Waiting for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Seaching for optimal element...
> Optimal element is found...
> Cancel other threads...
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Task: null
> Other threads in the pool should be canceled/removed,
> are they?
> Thread[pool-1-thread-6,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-5,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-1,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-3,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-4,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-10,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-9,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-7,5,main]: Task is still doing
> useless things...
> Thread[pool-1-thread-8,5,main]: Returning optimal
> element...
> Result: Input: [a: 0.9554837437185378, b:
> 0.8581452336165145] ==> result: 1.8136289773350525
>
> My questions are:
> - how should I design my application so that the tasks
> in the pool would be canceled/stopped after the
> optimal element is found except for the task
> that has found that "optimal element" - I wouldn't
> like to see the message "Task is still doing useless
> things" because it's actually using resources that
> should be used for preparing the final result;
> - should I concern about this problem or not?
> - are there any other constructs that I could
> use and achieve the same result?
>
> Best regards,
> Andrei
>
> P.S.: Please don't laugh at my code, I'm still
> beginner as far as concurrent programming is
> concerned...
>

