[concurrency-interest] Questions about ThreadPoolExecutor

Brian Goetz brian at quiotix.com
Thu Mar 2 17:13:58 EST 2006


[ I think this message got eaten somehow ]

>> "If you set the maximum pool size on your ThreadPoolExecutor and assign
> CallerRunsPolicy() as the rejectedExecutionHandler, then tasks
> submitted while the active thread limit has been reached will be
> executed in the submitting thread."
> 
> Thanks for the above hint.  This should fit part of my needs.  By using
> this policy, it should avoid task deadlocks even if my pool is bounded,
> correct?

Only with a zero-length task queue (i.e., a SynchronousQueue).  Otherwise,
the same deadlock risks apply.
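
For concreteness, a configuration along those lines might look like this
(the pool size of 10 is just illustrative):

     import java.util.concurrent.*;

     // Sketch: a bounded pool with no task queue.  When all 10 threads
     // are busy, the SynchronousQueue cannot hold the task and no new
     // thread can be created (core == max), so the submission is
     // rejected and CallerRunsPolicy runs it in the submitting thread.
     ThreadPoolExecutor pool = new ThreadPoolExecutor(
             10, 10,                          // core and max size (illustrative)
             0L, TimeUnit.MILLISECONDS,       // keep-alive for excess threads
             new SynchronousQueue<Runnable>(),
             new ThreadPoolExecutor.CallerRunsPolicy());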

>> "In your case, you'd write a BatchExecutor that submitted its tasks to
> a master executor. "
> 
> Since my original problem is to be
> 
> 1) - able to specify the number of threads to run each batch of tasks.
> (i.e. I have a batch of 100 tasks; I only want to allocate 6 threads to
> be running them even if the pool has more than 6 threads available.  The
> reasons are: one, I don't want one batch to take away all the thread
> resources, and two, if the tasks put a big stress on the db, I simply
> want the control to be able to specify fewer threads to run them.)
> 2) - bound the size of the pool
> 3) - avoid task deadlocks
> 4) - would need something like invokeAll(..) in ExecutorService to wait
> and get my result back. 
> 5) - nice to be able to reuse threads in pool instead of creating an
> Executor for each batch since some batch might be small, some are large.
> 
> In your suggestion of this BatchExecutor, in essence, there is really
> one Executor, so I am not so clear on how I would achieve 1 and 4.  Would
> my BatchExecutor have to be wise in figuring out the pace of dispatching
> to the master executor so as to satisfy X number of threads running the
> batch of tasks?

Here's another trick that might help.  It doesn't even require the
cascading executors, if you're willing for your invokeAll to be
synchronous (which I think is OK by your requirements).

You create a shared pool with 100 threads.  For each batch, you create a
semaphore with as many permits as the number of threads that batch is
allowed to use.  Each permit represents the right to submit a task
concurrently to the main pool.  The permit is acquired before submitting
the task, and released by the task when it's done.  I think this, along
with the trick above of combining caller-runs with SynchronousQueue,
meets all your requirements.

     import java.util.ArrayList;
     import java.util.Collection;
     import java.util.List;
     import java.util.concurrent.*;

     // One shared pool for all batches, so threads are reused across
     // batches (requirement 5).
     public static ExecutorService exec
         = Executors.newFixedThreadPool(100);

     class Batch<T> {
         private final Collection<Callable<T>> tasks;
         // Permits bound how many of this batch's tasks may occupy the
         // shared pool at once (requirement 1).
         private final Semaphore sem;

         public Batch(Collection<Callable<T>> tasks, int nThreads) {
             this.tasks = tasks;
             sem = new Semaphore(nThreads);
         }

         public Collection<Future<T>> invokeAll()
                 throws InterruptedException {
             List<Future<T>> results = new ArrayList<Future<T>>();
             for (final Callable<T> task : tasks) {
                 sem.acquire();   // wait until the batch is under its limit
                 results.add(exec.submit(
                         new Callable<T>() {
                             public T call() throws Exception {
                                 try {
                                     return task.call();
                                 }
                                 finally {
                                     sem.release();   // let the next task in
                                 }
                             }
                         }));
             }
             return results;
         }
     }
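
For illustration, usage might look something like this (the squaring
tasks are just stand-ins for your db work); calling get() on each
Future gives you the wait-for-results behavior you wanted from
invokeAll:

     // Hypothetical usage: 100 tasks, at most 6 of them in the shared
     // pool at any one time.
     List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
     for (int i = 0; i < 100; i++) {
         final int n = i;
         tasks.add(new Callable<Integer>() {
             public Integer call() { return n * n; }   // stand-in for real work
         });
     }
     Batch<Integer> batch = new Batch<Integer>(tasks, 6);
     for (Future<Integer> f : batch.invokeAll())
         f.get();   // blocks until that task completes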
