[concurrency-interest] Fwd: ThreadPoolTask

Peter Veentjer alarmnummer at gmail.com
Mon Jul 10 01:14:59 EDT 2006


A Repeater needs only a single runnable that it keeps repeating. No
strange things like resubmitting tasks or strange 'copyqueues'.

I'm still working on the code, so it could contain bugs.

---------------------------------------------------------------------

package org.jph.concurrent.repeater;

import org.jph.concurrent.LockUtil;
import org.jph.concurrent.StdThreadFactory;
import org.jph.concurrent.awaitablereference.RelaxedAwaitableReference;

import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * The ThreadPoolRepeaterService is the default implementation of the
 * {@link RepeaterService}. It uses a threadpool.
 *
 * @author Peter Veentjer.
 */
public final class ThreadPoolRepeaterService implements RepeaterService {

	private final static Runnable DUMMY_TASK = new DummyTask();


	//number of threads.
	//this field has to be volatile because it is shared between different
	//threads.
	private volatile int _threadCount;

	//factory for new threads
	private final ThreadFactory _threadFactory;
	//Set containing all worker threads
	private final HashSet<Worker> _workers = new HashSet<Worker>();

	//Contains the current number of threads.
	private final AtomicInteger _currentThreadCount = new AtomicInteger(0);

	//Lifecycle state
	private volatile RepeaterServiceState _runstate =
RepeaterServiceState.NOT_STARTED;

	private final RelaxedAwaitableReference<Runnable> _awaitableTask =
new RelaxedAwaitableReference<Runnable>();

	private final Lock _mainLock = new ReentrantLock();

	/**
	 * Creates a StdThreadFactory with a single thread, a StdThreadFactory and
	 * no task to execute.
	 */
	public ThreadPoolRepeaterService() {
		this(1);
	}

	/**
	 * Creates a ThreadPoolRepeaterService with the given initial
parameters, a StdThreadFactory
	 * and no task to execute.
	 *
	 * @param threadCount the number of threads.
	 * @throws IllegalArgumentException if threadCount < 0.
	 */
	public ThreadPoolRepeaterService(int threadCount) {
		this(threadCount, new StdThreadFactory("repeaters"));
	}

	/**
	 * Creates a ThreadPoolRepeaterService
	 *
	 * @param threadCount   the number of threads.
	 * @param threadFactory the ThreadFactory that is used to create threads for
	 *                      the threadpool.
	 * @throws IllegalArgumentException if threadcount smaller than 0.
	 * @throws NullPointerException	 if threadFactory is null.
	 */
	public ThreadPoolRepeaterService(int threadCount, ThreadFactory
threadFactory) {
		this(threadCount, threadFactory, null);
	}

	/**
	 * Creates a new ThreadPoolRepeaterService (that is not started).
	 *
	 * @param threadCount   the number of threads to use.
	 * @param threadFactory the ThreadFactory to use for creating threads.
	 * @param runnable	  the task to repeat (can be null).
	 * @throws IllegalArgumentException if threadCount smaller than 0.
	 * @throws NullPointerException	 if threadFactory is null.
	 */
	public ThreadPoolRepeaterService(int threadCount, ThreadFactory
threadFactory, Runnable runnable) {
		if (threadFactory == null) throw new NullPointerException();
		if (threadCount < 0) throw new IllegalArgumentException();
		_threadCount = threadCount;
		_threadFactory = threadFactory;
		try {
			_awaitableTask.setNewReference(runnable);
		} catch (InterruptedException e) {
			e.printStackTrace();  //To change body of catch statement use File
| Settings | File Templates.
		}
	}

	/**
	 * Returns the number of threads. The returned value does not
	 * have to correspond with the actual number of running threads:
	 * if this method is called after the poolsize has been changed
	 * but before the change is picked up.
	 *
	 * @see #getCurrentThreadCount()
	 */
	public int getThreadCount() {
		return _threadCount;
	}

	/**
	 * Returns the current number of Running threads. This value can be stale
	 * if the threadcount has been changed in the meanwhile.
	 *
	 * @see #getThreadCount()
	 */
	public int getCurrentThreadCount() {
		return _currentThreadCount.intValue();
	}

	public void increaseThreadCount(int delta) {
		throw new UnsupportedOperationException();
	}

