[concurrency-interest] ThreadPoolExecutor

Patrick Eger peger at automotive.com
Wed Oct 5 23:14:40 EDT 2005


Hello and thank you for your response.  I forgot to mention the extra
"workersBlocked" variable I added to keep track of the number of threads
waiting for a task to execute.  This obviously adds the extra expense of
an atomic inc/dec pair for every execute() (told you it was hacky :-).
There are definite race issues here as well, but I think it is safe in
that the only problem would be if the pool shrinks to zero size after we
offer the task to the queue.  The "if (poolSize == 0)" clause below
protects against that case, I think...

....
private final AtomicInteger workersBlocked = new AtomicInteger(0);
....
workersBlocked.incrementAndGet();
try {
    // untimed wait if this is a core thread and core timeout is off
    if (poolSize <= corePoolSize && !allowCoreThreadTimeOut)
        return workQueue.take();

    long timeout = keepAliveTime;
    if (timeout <= 0) // die immediately for 0 timeout
        return null;
    Runnable r = workQueue.poll(timeout, TimeUnit.NANOSECONDS);
    if (r != null)
        return r;
    if (poolSize > corePoolSize || allowCoreThreadTimeOut)
        return null; // timed out
    // else, after the timeout, the pool shrank below core size; retry
    break;
} finally {
    workersBlocked.decrementAndGet();
}
....


It uses "workersBlocked" as an estimate of whether or not a thread is
immediately available to execute the Runnable, though this estimate will
be wrong under heavily concurrent calls to execute().  I would love to
find a better way to do this; maybe an atomic-decrement, semaphore-style
"reservation" would preserve correctness in the face of multiple
concurrent execute() calls?  This would still be an additional atomic
dec/inc pair (one in execute(), one in the executor thread when it picks
the task up).
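
For illustration, something along these lines (a hypothetical sketch
only: the names are made up, and it assumes every task reaches the work
queue through tryHandOff(), ignoring shutdown and timeout paths):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;

class ReservationSketch {
    private final Semaphore idleWorkers = new Semaphore(0);
    private final BlockingQueue<Runnable> workQueue;

    ReservationSketch(BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
    }

    // execute() side: atomically claim an idle worker before offering.
    boolean tryHandOff(Runnable command) {
        if (idleWorkers.tryAcquire()) {      // the atomic "reservation"
            if (workQueue.offer(command))
                return true;                 // a blocked worker will take it
            idleWorkers.release();           // offer failed: undo the claim
        }
        return false;  // caller falls back to creating a thread / queueing
    }

    // worker side: advertise idleness only while actually blocked.
    Runnable waitForTask() throws InterruptedException {
        idleWorkers.release();               // +1 unclaimed idle worker
        return workQueue.take();             // the matching tryAcquire() in
                                             // tryHandOff() consumed our permit
    }
}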

Your queue approach is also interesting, but it sounds much more complex
and error prone, plus it would really complicate the (currently simple)
interface between the thread pool and the queue.

I would gladly pay this inc/dec cost (as a configurable option, off by
default for backwards compatibility's sake, so it should not affect
existing users) for the better (IMO of course) thread creation
behaviour.

Do you guys see this as generally useful?  Would a patch using the
(configurable, off by default, and so adding only a cacheable read to
the fast path) inc/dec reservation-style approach be considered for
review (the above is a bit of a hack and probably has some bad
characteristics under heavy concurrent use, as David has pointed out)?
Or should I pursue the queue-based approach as suggested?

Thanks again for everyone's help!

P.S. I have confirmed via a small test case that a corePoolSize of zero
results in submitted tasks *never* executing.  They are offer()ed to the
queue, but there are never any threads available to process them.  In
effect corePoolSize becomes the "max" pool size with an infinite queue,
which does not give me the behaviour I desire.
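
For reference, a minimal reproduction (against the 1.5 implementation
discussed here; the constructor arguments are just example values):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreZeroStarvation {
    public static void main(String[] args) throws InterruptedException {
        // core = 0, max = 10, unbounded queue: offer() always succeeds, so
        // execute() never sees a "full" queue and never creates a thread.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 10, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());

        pool.execute(new Runnable() {
            public void run() { System.out.println("ran"); } // never prints
        });

        Thread.sleep(3000);
        System.out.println("pool size = " + pool.getPoolSize()); // prints 0
    }
}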

P.P.S. Am I still confused about what I want?  I assumed this behaviour
is what most people would want for a dynamic work queue (0-1000 threads)
with bursty request patterns (0 to 1 million+ tasks on the queue at any
given point), but I cannot find much in the archives...

Best regards,

Patrick


-----Original Message-----
From: David Holmes [mailto:dholmes at dltech.com.au] 
Sent: Wednesday, October 05, 2005 5:18 PM
To: Patrick Eger; concurrency-interest at altair.cs.oswego.edu
Subject: RE: [concurrency-interest] ThreadPoolExecutor

Patrick,

Your requirements can't be met directly by the current implementation.
The current design works, as you know, by having a core pool that can
either be pre-started or lazily created, and which always stays around
(in 1.6 idle core threads can time out, but that just takes you back to
the lazy startup mode). Once you have your core pool you only create
more threads (up to max) if your queue fills up - if the queue never
fills then no more threads will be created. If you reach the maximum
size with a full queue, the task is rejected.
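
To make that sequencing concrete (values here are arbitrary):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Tasks run in at most 2 threads until the 100-slot queue fills; only
// then are threads 3..8 created, and a task arriving with 8 busy threads
// and a full queue goes to the RejectedExecutionHandler.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2,                                      // corePoolSize
        8,                                      // maximumPoolSize
        60L, TimeUnit.SECONDS,                  // keep-alive for excess threads
        new ArrayBlockingQueue<Runnable>(100)); // bounded work queue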

> 1) {infinite | bounded} linked list request queue, where the bound is 
> really high to avoid OOM (100000+)
> 2) MIN of 0 threads in pool
> 3) MAX # of threads in pool, configurable on pool create
> 4) reuse existing threads if available
> 5) allow pool to shrink to zero threads if there are no outstanding 
> requests

Requirement (4) requires a way to detect if threads are "available". But
what does this mean? A worker thread is either executing a task or
blocked waiting for a task to appear in the BlockingQueue. If it is
blocked then it is "available", but to use it you have to submit your
task to the queue. To know if it is blocked you need to keep track of
it, which is presumably what this code is doing:

> //NOTE: Added this for less thread-crazy behaviour
> if (workersBlocked.get() > 0) {
> 	if (workQueue.offer(command)) {
> 		//HACK: this should protect against the pool shrinking,
> 		//      should be very rare...
> 		if (poolSize == 0)
> 			addIfUnderCorePoolSize(new Runnable() {
> 				public void run() {}
> 			});
>
> 		return;
> 	}
> }

However this sort of check requires atomicity that isn't normally
present in the ThreadPoolExecutor. So to do this right requires
additional locking, otherwise two incoming tasks can each see one
available worker and assume that worker will run their task, when in
fact one task will remain in the queue and the pool could have queued
tasks but fewer than max (or even core) threads.
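
Concretely, it is the classic check-then-act race:

// Two submitters, one idle worker, workersBlocked == 1:
//
//   T1: workersBlocked.get() > 0   -> true
//   T2: workersBlocked.get() > 0   -> true  (same, now-stale observation)
//   T1: workQueue.offer(task1)     -> the idle worker wakes and runs task1
//   T2: workQueue.offer(task2)     -> task2 sits in the queue; no thread
//                                     is created for it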

So if you really want this you have to pay a price to get it.

> Everything is working correctly as per the docs AFAIK, just seemingly 
> counterintuitively. It seems quite pointless and less performant to 
> be creating new threads while existing ones are idle and available to 
> process work.

The assumption is that the core pool will be quite steady so if you
don't create a core thread this time, the expected usage means you are
going to create it very soon anyway. If you pre-start the core then you
don't create new threads until your queue is full.
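
(Pre-starting is directly supported by the existing API; the sizes here
are arbitrary:)

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 16, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
int started = pool.prestartAllCoreThreads(); // creates all 4 core threads now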

> Is this just a bad interaction between ThreadPoolExecutor and 
> LinkedBlockingQueue?  Is there another queue type that will work for 
> me or thread pool option I am missing?

It seems to me - and I could be misunderstanding things - that what you
want is a "dual-mode" queue. Set the core size to zero; what you want is
to submit the task to the queue if a thread is waiting, and otherwise
create a thread. This is where you need a synchronous queue - it has no
capacity, so if no thread is waiting then offer() will fail, and from
the pool's perspective the queue is "full", so a new thread will be
created if under max. But when max is reached you now want a queue that
has capacity. You could use the RejectedExecutionHandler to make your
queue (you'd need to define a custom queue for this) switch to acting as
a (finite) linked blocking queue. If the normal queue detects it is
empty then it switches back to synchronous mode. I *think* that would
meet your requirements *but* I don't know if what I just described can
actually be implemented. Interesting to think about it anyway :-)
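
For what it's worth, a rough sketch of one way to approximate this with
the existing hooks - not the full mode-switching design above, and the
class names are made up. The queue simply reports itself "full" to
execute() so the pool keeps adding threads up to max, and the rejection
handler then does the real queuing:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical sketch, not a vetted implementation.
class ForceThreadsQueue extends LinkedBlockingQueue<Runnable> {
    ForceThreadsQueue(int capacity) { super(capacity); }

    public boolean offer(Runnable r) {
        return false;            // "full": make the pool try to add a thread
    }

    boolean enqueue(Runnable r) {
        return super.offer(r);   // the real offer, used only after rejection
    }
}

class QueueOnRejection implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
        if (!((ForceThreadsQueue) pool.getQueue()).enqueue(r))
            throw new RejectedExecutionException("work queue full");
    }
}

Note this still has a variant of Patrick's shrink-to-zero race: a task
can be enqueued just as the last worker times out, so something like the
"if (poolSize == 0)" check from the earlier hack would still be needed.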

Cheers,
David Holmes




