[concurrency-interest] Fwd: ThreadPoolTask

Peter Veentjer alarmnummer at gmail.com
Mon Jul 10 01:54:29 EDT 2006


Use the right tool for the job. The executor isn't made for continuous
repetition of a task, and if you have to resort to strange solutions you
aren't using the right abstraction.
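
For comparison, the executor-based approach suggested in the quoted
message below could be sketched roughly as follows. This is only an
illustration, not code from this thread: RepeatingQueue and
RepeatingQueueDemo are made-up names, and the sketch assumes a
fixed-size pool (core size equal to maximum size) whose workers are
pre-started because nothing is ever submitted.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical queue (not part of java.util.concurrent): every attempt to
// fetch work hands back the same task, so the workers never go idle.
class RepeatingQueue extends LinkedBlockingQueue<Runnable> {
    private final Runnable task;

    RepeatingQueue(Runnable task) {
        this.task = task;
    }

    @Override public Runnable take() { return task; }
    @Override public Runnable poll() { return task; }
    @Override public Runnable poll(long timeout, TimeUnit unit) { return task; }
}

class RepeatingQueueDemo {
    // Repeats a counting task on 'threads' workers for roughly 'millis'
    // milliseconds and returns how often the task ran.
    static int runFor(int threads, long millis) {
        AtomicInteger runs = new AtomicInteger();
        Runnable task = () -> {
            runs.incrementAndGet();
            Thread.yield();
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS, new RepeatingQueue(task));
        pool.prestartAllCoreThreads(); // nothing is submitted, so seed the workers
        try {
            Thread.sleep(millis);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("task ran " + runFor(2, 100) + " times");
    }
}
```

As far as I can tell, shutdown() lets the workers exit here only because
the underlying queue still reports itself as empty. That kind of
coupling to executor internals is the sort of 'strange solution' meant
above.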

Another problem: what if you want at most a single 'instance' of a
runnable to be active? With an executor this is difficult to realise;
with the repeater I use a 'StrictAwaitableReference'. All workers need
to return their task, so the StrictAwaitableReference uses a rw-lock
to make sure all old jobs are returned before a new one can be set. If
different runnables are allowed to run concurrently, I can use a much
cheaper alternative: the RelaxedAwaitableReference.
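
The lend/return idea behind the strict variant can be sketched with a
plain ReentrantReadWriteLock. This is a minimal sketch under
assumptions, not the StrictAwaitableReference itself: StrictReference,
giveBack and trySet are hypothetical names, and the 'awaitable' part
(blocking until a value is available) is left out.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for the idea behind a strict reference; it is not
// the StrictAwaitableReference from the post.
class StrictReference<T> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private T value; // only written while the write lock is held

    // Lends the current value; the calling thread holds a read lock until
    // it calls giveBack() on the same thread.
    T lend() {
        lock.readLock().lock();
        return value;
    }

    void giveBack() {
        lock.readLock().unlock();
    }

    // Blocks until every lent value has been given back, then replaces it.
    // Must not be called by a thread that still holds a lent value.
    void set(T newValue) {
        lock.writeLock().lock();
        try {
            value = newValue;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Non-blocking variant: returns false while a value is still lent out
    // or another thread is setting a new one.
    boolean trySet(T newValue) {
        if (!lock.writeLock().tryLock()) return false;
        try {
            value = newValue;
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

class StrictReferenceDemo {
    static String demo() {
        StrictReference<String> ref = new StrictReference<>();
        ref.set("old");
        String lent = ref.lend();
        boolean whileLent = ref.trySet("new");   // refused: "old" is still lent out
        ref.giveBack();
        boolean afterReturn = ref.trySet("new"); // accepted: nothing is lent out
        String now = ref.lend();
        ref.giveBack();
        return lent + "," + whileLent + "," + afterReturn + "," + now;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The write lock cannot be acquired while any read lock is held, which
gives exactly the 'all old jobs are returned before a new one can be
set' guarantee. A relaxed variant would skip this wait and let workers
finish the old task while others already pick up the new one.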



On 7/10/06, David Holmes <dcholmes at optusnet.com.au> wrote:
> Peter,
>
> If a Repeater simply has one or more worker threads that forever repeat
> executing the same task then it would seem to me that using a
> ThreadPoolExecutor with a custom queue that always returns theTask is the
> simplest way to go. The Repeater can pre-start core threads, or else submit
> to seed the initial execution. Cancellation of a thread might be harder as
> they will never be idle, but the queue can support checking for being
> interrupted too.
>
> But you know the details of how you want the Repeater to be able to
> grow/shrink and perhaps those requirements don't map well to TPE. But then
> again you could always grow/shrink by throwing away one TPE and replacing
> with another :)
>
> As I said I can't see the full set of requirements for this service.
>
> Cheers,
> David Holmes
>
> > -----Original Message-----
> > From: concurrency-interest-bounces at cs.oswego.edu
> > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of Peter
> > Veentjer
> > Sent: Monday, 10 July 2006 3:15 PM
> > To: Dhanji R. Prasanna; concurrency-interest at cs.oswego.edu
> > Subject: Re: [concurrency-interest] Fwd: ThreadPoolTask
> >
> >
> > A Repeater needs only a single runnable that it keeps repeating. No
> > strange things like resubmitting tasks or strange 'copyqueues'.
> >
> > I'm still working on the code, so it could contain bugs.
> >
> > ---------------------------------------------------------------------
> >
> > package org.jph.concurrent.repeater;
> >
> > import org.jph.concurrent.LockUtil;
> > import org.jph.concurrent.StdThreadFactory;
> > import org.jph.concurrent.awaitablereference.RelaxedAwaitableReference;
> >
> > import java.util.HashSet;
> > import java.util.Iterator;
> > import java.util.concurrent.RejectedExecutionException;
> > import java.util.concurrent.ThreadFactory;
> > import java.util.concurrent.TimeUnit;
> > import java.util.concurrent.atomic.AtomicInteger;
> > import java.util.concurrent.locks.Lock;
> > import java.util.concurrent.locks.ReentrantLock;
> >
> > /**
> >  * The ThreadPoolRepeaterService is the default implementation of the
> >  * {@link RepeaterService}. It uses a threadpool.
> >  *
> >  * @author Peter Veentjer.
> >  */
> > public final class ThreadPoolRepeaterService implements RepeaterService {
> >
> >       private final static Runnable DUMMY_TASK = new DummyTask();
> >
> >
> >       //number of threads.
> >       //this field has to be volatile because it is shared between
> >       //different threads.
> >       private volatile int _threadCount;
> >
> >       //factory for new threads
> >       private final ThreadFactory _threadFactory;
> >       //Set containing all worker threads
> >       private final HashSet<Worker> _workers = new HashSet<Worker>();
> >
> >       //Contains the current number of threads.
> >       private final AtomicInteger _currentThreadCount = new
> > AtomicInteger(0);
> >
> >       //Lifecycle state
> >       private volatile RepeaterServiceState _runstate =
> > RepeaterServiceState.NOT_STARTED;
> >
> >       private final RelaxedAwaitableReference<Runnable> _awaitableTask =
> > new RelaxedAwaitableReference<Runnable>();
> >
> >       private final Lock _mainLock = new ReentrantLock();
> >
> >       /**
> >        * Creates a ThreadPoolRepeaterService with a single thread, a
> >        * StdThreadFactory and no task to execute.
> >        */
> >       public ThreadPoolRepeaterService() {
> >               this(1);
> >       }
> >
> >       /**
> >        * Creates a ThreadPoolRepeaterService with the given initial
> >        * parameters, a StdThreadFactory and no task to execute.
> >        *
> >        * @param threadCount the number of threads.
> >        * @throws IllegalArgumentException if threadCount < 0.
> >        */
> >       public ThreadPoolRepeaterService(int threadCount) {
> >               this(threadCount, new StdThreadFactory("repeaters"));
> >       }
> >
> >       /**
> >        * Creates a ThreadPoolRepeaterService
> >        *
> >        * @param threadCount   the number of threads.
> >        * @param threadFactory the ThreadFactory that is used to create
> >        *                      threads for the threadpool.
> >        * @throws IllegalArgumentException if threadcount smaller than 0.
> >        * @throws NullPointerException  if threadFactory is null.
> >        */
> >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > threadFactory) {
> >               this(threadCount, threadFactory, null);
> >       }
> >
> >       /**
> >        * Creates a new ThreadPoolRepeaterService (that is not started).
> >        *
> >        * @param threadCount   the number of threads to use.
> >        * @param threadFactory the ThreadFactory to use for creating threads.
> >        * @param runnable        the task to repeat (can be null).
> >        * @throws IllegalArgumentException if threadCount smaller than 0.
> >        * @throws NullPointerException  if threadFactory is null.
> >        */
> >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > threadFactory, Runnable runnable) {
> >               if (threadFactory == null) throw new NullPointerException();
> >               if (threadCount < 0) throw new IllegalArgumentException();
> >               _threadCount = threadCount;
> >               _threadFactory = threadFactory;
> >               try {
> >                       _awaitableTask.setNewReference(runnable);
> >               } catch (InterruptedException e) {
> >                       //restore the interrupt status instead of swallowing it.
> >                       Thread.currentThread().interrupt();
> >               }
> >       }
> >
> >       /**
> >        * Returns the number of threads. The returned value can differ
> >        * from the actual number of running threads if this method is
> >        * called after the pool size has been changed but before the
> >        * change has been picked up.
> >        *
> >        * @see #getCurrentThreadCount()
> >        */
> >       public int getThreadCount() {
> >               return _threadCount;
> >       }
> >
> >       /**
> >        * Returns the current number of running threads. This value can be
> >        * stale if the thread count has been changed in the meantime.
> >        *
> >        * @see #getThreadCount()
> >        */
> >       public int getCurrentThreadCount() {
> >               return _currentThreadCount.intValue();
> >       }
> >
> >       public void increaseThreadCount(int delta) {
> >               throw new UnsupportedOperationException();
> >       }
> >
> >       /**
> >        * Sets the number of threads this ThreadPoolRepeaterService uses to
> >        * execute.
> >        *
> >        * @param newThreadCount the number of threads.
> >        * @throws IllegalArgumentException if newThreadCount < 0.
> >        */
> >       public void setThreadCount(int newThreadCount) {
> >               if (newThreadCount < 0) throw new
> > IllegalArgumentException();
> >
> >               _mainLock.lock();
> >               try {
> >                       if (_runstate == RepeaterServiceState.NOT_STARTED) {
> >                               //the Repeater isn't running yet, the number of
> >                               //threads can be increased because no threads
> >                               //are created yet.
> >                               _threadCount = newThreadCount;
> >                       } else if (_runstate != RepeaterServiceState.RUNNING) {
> >                               //a task can only be repeated if this
> >                               //RepeaterService is running.
> >                               String msg = "This RepeaterService doesn't accept new tasks to repeat";
> >                               throw new RejectedExecutionException(msg);  //todo: exception
> >                       } else {
> >                               //the Repeater is running.
> >                               int delta = newThreadCount -
> > getCurrentThreadCount();
> >                               if (delta == 0) {
> >                                       //the poolsize stays the same
> >                                       return;
> >                               }
> >
> >                               if (delta > 0) {
> >                                       //additional threads have to be added
> >                                       for (int k = 0; k < delta; k++) {
> >                                               Thread thread = createAndRegisterWorker();
> >                                               thread.start();
> >                                       }
> >                               } else {
> >                                       //threads have to be destroyed.
> >                                       Iterator<Worker> workerIt = _workers.iterator();
> >                                       for (int k = 0; k < -delta && workerIt.hasNext(); k++) {
> >                                               Worker worker = workerIt.next();
> >                                               worker.die();
> >                                               //remove through the iterator; calling
> >                                               //_workers.remove(worker) here would make the
> >                                               //next workerIt.next() throw a
> >                                               //ConcurrentModificationException.
> >                                               workerIt.remove();
> >                                       }
> >                               }
> >                               _threadCount = newThreadCount;
> >                       }
> >
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >       }
> >
> >
> >       /**
> >        * Returns the ThreadFactory this ThreadPoolRepeaterService uses to
> >        * create threads.
> >        */
> >       public ThreadFactory getThreadFactory() {
> >               return _threadFactory;
> >       }
> >
> >       public synchronized Runnable getTask() {
> >               try {
> >                       return _awaitableTask.lendValue(-1,
> > TimeUnit.MICROSECONDS);
> >               } catch (InterruptedException e) {
> >                       throw new RuntimeException(e);//todo
> >               }
> >       }
> >
> >       public RepeaterServiceState getState() {
> >               return _runstate;
> >       }
> >
> >       /**
> >        * Makes sure that the Repeater is ready for repeating. A Repeater is
> >        * ready for repeating if:
> >        * <ul>
> >        * <li>it has not been started yet. If this is the case, it is
> >        * started.</li>
> >        * <li>it is started. It doesn't matter if it is started
> >        * multiple times.
> >        * </li>
> >        * </ul>
> >        * In all other cases, a RejectedExecutionException will be thrown.
> >        * <p/>
> >        * This call requires the mainlock, but doesn't lock it itself.
> >        * So the caller is responsible for this task.
> >        *
> >        * @throws RejectedExecutionException if this Repeater is not in the
> >        *                                    correct state to execute the task.
> >        */
> >       private void ensureRepeaterReadyForRepeating() {
> >               switch (_runstate) {
> >                       case NOT_STARTED:
> >                               //the repeater has not been started, so start it.
> >                               startRepeater();
> >                               break;
> >                       case RUNNING://everything is ok
> >                               break;
> >                       default:
> >                               String msg = "This RepeaterService doesn't accept new tasks to repeat";
> >                               throw new RejectedExecutionException(msg);
> >               }
> >       }
> >
> >       public boolean repeat(Runnable task, long timeout, TimeUnit unit)
> > throws InterruptedException {
> >               if (task == null || unit == null) throw new
> > NullPointerException();
> >
> >               long timeoutNs = unit.toNanos(timeout);
> >               timeoutNs = LockUtil.tryLockNanos(_mainLock, timeoutNs);
> >               if (timeoutNs <= 0)
> >                       return false; //the mainlock could not be acquired.
> >
> >               try {
> >                       ensureRepeaterReadyForRepeating();
> >                       return _awaitableTask.tryPutReference(task,
> > timeoutNs, TimeUnit.NANOSECONDS);
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >       }
> >
> >       public void repeat(Runnable task) throws InterruptedException {
> >               _mainLock.lockInterruptibly();
> >               try {
> >                       ensureRepeaterReadyForRepeating();
> >                       _awaitableTask.setNewReference(task);
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >       }
> >
> >       public void start() {
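> >               //acquire the lock before entering the try block, so the
> >               //finally clause never unlocks a lock that was not acquired.
> >               _mainLock.lock();
> >               try {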
> >                       startRepeater();
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >       }
> >
> >       /**
> >        * Starts this RepeaterService. This method doesn't lock the mainLock,
> >        * that is the responsibility of the caller of this method.
> >        * <p/>
> >        * If this method is called while the runstate is different
> >        * than NOT_STARTED, then an IllegalStateException will be thrown.
> >        */
> >       private void startRepeater() {
> >               if (_runstate != RepeaterServiceState.NOT_STARTED) throw new
> > IllegalStateException();
> >
> >               _runstate = RepeaterServiceState.RUNNING;
> >
> >               for (int k = 0; k < _threadCount; k++) {
> >                       Thread thread = createAndRegisterWorker();
> >                       thread.start();
> >               }
> >       }
> >
> >       /**
> >        * Creates a new (unstarted) worker and registers it.
> >        *
> >        * This method doesn't lock the required resources, this is
> >        * the task of the caller.
> >        *
> >        * @return the created worker Thread.
> >        */
> >       private Thread createAndRegisterWorker() {
> >               Worker worker = new Worker();
> >               Thread thread = _threadFactory.newThread(worker);
> >               worker.setThread(thread);
> >               _workers.add(worker);
> >               _currentThreadCount.incrementAndGet();
> >               return thread;
> >       }
> >
> >       /**
> >        * Deregisters a Worker. The Worker is removed from the workers and if
> >        * it is the last worker, and this ThreadPoolRepeaterService is
> >        * terminating, it sets the state to terminated.
> >        *
> >        * This method is threadsafe (it locks the mainlock itself).
> >        *
> >        * @param worker
> >        */
> >       private void deregisterWorker(Worker worker) {
> >               _mainLock.lock();
> >
> >               try {
> >                       //remove the worker
> >                       _workers.remove(worker);
> >
> >                       boolean isLastWorker = _workers.isEmpty();
> >                       boolean isTerminating = _runstate ==
> > RepeaterServiceState.TERMINATING;
> >
> >                       //if this ThreadPoolRepeaterService is shutting down, and
> >                       //there are no more workers, then this
> >                       //ThreadPoolRepeaterService is terminated.
> >                       if (isLastWorker && isTerminating)
> >                               _runstate = RepeaterServiceState.TERMINATED;
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >
> >               _currentThreadCount.decrementAndGet();
> >       }
> >
> >       public void shutdown() throws InterruptedException {
> >               _mainLock.lock();
> >               try {
> >                       switch (_runstate) {
> >                               case NOT_STARTED:
> >                                       //if the service has not been started, it can
> >                                       //be terminated immediately.
> >                                       _runstate = RepeaterServiceState.TERMINATED;
> >                                       break;
> >                               case RUNNING:
> >                                       if (getCurrentThreadCount() == 0) {
> >                                               //there are no workers, this service is terminated
> >                                               _runstate = RepeaterServiceState.TERMINATED;
> >                                       } else {
> >                                               //there are workers, this service goes into the
> >                                               //terminating state until all workers are terminated.
> >                                               _runstate = RepeaterServiceState.TERMINATING;
> >
> >                                               //insert a dummy task in the awaitable task. If a
> >                                               //worker is waiting for a task to execute, it gets
> >                                               //this dummy task, executes it and terminates.
> >                                               //If this dummy task isn't added, a worker could
> >                                               //keep waiting for a task that is never going to
> >                                               //be executed, and this ThreadPoolRepeaterService
> >                                               //would never shut down.
> >                                               _awaitableTask.setNewReference(DUMMY_TASK);
> >                                       }
> >                                       break;
> >                               case TERMINATING:
> >                                       //nothing needs to be done, this
> >                                       //ThreadPoolRepeaterService is already shutting down.
> >                                       break;
> >                               case TERMINATED:
> >                                       //nothing needs to be done, this
> >                                       //ThreadPoolRepeaterService is already shut down.
> >                                       break;
> >                               default:
> >                                       throw new AssertionError();
> >                       }
> >               } finally {
> >                       _mainLock.unlock();
> >               }
> >       }
> >
> >       //idea:shutdownAndWait
> >
> >       private static class DummyTask implements Runnable {
> >               public void run() {
> >                       //do nothing
> >               }
> >       }
> >
> >       private class Worker implements Runnable {
> >
> >               //a flag to indicate that this worker should stop.
> >               //This field should be volatile because it is used in a
> >               //multithreaded environment.
> >               private volatile boolean _die = false;
> >               //the Thread that executes the Worker. This field is
> >               //effectively 'final': it is set once, through setThread.
> >               private Thread _thread;
> >
> >               /**
> >                * Sets the thread that executes this Worker.
> >                *
> >                * @param thread
> >                */
> >               private void setThread(Thread thread) {
> >                       _thread = thread;
> >               }
> >
> >               /**
> >                * Signals the Worker it should stop working.
> >                */
> >               private void die() {
> >                       _die = true;
> >                       _thread.interrupt();
> >               }
> >
> >               public void run() {
> >                       try {
> >                               //keep the lent task; without this assignment
> >                               //task.run() below would throw a NullPointerException.
> >                               Runnable task = _awaitableTask.lendValue();
> >                               while (_runstate ==
> > RepeaterServiceState.RUNNING && !_die) {
> >                                       try {
> >                                               task.run();
> >                                       } catch (RuntimeException ex) {
> >                                               //todo: error handling
> >                                               ex.printStackTrace();
> >                                       } finally {
> >
> > _awaitableTask.returnValue(task);
> >                                       }
> >
> >                                       task = _awaitableTask.lendValue();
> >                               }
> >
> >                               _awaitableTask.returnValue(task);
> >                       } catch (InterruptedException e) {
> >                               //todo
> >                       } finally {
> >                               deregisterWorker(this);
> >                       }
> >               }
> >       }
> > }
> >
> > On 7/10/06, Dhanji R. Prasanna <dhanji at gmail.com> wrote:
> > > From what I gathered from Peter, the specific use of a ThreadPool
> > > "Repeating" executor was that increasing the corePoolSize would
> > > automagically increase the number of tasks being executed
> > > ("repeated"). This is semantically quite different from what a TPE
> > > does or is meant to do and perhaps that is where the confusion has
> > > arisen.
> > >
> > > Repeating tasks (sequentially requeuing them) was not the direct
> > > impediment, as that can easily be solved with the task resubmitting
> > > itself or a specialized impl of a blocking queue which keeps queuing
> > > the same task instance.
> > >
> > > This is what I gathered from the aether... =)
> > >
> > >
> > > On 7/10/06, David Holmes <dcholmes at optusnet.com.au > wrote:
> > > >
> > > > > That may well be, but so far I don't have a complete view of
> > > > > either all your requirements or all of Peter's, so it is hard to
> > > > > offer suggestions.
> > > >
> > > > Cheers,
> > > > David
> > > >
> > > > _______________________________________________
> > > > Concurrency-interest mailing list
> > > > Concurrency-interest at altair.cs.oswego.edu
> > > >
> > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > >
> > >
>
>