	/**
	 * Sets the number of threads this ThreadPoolRepeaterService uses to
	 * execute.
	 *
	 * @param newThreadCount the number of threads.
	 * @throws IllegalArgumentException if newThreadCount < 0.
	 */
	public void setThreadCount(int newThreadCount) {
		if (newThreadCount < 0) throw new IllegalArgumentException();

		_mainLock.lock();
		try {
			if (_runstate == RepeaterServiceState.NOT_STARTED) {
				//the Repeater isn`t running yet, the number of threads
				//can be increased because no threads are created yet.
				_threadCount = newThreadCount;
			} else if (_runstate != RepeaterServiceState.RUNNING) {
				//a task can only be repeated if this RepeaterService is running.
				String msg = "This RepeaterService doesn't accept new tasks to repeat";
				throw new RejectedExecutionException(msg);  //todo: exception
			} else {
				//the Repeater is running.
				int delta = newThreadCount - getCurrentThreadCount();
				if (delta == 0) {
					//the poolsize stays the same
					return;
				}

				if (delta > 0) {
					//additional threads have to be added
					for (int k = 0; k < delta; k++) {
						Thread thread = createAndRegisterWorker();
						thread.start();
					}
				} else {
					//threads have to be destroyed.
					Iterator<Worker> workerIt = _workers.iterator();
					for (int k = 0; k < -delta; k++) {
						Worker worker = workerIt.next();
						worker.die();
						_workers.remove(worker);
					}
				}
				_threadCount = newThreadCount;
			}

		} finally {
			_mainLock.unlock();
		}
	}


	/**
	 * Returns the ThreadFactory this ThreadPoolRepeaterService uses to create
	 * threads.
	 */
	public ThreadFactory getThreadFactory() {
		return _threadFactory;
	}

	public synchronized Runnable getTask() {
		try {
			return _awaitableTask.lendValue(-1, TimeUnit.MICROSECONDS);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);//todo
		}
	}

	public RepeaterServiceState getState() {
		return _runstate;
	}

	/**
	 * Makes sure that the Repeater is ready for repeating. A Repeater is
	 * ready for repeating if:
	 * <ul>
	 * <li>if is has not been started yet. If this is the case, it is
	 * started.</li>
	 * <li>if it is started. It doesn't matter if it is started
	 * multiple times.
	 * </li>
	 * <ul>
	 * In all other cases, a RejectedExecutionException will be thrown.
	 * <p/>
	 * This call requires the mainlock, but doesn't lock it itself.
	 * So the caller is responsible for this task.
	 *
	 * @throws RejectedExecutionException if this Repeater is not in the correct
	 *                                    state to execute the task.
	 */
	private void ensureRepeaterReadyForRepeating() {
		switch (_runstate) {
			case NOT_STARTED:
				//the repeater has not been started, so start it.
				startRepeater();
				break;
			case RUNNING://everything is ok
				break;
			default:
				String msg = "This RepeaterService doesn't accept new tasks to repeat";
				throw new RejectedExecutionException(msg);
		}
	}

	public boolean repeat(Runnable task, long timeout, TimeUnit unit)
