[concurrency-interest] Fwd: ThreadPoolTask

Peter Veentjer alarmnummer at gmail.com
Mon Jul 10 02:09:51 EDT 2006


I use it to wire up everything that needs to be repeated (but in most
cased blocks waiting for input).

I could use it to create JMS-listeners for example.

or check out the last code example in the following topic:
http://forum.springframework.org/showthread.php?t=26194&highlight=repeater

The parser is a component with a InputChannel (Takeable) that contains
messages and it also has 2 output channels (one for parsed messages,
and one for errors).

The parserrepeater is hooked up to the parser. The parser exposes an
axle (the parse method that blocks if no message are available on the
input channel) and the parserrepeater is the engine you hook up to
that axle. It gives a very clear design imho.


On 7/10/06, David Holmes <dcholmes at optusnet.com.au> wrote:
> The Repeater may well be the right abstraction for this, my point was why
> not build it from TPE and a special queue and/or special task rather than
> from "first principles"?
>
> I'm curious about the use-cases for this.
>
> Cheers,
> David
>
> > -----Original Message-----
> > From: Peter Veentjer [mailto:alarmnummer at gmail.com]
> > Sent: Monday, 10 July 2006 3:54 PM
> > To: dholmes at ieee.org; concurrency-interest at altair.cs.oswego.edu
> > Subject: Re: [concurrency-interest] Fwd: ThreadPoolTask
> >
> >
> > Use the right tool for the job. The executor isn't made for continuous
> > repetition of a task and if you have to use strange solutions you
> > aren't useing the right abstraction.
> >
> > Another problem what if you want have at most a single 'instance' of a
> > runnable being active. With an executor this is difficult to realise,
> > with the repeater I use a 'StrictAwaitableReference'. All workers need
> > to return their task, so the strictawaitablereference uses a rw-lock
> > to make sure all old jobs are returned before a new one can be set. If
> > it is allowed that different runnables are running concurrently, I can
> > use a much cheaper lock: the RelaxedAwaitableReference.
> >
> >
> >
> > On 7/10/06, David Holmes <dcholmes at optusnet.com.au> wrote:
> > > Peter,
> > >
> > > If a Repeater simply has one or more worker threads that forever repeat
> > > executing the same task then it would seem to me that using a
> > > ThreadPoolExecutor with a custom queue that always returns
> > theTask is the
> > > simplest way to go. The Repeater can pre-start core threads, or
> > else submit
> > > to seed the initial execution. Cancellation of a thread might
> > be harder as
> > > they will never be idle, but the queue can support checking for being
> > > interrupted too.
> > >
> > > But you know the details of how you want the Repeater to be able to
> > > grow/shrink and perhaps those requirements don't map well to
> > TPE. But then
> > > again you could always grow/shrink by throwing away one TPE and
> > replacing
> > > with another :)
> > >
> > > As I said I can't see the full set of requirements for this service.
> > >
> > > Cheers,
> > > David Holmes
> > >
> > > > -----Original Message-----
> > > > From: concurrency-interest-bounces at cs.oswego.edu
> > > > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of Peter
> > > > Veentjer
> > > > Sent: Monday, 10 July 2006 3:15 PM
> > > > To: Dhanji R. Prasanna; concurrency-interest at cs.oswego.edu
> > > > Subject: Re: [concurrency-interest] Fwd: ThreadPoolTask
> > > >
> > > >
> > > > A Repeater needs only a single runnable that it keeps repeating. No
> > > > strange things like resubmitting tasks or strange 'copyqueues'.
> > > >
> > > > I'm still working on the code, so it could contain bugs.
> > > >
> > > > ---------------------------------------------------------------------
> > > >
> > > > package org.jph.concurrent.repeater;
> > > >
> > > > import org.jph.concurrent.LockUtil;
> > > > import org.jph.concurrent.StdThreadFactory;
> > > > import
> > org.jph.concurrent.awaitablereference.RelaxedAwaitableReference;
> > > >
> > > > import java.util.HashSet;
> > > > import java.util.Iterator;
> > > > import java.util.concurrent.RejectedExecutionException;
> > > > import java.util.concurrent.ThreadFactory;
> > > > import java.util.concurrent.TimeUnit;
> > > > import java.util.concurrent.atomic.AtomicInteger;
> > > > import java.util.concurrent.locks.Lock;
> > > > import java.util.concurrent.locks.ReentrantLock;
> > > >
> > > > /**
> > > >  * The ThreadPoolRepeaterService is the default implementation of the
> > > >  * {@link RepeaterService}. It uses a threadpool.
> > > >  *
> > > >  * @author Peter Veentjer.
> > > >  */
> > > > public final class ThreadPoolRepeaterService implements
> > RepeaterService {
> > > >
> > > >       private final static Runnable DUMMY_TASK = new DummyTask();
> > > >
> > > >
> > > >       //number of threads.
> > > >       //this field has to be volatile because it is shared
> > > > between different
> > > >       //threads.
> > > >       private volatile int _threadCount;
> > > >
> > > >       //factory for new threads
> > > >       private final ThreadFactory _threadFactory;
> > > >       //Set containing all worker threads
> > > >       private final HashSet<Worker> _workers = new HashSet<Worker>();
> > > >
> > > >       //Contains the current number of threads.
> > > >       private final AtomicInteger _currentThreadCount = new
> > > > AtomicInteger(0);
> > > >
> > > >       //Lifecycle state
> > > >       private volatile RepeaterServiceState _runstate =
> > > > RepeaterServiceState.NOT_STARTED;
> > > >
> > > >       private final RelaxedAwaitableReference<Runnable>
> > _awaitableTask =
> > > > new RelaxedAwaitableReference<Runnable>();
> > > >
> > > >       private final Lock _mainLock = new ReentrantLock();
> > > >
> > > >       /**
> > > >        * Creates a StdThreadFactory with a single thread, a
> > > > StdThreadFactory and
> > > >        * no task to execute.
> > > >        */
> > > >       public ThreadPoolRepeaterService() {
> > > >               this(1);
> > > >       }
> > > >
> > > >       /**
> > > >        * Creates a ThreadPoolRepeaterService with the given initial
> > > > parameters, a StdThreadFactory
> > > >        * and no task to execute.
> > > >        *
> > > >        * @param threadCount the number of threads.
> > > >        * @throws IllegalArgumentException if threadCount < 0.
> > > >        */
> > > >       public ThreadPoolRepeaterService(int threadCount) {
> > > >               this(threadCount, new StdThreadFactory("repeaters"));
> > > >       }
> > > >
> > > >       /**
> > > >        * Creates a ThreadPoolRepeaterService
> > > >        *
> > > >        * @param threadCount   the number of threads.
> > > >        * @param threadFactory the ThreadFactory that is used to
> > > > create threads for
> > > >        *                      the threadpool.
> > > >        * @throws IllegalArgumentException if threadcount
> > smaller than 0.
> > > >        * @throws NullPointerException  if threadFactory is null.
> > > >        */
> > > >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > > > threadFactory) {
> > > >               this(threadCount, threadFactory, null);
> > > >       }
> > > >
> > > >       /**
> > > >        * Creates a new ThreadPoolRepeaterService (that is not
> > started).
> > > >        *
> > > >        * @param threadCount   the number of threads to use.
> > > >        * @param threadFactory the ThreadFactory to use for
> > > > creating threads.
> > > >        * @param runnable        the task to repeat (can be null).
> > > >        * @throws IllegalArgumentException if threadCount
> > smaller than 0.
> > > >        * @throws NullPointerException  if threadFactory is null.
> > > >        */
> > > >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > > > threadFactory, Runnable runnable) {
> > > >               if (threadFactory == null) throw new
> > NullPointerException();
> > > >               if (threadCount < 0) throw new
> > IllegalArgumentException();
> > > >               _threadCount = threadCount;
> > > >               _threadFactory = threadFactory;
> > > >               try {
> > > >                       _awaitableTask.setNewReference(runnable);
> > > >               } catch (InterruptedException e) {
> > > >                       e.printStackTrace();  //To change body of
> > > > catch statement use File
> > > > | Settings | File Templates.
> > > >               }
> > > >       }
> > > >
> > > >       /**
> > > >        * Returns the number of threads. The returned value does not
> > > >        * have to correspond with the actual number of running threads:
> > > >        * if this method is called after the poolsize has been changed
> > > >        * but before the change is picked up.
> > > >        *
> > > >        * @see #getCurrentThreadCount()
> > > >        */
> > > >       public int getThreadCount() {
> > > >               return _threadCount;
> > > >       }
> > > >
> > > >       /**
> > > >        * Returns the current number of Running threads. This
> > > > value can be stale
> > > >        * if the threadcount has been changed in the meanwhile.
> > > >        *
> > > >        * @see #getThreadCount()
> > > >        */
> > > >       public int getCurrentThreadCount() {
> > > >               return _currentThreadCount.intValue();
> > > >       }
> > > >
> > > >       public void increaseThreadCount(int delta) {
> > > >               throw new UnsupportedOperationException();
> > > >       }
> > > >
> > > >       /**
> > > >        * Sets the number of threads this
> > ThreadPoolRepeaterService uses to
> > > >        * execute.
> > > >        *
> > > >        * @param newThreadCount the number of threads.
> > > >        * @throws IllegalArgumentException if newThreadCount < 0.
> > > >        */
> > > >       public void setThreadCount(int newThreadCount) {
> > > >               if (newThreadCount < 0) throw new
> > > > IllegalArgumentException();
> > > >
> > > >               _mainLock.lock();
> > > >               try {
> > > >                       if (_runstate ==
> > RepeaterServiceState.NOT_STARTED) {
> > > >                               //the Repeater isn`t running yet,
> > > > the number of threads
> > > >                               //can be increased because no
> > > > threads are created yet.
> > > >                               _threadCount = newThreadCount;
> > > >                       } else if (_runstate !=
> > > > RepeaterServiceState.RUNNING) {
> > > >                               //a task can only be repeated if
> > > > this RepeaterService is running.
> > > >                               String msg = "This RepeaterService
> > > > doesn't accept new tasks to repeat";
> > > >                               throw new
> > > > RejectedExecutionException(msg);  //todo: exception
> > > >                       } else {
> > > >                               //the Repeater is running.
> > > >                               int delta = newThreadCount -
> > > > getCurrentThreadCount();
> > > >                               if (delta == 0) {
> > > >                                       //the poolsize stays the same
> > > >                                       return;
> > > >                               }
> > > >
> > > >                               if (delta > 0) {
> > > >                                       //additional threads have
> > > > to be added
> > > >                                       for (int k = 0; k <
> > delta; k++) {
> > > >                                               Thread thread =
> > > > createAndRegisterWorker();
> > > >                                               thread.start();
> > > >                                       }
> > > >                               } else {
> > > >                                       //threads have to be destroyed.
> > > >                                       Iterator<Worker> workerIt =
> > > > _workers.iterator();
> > > >                                       for (int k = 0; k <
> > -delta; k++) {
> > > >                                               Worker worker =
> > > > workerIt.next();
> > > >                                               worker.die();
> > > >                                               _workers.remove(worker);
> > > >                                       }
> > > >                               }
> > > >                               _threadCount = newThreadCount;
> > > >                       }
> > > >
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >       }
> > > >
> > > >
> > > >       /**
> > > >        * Returns the ThreadFactory this ThreadPoolRepeaterService
> > > > uses to create
> > > >        * threads.
> > > >        */
> > > >       public ThreadFactory getThreadFactory() {
> > > >               return _threadFactory;
> > > >       }
> > > >
> > > >       public synchronized Runnable getTask() {
> > > >               try {
> > > >                       return _awaitableTask.lendValue(-1,
> > > > TimeUnit.MICROSECONDS);
> > > >               } catch (InterruptedException e) {
> > > >                       throw new RuntimeException(e);//todo
> > > >               }
> > > >       }
> > > >
> > > >       public RepeaterServiceState getState() {
> > > >               return _runstate;
> > > >       }
> > > >
> > > >       /**
> > > >        * Makes sure that the Repeater is ready for repeating. A
> > > > Repeater is
> > > >        * ready for repeating if:
> > > >        * <ul>
> > > >        * <li>if is has not been started yet. If this is the
> > case, it is
> > > >        * started.</li>
> > > >        * <li>if it is started. It doesn't matter if it is started
> > > >        * multiple times.
> > > >        * </li>
> > > >        * <ul>
> > > >        * In all other cases, a RejectedExecutionException
> > will be thrown.
> > > >        * <p/>
> > > >        * This call requires the mainlock, but doesn't lock it itself.
> > > >        * So the caller is responsible for this task.
> > > >        *
> > > >        * @throws RejectedExecutionException if this Repeater is
> > > > not in the correct
> > > >        *                                    state to execute the task.
> > > >        */
> > > >       private void ensureRepeaterReadyForRepeating() {
> > > >               switch (_runstate) {
> > > >                       case NOT_STARTED:
> > > >                               //the repeater has not been
> > > > started, so start it.
> > > >                               startRepeater();
> > > >                               break;
> > > >                       case RUNNING://everything is ok
> > > >                               break;
> > > >                       default:
> > > >                               String msg = "This RepeaterService
> > > > doesn't accept new tasks to repeat";
> > > >                               throw new
> > RejectedExecutionException(msg);
> > > >               }
> > > >       }
> > > >
> > > >       public boolean repeat(Runnable task, long timeout,
> > TimeUnit unit)
> > > > throws InterruptedException {
> > > >               if (task == null || unit == null) throw new
> > > > NullPointerException();
> > > >
> > > >               long timeoutNs = unit.toNanos(timeout);
> > > >               timeoutNs = LockUtil.tryLockNanos(_mainLock, timeoutNs);
> > > >               if (timeoutNs <= 0)
> > > >                       return false; //the mainlock could not
> > be acquired.
> > > >
> > > >               try {
> > > >                       ensureRepeaterReadyForRepeating();
> > > >                       return _awaitableTask.tryPutReference(task,
> > > > timeoutNs, TimeUnit.NANOSECONDS);
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >       }
> > > >
> > > >       public void repeat(Runnable task) throws InterruptedException {
> > > >               _mainLock.lockInterruptibly();
> > > >               try {
> > > >                       ensureRepeaterReadyForRepeating();
> > > >                       _awaitableTask.setNewReference(task);
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >       }
> > > >
> > > >       public void start() {
> > > >               try {
> > > >                       _mainLock.lock();
> > > >                       startRepeater();
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >       }
> > > >
> > > >       /**
> > > >        * Starts this RepeaterService. This method doesn`t lock
> > > > the mainLock,
> > > >        * that is the responsibility of the caller of this method.
> > > >        * <p/>
> > > >        * If this method is called while the runstate is different
> > > >        * than NOT_STARTED, then an IllegalStateException will
> > be thrown.
> > > >        */
> > > >       private void startRepeater() {
> > > >               if (_runstate !=
> > RepeaterServiceState.NOT_STARTED) throw new
> > > > IllegalStateException();
> > > >
> > > >               _runstate = RepeaterServiceState.RUNNING;
> > > >
> > > >               for (int k = 0; k < _threadCount; k++) {
> > > >                       Thread thread = createAndRegisterWorker();
> > > >                       thread.start();
> > > >               }
> > > >       }
> > > >
> > > >       /**
> > > >        * Creates a new (unstarted) worker and registers it.
> > > >        *
> > > >        * This method doesn`t lock the required resources, this is
> > > >        * the task of the callee.
> > > >        *
> > > >        * @return the created worker Thread.
> > > >        */
> > > >       private Thread createAndRegisterWorker() {
> > > >               Worker worker = new Worker();
> > > >               Thread thread = _threadFactory.newThread(worker);
> > > >               worker.setThread(thread);
> > > >               _workers.add(worker);
> > > >               _currentThreadCount.incrementAndGet();
> > > >               return thread;
> > > >       }
> > > >
> > > >       /**
> > > >        * Deregisters a Worker. The Worker is removed from the
> > > > workers and if it is
> > > >        * the last worker, and this ThreadPoolRepeaterService is
> > > > terminating, it sets
> > > >        * the state to terminated.
> > > >        *
> > > >        * This method is threadsafe (it locks the mainlock itself).
> > > >        *
> > > >        * @param worker
> > > >        */
> > > >       private void deregisterWorker(Worker worker) {
> > > >               _mainLock.lock();
> > > >
> > > >               try {
> > > >                       //remove the worker
> > > >                       _workers.remove(worker);
> > > >
> > > >                       boolean isLastWorker = _workers.isEmpty();
> > > >                       boolean isTerminating = _runstate ==
> > > > RepeaterServiceState.TERMINATING;
> > > >
> > > >                       //if this ThreadPoolRepeaterService is
> > > > shutting down, and there are
> > > > no more workers
> > > >                       //then this ThreadPoolRepeaterService
> > is terminated.
> > > >                       if (isLastWorker && isTerminating)
> > > >                               _runstate =
> > RepeaterServiceState.TERMINATED;
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >
> > > >               _currentThreadCount.decrementAndGet();
> > > >       }
> > > >
> > > >       public void shutdown() throws InterruptedException {
> > > >               _mainLock.lock();
> > > >               try {
> > > >                       switch (_runstate) {
> > > >                               case NOT_STARTED:
> > > >                                       //if the service has not
> > > > been started, it can be terminated immediately.
> > > >                                       _runstate =
> > > > RepeaterServiceState.TERMINATED;
> > > >                                       break;
> > > >                               case RUNNING:
> > > >                                       if
> > (getCurrentThreadCount() == 0) {
> > > >                                               //there are not
> > > > workers, this service is terminated
> > > >                                               _runstate =
> > > > RepeaterServiceState.TERMINATED;
> > > >                                       } else {
> > > >                                               //there are
> > > > workers, this service is going in the terminating state
> > > >                                               //untill all
> > > > workers are terminated.
> > > >                                               _runstate =
> > > > RepeaterServiceState.TERMINATING;
> > > >
> > > >                                               //insert a dummy
> > > > task in the waitabletask. If a worker
> > > >                                               //is waiting for a
> > > > task to execute, it gets this dummy task,
> > > >                                               //executes it and
> > > > terminates.
> > > >                                               //If this dummy
> > > > task isn`t added, it could happen a worker
> > > >                                               //keeps waiting for
> > > > a task that is never going to be executed.
> > > >                                               //And this means
> > > > this ThreadPoolRepeaterService is never going
> > > > to shutdown.
> > > >
> > > > _awaitableTask.setNewReference(DUMMY_TASK);
> > > >                                       }
> > > >                                       break;
> > > >                               case TERMINATING:
> > > >                                       //nothing needs to be done,
> > > > this ThreadPoolRepeaterService
> > > > already is shuttingdown.
> > > >                                       break;
> > > >                               case TERMINATED:
> > > >                                       //nothing needs to be done.
> > > > this ThreadPoolRepeaterService
> > > > already is shutdown
> > > >                                       break;
> > > >                               default:
> > > >                                       throw new AssertionError();
> > > >                       }
> > > >               } finally {
> > > >                       _mainLock.unlock();
> > > >               }
> > > >       }
> > > >
> > > >       //idea:shutdownAndWait
> > > >
> > > >       private static class DummyTask implements Runnable {
> > > >               public void run() {
> > > >                       //do nothing
> > > >               }
> > > >       }
> > > >
> > > >       private class Worker implements Runnable {
> > > >
> > > >               //a flag to indicate that this worker should stop
> > > >               //This field should be volatile because it is used
> > > > in a multithreaded
> > > >               //environment.
> > > >               private volatile boolean _die = false;
> > > >               //the Thread that executes the Worker. This field
> > > > is 'final'.
> > > >               private Thread _thread;
> > > >
> > > >               /**
> > > >                * Sets the threads that executes this Worker.
> > > >                *
> > > >                * @param thread
> > > >                */
> > > >               private void setThread(Thread thread) {
> > > >                       _thread = thread;
> > > >               }
> > > >
> > > >               /**
> > > >                * Signals the Worker it should stop working.
> > > >                */
> > > >               private void die() {
> > > >                       _die = true;
> > > >                       _thread.interrupt();
> > > >               }
> > > >
> > > >               public void run() {
> > > >                       try {
> > > >                               Runnable task = null;
> > > >                               _awaitableTask.lendValue();
> > > >                               while (_runstate ==
> > > > RepeaterServiceState.RUNNING && !_die) {
> > > >                                       try {
> > > >                                               task.run();
> > > >                                       } catch (RuntimeException ex) {
> > > >                                               //todo: error handeling
> > > >                                               ex.printStackTrace();
> > > >                                       } finally {
> > > >
> > > > _awaitableTask.returnValue(task);
> > > >                                       }
> > > >
> > > >                                       task =
> > _awaitableTask.lendValue();
> > > >                               }
> > > >
> > > >                               _awaitableTask.returnValue(task);
> > > >                       } catch (InterruptedException e) {
> > > >                               //todo
> > > >                       } finally {
> > > >                               deregisterWorker(this);
> > > >                       }
> > > >               }
> > > >       }
> > > > }
> > > >
> > > > On 7/10/06, Dhanji R. Prasanna <dhanji at gmail.com> wrote:
> > > > > >From what I gathered from Peter, the specific use of a ThreadPool
> > > > > "Repeating" executor was that increasing the corePoolSize would
> > > > > automagically increase the number of tasks being executed
> > > > ("repeated"). This
> > > > > is semantically quite different from what a TPE does or is
> > > > meant to do and
> > > > > perhaps that is where the confusion has arisen.
> > > > >
> > > > > Repeating tasks (sequentially requeing them) was not the direct
> > > > impedence as
> > > > > that can easily be solved with the task resubmitting itself or
> > > > a specialized
> > > > > impl of a blocking queue which keeps queuing the same task instance.
> > > > >
> > > > > This is what I gathered from the aether... =)
> > > > >
> > > > >
> > > > > On 7/10/06, David Holmes <dcholmes at optusnet.com.au > wrote:
> > > > > >
> > > > > > That may well be, but so far I don't have a complete view of
> > > > either all
> > > > > your
> > > > > > requirements or all of Peter's, so it is hard to offer
> > suggestions.
> > > > > >
> > > > > > Cheers,
> > > > > > David
> > > > > >
> > > > > > _______________________________________________
> > > > > > Concurrency-interest mailing list
> > > > > > Concurrency-interest at altair.cs.oswego.edu
> > > > > >
> > > > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > > > >
> > > > >
> > > > >
> > > > > _______________________________________________
> > > > > Concurrency-interest mailing list
> > > > > Concurrency-interest at altair.cs.oswego.edu
> > > > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > > >
> > > > >
> > > > >
> > > > _______________________________________________
> > > > Concurrency-interest mailing list
> > > > Concurrency-interest at altair.cs.oswego.edu
> > > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > >
> > >
>
>


More information about the Concurrency-interest mailing list