[concurrency-interest] Fwd: ThreadPoolTask

Jean Morissette jean.morissette at gmail.com
Sun Jul 9 14:57:19 EDT 2006


As Peter has explained, there are some issues with what David has suggested.

My actual solution use a CopiesQueue consisting of n copies of the
specified task.  This queue is memory efficient and very performant
since it contains only a single reference to the task and use an
AtomicInteger to store the size.  Also, I use a Runnable decorator
that resubmit the task in the queue, so the task don't need to be
aware of the ThreadPool/Executor.

So, by setting the size of the queue to Integer.MAX_VALUE, I can
adjust the size of the pool just calling setCorePoolSize and
setMaxPoolSize with the same value.  Here is the code.  What do you
think?


public class ThreadPoolTask {

	private ThreadPoolExecutor executor;
	private Runnable task;

	public ThreadPoolTask(Runnable task) {
		this.task = new RepeatableTask(task);
		this.executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new CopiesQueue(Integer.MAX_VALUE, this.task));
	}

	public void setPoolSize(int size) {
		executor.setCorePoolSize(size);
		executor.setMaximumPoolSize(size);
	}

	public int getPoolSize() {
		return executor.getCorePoolSize();
	}

	public void stop() {
		executor.shutdown();
	}

	public void stopNow() {
		executor.shutdownNow();
	}

	public void awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
		executor.awaitTermination(timeout, unit);
	}

	private class RepeatableTask implements Runnable {

		Runnable delegate;

		public RepeatableTask(Runnable delegate) {
			this.delegate = delegate;
		}

		public void run() {
			delegate.run();
			executor.getQueue().offer(this);
		}
	}

	private static class CopiesQueue extends AbstractQueue implements
BlockingQueue {

		AtomicInteger size;
		Object element;

		public CopiesQueue(int initialSize, Object element) {
			size = new AtomicInteger(initialSize);
			this.element = element;
		}

		public boolean offer(Object o) {
			size.incrementAndGet();
			return true;
		}

		public boolean offer(Object o, long timeout, TimeUnit unit) throws
InterruptedException {
			size.incrementAndGet();
			return true;
		}

		public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
			size.decrementAndGet();
			return element;
		}

		public Object take() throws InterruptedException {
			size.decrementAndGet();
			return element;
		}

		public void put(Object o) throws InterruptedException {
			size.incrementAndGet();
		}

		public int remainingCapacity() {
			return Integer.MAX_VALUE;
		}

		public int drainTo(Collection c) {
			List copies = Collections.nCopies(size.getAndSet(0), element);
			c.addAll(copies);
			return copies.size();
		}

		public int drainTo(Collection c, int maxElements) {
			int oldSize;
			int newSize;
			boolean success;
			do {
				oldSize = this.size.get();
				newSize = Math.max(oldSize - maxElements, 0);
				success = size.weakCompareAndSet(oldSize, newSize);
			} while (!success);
			return oldSize - newSize;
		}

		public Object poll() {
			size.decrementAndGet();
			return element;
		}

		public Object peek() {
			return element;
		}

		public Iterator iterator() {
			return Collections.nCopies(size.get(), element).iterator();
		}

		public int size() {
			return size.get();
		}
	}


}


More information about the Concurrency-interest mailing list