[concurrency-interest] SynchronousQueue to use a LIFO for waitingTakes?

Guillaume Berche guillaume.berche@eloquant.com
Tue, 3 Feb 2004 14:24:16 +0100


Hello,

I've been happily using the concurrentLib version 1.3.1 for a long time, and
find it a great library, thanks to Doug Lea for making it available to the
community! I would like to suggest an optimization in the implementation of
the JSR-166 with respect to the ThreadPoolExecutor's keep alive mechanism
when using the a SynchronousQueue.

If I read the code well (and did not miss subtle differences with
concurrentLib version 1.3.1), the java.util.concurrent.SynchronousQueue uses
an internal FIFO to manage "waitingTakes". As a result, the threads which
have been waiting for the longuest time get new incoming "puts" first. As a
consequence, in the following scenario, the keep-alive mechanism would not
optimally operate as the number of threads would not decrease:

1- configure ThreadPoolExecutor's keep alive mechanism when using the a
SynchronousQueue, and a maximum pool thread of 100.
2- request a burst of work which creates 100 threads
3- once the burst is done, request a low but steady amount of work which in
average keeps 1 thread busy.
4- observe thread count remain to 100 instead of slowly decreasing to 1

If I understood the code well, this could be optimized by having
SynchronousQueue use a LIFO for managing waitingTakes, and hence ask the
same threads to work, letting extra idle threads being reclaimed when their
keep alive time is reached.

I hope the following code fragement written to use concurrentLib version
1.3.1 may be helpful in writing a Junit test to validate this optimization.

I hope this helps,

Guillaume.

-------------------------

/**
 * Tests the Executor instanciated by the ExecutorFactory
 */
public class TestExecutorFactory extends TestCase {

	public TestExecutorFactory(String name) {
		super(name);
	}


	/**
	 * Tests that when requested to run more than one task in concurrence, the
executor indeed runs them in concurrence.
	 * @throws Exception
	 */
	public void testConcurrentThreads() throws Exception {
		int maxThreads = 10;
		int maxTasks = 10;
		int taskSleepTime = 1000;
		int keepAliveTime = 5*1000;

		int expectedTimeToExecuteAllTasks = 500;
		int expectedMaxTimeToStartEachTask = 500;

		//
		// 1. tests that when smaller than maximun tasks are executed.
		//

		ThreadGroup threadGroup = new ThreadGroup("TestExecutorFactory");

		//Note: we don't use a buffer: we want to execute at most max threads in
concurrence and then
		//start blocking the caller thread when reached. Using a buffer has two
drawbacks:
		//1- it does not have flow control unless the buffer is bounded
		//2- the PooledExecutor will always prefer queuing if nb current threads >
minThreadPool. More over
		//it will try to create minThreadPool threads even if some threads are
idle waiting for some work.
		PooledExecutor pooledExecutor1 = new PooledExecutor(maxThreads);

		//Have a slightly lower priority to let core threads have more priority.
This is supposed
		//to avoid concurrent fetches
		pooledExecutor1.setThreadFactory(new
ThreadFactoryImpl(Thread.MAX_PRIORITY, "TestExecutorFactory", threadGroup));

		//10 minutes keep alive time to avoid garbage collecting threads too
		//frequently (in previous JDK versions this used to induce memory leaks)
		pooledExecutor1.setKeepAliveTime(keepAliveTime);

		pooledExecutor1.waitWhenBlocked();
		Executor pooledExecutor = pooledExecutor1;
		TestRunnable [] startedRunnables = new TestRunnable [maxTasks];

		long startRequestTime = System.currentTimeMillis();

		//First start all runables.
		for (int taskIndex=0; taskIndex<maxTasks; taskIndex++) {
			TestRunnable testRunnable = new TestRunnable(taskSleepTime);
			pooledExecutor.execute(testRunnable);
			startedRunnables[taskIndex] = testRunnable;
		}

		//Then wait for all task to be started
		for (int taskIndex=0; taskIndex<maxTasks; taskIndex++) {
			TestRunnable testRunnable = startedRunnables[taskIndex];
			Semaphore wasStartedSemaphore = testRunnable.getWasStartedSemaphore();
			assertTrue("task #" + taskIndex + " not started within expected timeout",
wasStartedSemaphore.attempt(expectedMaxTimeToStartEachTask));
		}

		long requestsAllStartedTime = System.currentTimeMillis();
		assertTrue("request probably did not start in concurrence since time to
start them all is larger than task sleep time", requestsAllStartedTime -
startRequestTime < taskSleepTime);

		//Then wait for all tasks to complete
		for (int taskIndex=0; taskIndex<maxTasks; taskIndex++) {
			TestRunnable testRunnable = startedRunnables[taskIndex];
			Semaphore completedSemaphore = testRunnable.getCompletedSemaphore();
			assertTrue("task #" + taskIndex + " not completed within expected
timeout", completedSemaphore.attempt(taskSleepTime * 2));
		}

		long requestsCompletedTime = System.currentTimeMillis();
		assertTrue("problem completing tasks. took longer than expected",
requestsCompletedTime - startRequestTime < taskSleepTime +
expectedTimeToExecuteAllTasks);

        assertEquals("invalid number of active threads", maxTasks,
threadGroup.activeCount());


		//
		// 2. test that after high load is gone, number of threads properly
decrease
		//

		int maxTestDuration = 30 * 1000;
        long startTime = System.currentTimeMillis();
		int extraTimeAfterKeepAllive = 5000;

		while(System.currentTimeMillis() - startTime < maxTestDuration) {

			//Start a task
			TestRunnable testRunnable = new TestRunnable(taskSleepTime);
			pooledExecutor.execute(testRunnable);
			Semaphore wasStartedSemaphore = testRunnable.getWasStartedSemaphore();
			assertTrue("task not started within expected timeout",
wasStartedSemaphore.attempt(expectedMaxTimeToStartEachTask));
			Semaphore completedSemaphore = testRunnable.getCompletedSemaphore();
			assertTrue("task not completed within expected timeout",
completedSemaphore.attempt(taskSleepTime * 2));

			//sleep a while
			Thread.sleep(10);

			if (System.currentTimeMillis() - startTime > keepAliveTime +
extraTimeAfterKeepAllive) {
				assertEquals("expected other threads to be stopped by now. threadGroup:"
+ threadGroup, 1, threadGroup.activeCount());
			}
		}

	}

	/**
	 * A dummy task which simply sleeps for a given time. This is necessary to
make sure different thread indeed run in concurrence.
	 */
	private static class TestRunnable implements Runnable {

		private Semaphore _wasStartedSemaphore = new Semaphore(0);
		private Semaphore _completedSemaphore = new Semaphore(0);
		private int _sleepTime;

		//
		// Runnable API
		//

		public void run() {
			_wasStartedSemaphore.release();
			try {
				Thread.sleep(_sleepTime);
			} catch (InterruptedException e) {
				if (Assert.enabled) {
					Assert.condition(false, "unexpected interruption!");
				}
			}
			_completedSemaphore.release();
		}

		//
		// Test API
		//

		public TestRunnable(int sleepTime) {
			_sleepTime = sleepTime;
		}

		/**
		 * Provides the semaphore which indicates the task was started
		 */
		public Semaphore getWasStartedSemaphore() {
			return _wasStartedSemaphore;
		}

		/**
		 * Provides the semaphore which indicates the task completed.
		 */
		public Semaphore getCompletedSemaphore() {
			return _completedSemaphore;
		}

	}

}