throws InterruptedException {
		if (task == null || unit == null) throw new NullPointerException();

		long timeoutNs = unit.toNanos(timeout);
		timeoutNs = LockUtil.tryLockNanos(_mainLock, timeoutNs);
		if (timeoutNs <= 0)
			return false; //the mainlock could not be acquired.

		try {
			ensureRepeaterReadyForRepeating();
			return _awaitableTask.tryPutReference(task, timeoutNs, TimeUnit.NANOSECONDS);
		} finally {
			_mainLock.unlock();
		}
	}

	public void repeat(Runnable task) throws InterruptedException {
		_mainLock.lockInterruptibly();
		try {
			ensureRepeaterReadyForRepeating();
			_awaitableTask.setNewReference(task);
		} finally {
			_mainLock.unlock();
		}
	}

	public void start() {
		try {
			_mainLock.lock();
			startRepeater();
		} finally {
			_mainLock.unlock();
		}
	}

	/**
	 * Starts this RepeaterService. This method doesn`t lock the mainLock,
	 * that is the responsibility of the caller of this method.
	 * <p/>
	 * If this method is called while the runstate is different
	 * than NOT_STARTED, then an IllegalStateException will be thrown.
	 */
	private void startRepeater() {
		if (_runstate != RepeaterServiceState.NOT_STARTED) throw new
IllegalStateException();

		_runstate = RepeaterServiceState.RUNNING;

		for (int k = 0; k < _threadCount; k++) {
			Thread thread = createAndRegisterWorker();
			thread.start();
		}
	}

	/**
	 * Creates a new (unstarted) worker and registers it.
	 *
	 * This method doesn`t lock the required resources, this is
	 * the task of the callee.
	 *
	 * @return the created worker Thread.
	 */
	private Thread createAndRegisterWorker() {
		Worker worker = new Worker();
		Thread thread = _threadFactory.newThread(worker);
		worker.setThread(thread);
		_workers.add(worker);
		_currentThreadCount.incrementAndGet();
		return thread;
	}

	/**
	 * Deregisters a Worker. The Worker is removed from the workers and if it is
	 * the last worker, and this ThreadPoolRepeaterService is terminating, it sets
	 * the state to terminated.
	 *
	 * This method is threadsafe (it locks the mainlock itself).
	 *
	 * @param worker
	 */
	private void deregisterWorker(Worker worker) {
		_mainLock.lock();

		try {
			//remove the worker
			_workers.remove(worker);

			boolean isLastWorker = _workers.isEmpty();
			boolean isTerminating = _runstate == RepeaterServiceState.TERMINATING;

			//if this ThreadPoolRepeaterService is shutting down, and there are
no more workers
			//then this ThreadPoolRepeaterService is terminated.
			if (isLastWorker && isTerminating)
				_runstate = RepeaterServiceState.TERMINATED;
		} finally {
			_mainLock.unlock();
		}

		_currentThreadCount.decrementAndGet();
	}

	public void shutdown() throws InterruptedException {
		_mainLock.lock();
		try {
			switch (_runstate) {
				case NOT_STARTED:
					//if the service has not been started, it can be terminated immediately.
					_runstate = RepeaterServiceState.TERMINATED;
					break;
				case RUNNING:
					if (getCurrentThreadCount() == 0) {
						//there are not workers, this service is terminated
						_runstate = RepeaterServiceState.TERMINATED;
					} else {
						//there are workers, this service is going in the terminating state
						//untill all workers are terminated.
						_runstate = RepeaterServiceState.TERMINATING;

						//insert a dummy task in the waitabletask. If a worker
						//is waiting for a task to execute, it gets this dummy task,
						//executes it and terminates.
						//If this dummy task isn`t added, it could happen a worker
						//keeps waiting for a task that is never going to be executed.
						//And this means this ThreadPoolRepeaterService is never going
to shutdown.
						_awaitableTask.setNewReference(DUMMY_TASK);
					}
					break;
				case TERMINATING:
					//nothing needs to be done, this ThreadPoolRepeaterService
already is shuttingdown.
					break;
				case TERMINATED:
					//nothing needs to be done. this ThreadPoolRepeaterService
already is shutdown
					break;
				default:
					throw new AssertionError();
			}
		} finally {
			_mainLock.unlock();
		}
	}

	//idea:shutdownAndWait

	private static class DummyTask implements Runnable {
		public void run() {
			//do nothing
		}
	}

	private class Worker implements Runnable {

		//a flag to indicate that this worker should stop
		//This field should be volatile because it is used in a multithreaded
		//environment.
		private volatile boolean _die = false;
		//the Thread that executes the Worker. This field is 'final'.
		private Thread _thread;

		/**
		 * Sets the threads that executes this Worker.
		 *
		 * @param thread
		 */
		private void setThread(Thread thread) {
			_thread = thread;
		}

		/**
		 * Signals the Worker it should stop working.
		 */
		private void die() {
			_die = true;
			_thread.interrupt();
		}

		public void run() {
			try {
				Runnable task = null;
				_awaitableTask.lendValue();
				while (_runstate == RepeaterServiceState.RUNNING && !_die) {
					try {
						task.run();
					} catch (RuntimeException ex) {
						//todo: error handeling
						ex.printStackTrace();
					} finally {
						_awaitableTask.returnValue(task);
					}

					task = _awaitableTask.lendValue();
				}

				_awaitableTask.returnValue(task);
			} catch (InterruptedException e) {
				//todo
			} finally {
				deregisterWorker(this);
			}
		}
	}
}

On 7/10/06, Dhanji R. Prasanna <dhanji at gmail.com> wrote:
> >From what I gathered from Peter, the specific use of a ThreadPool
> "Repeating" executor was that increasing the corePoolSize would
> automagically increase the number of tasks being executed ("repeated"). This
> is semantically quite different from what a TPE does or is meant to do and
> perhaps that is where the confusion has arisen.
>
> Repeating tasks (sequentially requeing them) was not the direct impedence as
> that can easily be solved with the task resubmitting itself or a specialized
> impl of a blocking queue which keeps queuing the same task instance.
>
> This is what I gathered from the aether... =)
>
>
> On 7/10/06, David Holmes <dcholmes at optusnet.com.au > wrote:
> >
> > That may well be, but so far I don't have a complete view of either all
> your
> > requirements or all of Peter's, so it is hard to offer suggestions.
> >
> > Cheers,
> > David
> >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at altair.cs.oswego.edu
> >
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
>
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>
>


More information about the Concurrency-interest mailing list