[concurrency-interest] Fwd: ThreadPoolTask

Dhanji R. Prasanna dhanji at gmail.com
Mon Jul 10 02:18:11 EDT 2006


On 7/10/06, Peter Veentjer <alarmnummer at gmail.com> wrote:
>
>
> Another problem what if you want have at most a single 'instance' of a
> runnable being active. With an executor this is difficult to realise,
> with the repeater I use a 'StrictAwaitableReference'. All workers need
> to return their task, so the strictawaitablereference uses a rw-lock
> to make sure all old jobs are returned before a new one can be set. If
> it is allowed that different runnables are running concurrently, I can
> use a much cheaper lock: the RelaxedAwaitableReference.


At the risk of dragging this on further:
Afaics, what you describe above (with weak vs strong locking) is the job of
the blocking queue that holds and feeds new tasks.

On 7/10/06, David Holmes <dcholmes at optusnet.com.au > wrote:
> > Peter,
> >
> > If a Repeater simply has one or more worker threads that forever repeat
> > executing the same task then it would seem to me that using a
> > ThreadPoolExecutor with a custom queue that always returns theTask is
> the
> > simplest way to go. The Repeater can pre-start core threads, or else
> submit
> > to seed the initial execution. Cancellation of a thread might be harder
> as
> > they will never be idle, but the queue can support checking for being
> > interrupted too.
> >
> > But you know the details of how you want the Repeater to be able to
> > grow/shrink and perhaps those requirements don't map well to TPE. But
> then
> > again you could always grow/shrink by throwing away one TPE and
> replacing
> > with another :)
> >
> > As I said I can't see the full set of requirements for this service.
> >
> > Cheers,
> > David Holmes
> >
> > > -----Original Message-----
> > > From: concurrency-interest-bounces at cs.oswego.edu
> > > [mailto:concurrency-interest-bounces at cs.oswego.edu ]On Behalf Of Peter
> > > Veentjer
> > > Sent: Monday, 10 July 2006 3:15 PM
> > > To: Dhanji R. Prasanna; concurrency-interest at cs.oswego.edu
> > > Subject: Re: [concurrency-interest] Fwd: ThreadPoolTask
> > >
> > >
> > > A Repeater needs only a single runnable that it keeps repeating. No
> > > strange things like resubmitting tasks or strange 'copyqueues'.
> > >
> > > I'm still working on the code, so it could contain bugs.
> > >
> > > ---------------------------------------------------------------------
> > >
> > > package org.jph.concurrent.repeater ;
> > >
> > > import org.jph.concurrent.LockUtil;
> > > import org.jph.concurrent.StdThreadFactory;
> > > import org.jph.concurrent.awaitablereference.RelaxedAwaitableReference
> ;
> > >
> > > import java.util.HashSet;
> > > import java.util.Iterator;
> > > import java.util.concurrent.RejectedExecutionException;
> > > import java.util.concurrent.ThreadFactory;
> > > import java.util.concurrent.TimeUnit;
> > > import java.util.concurrent.atomic.AtomicInteger;
> > > import java.util.concurrent.locks.Lock;
> > > import java.util.concurrent.locks.ReentrantLock;
> > >
> > > /**
> > >  * The ThreadPoolRepeaterService is the default implementation of the
> > >  * {@link RepeaterService}. It uses a threadpool.
> > >  *
> > >  * @author Peter Veentjer.
> > >  */
> > > public final class ThreadPoolRepeaterService implements
> RepeaterService {
> > >
> > >       private final static Runnable DUMMY_TASK = new DummyTask();
> > >
> > >
> > >       //number of threads.
> > >       //this field has to be volatile because it is shared
> > > between different
> > >       //threads.
> > >       private volatile int _threadCount;
> > >
> > >       //factory for new threads
> > >       private final ThreadFactory _threadFactory;
> > >       //Set containing all worker threads
> > >       private final HashSet<Worker> _workers = new HashSet<Worker>();
> > >
> > >       //Contains the current number of threads.
> > >       private final AtomicInteger _currentThreadCount = new
> > > AtomicInteger(0);
> > >
> > >       //Lifecycle state
> > >       private volatile RepeaterServiceState _runstate =
> > > RepeaterServiceState.NOT_STARTED;
> > >
> > >       private final RelaxedAwaitableReference<Runnable> _awaitableTask
> =
> > > new RelaxedAwaitableReference<Runnable>();
> > >
> > >       private final Lock _mainLock = new ReentrantLock();
> > >
> > >       /**
> > >        * Creates a StdThreadFactory with a single thread, a
> > > StdThreadFactory and
> > >        * no task to execute.
> > >        */
> > >       public ThreadPoolRepeaterService() {
> > >               this(1);
> > >       }
> > >
> > >       /**
> > >        * Creates a ThreadPoolRepeaterService with the given initial
> > > parameters, a StdThreadFactory
> > >        * and no task to execute.
> > >        *
> > >        * @param threadCount the number of threads.
> > >        * @throws IllegalArgumentException if threadCount < 0.
> > >        */
> > >       public ThreadPoolRepeaterService(int threadCount) {
> > >               this(threadCount, new StdThreadFactory("repeaters"));
> > >       }
> > >
> > >       /**
> > >        * Creates a ThreadPoolRepeaterService
> > >        *
> > >        * @param threadCount   the number of threads.
> > >        * @param threadFactory the ThreadFactory that is used to
> > > create threads for
> > >        *                      the threadpool.
> > >        * @throws IllegalArgumentException if threadcount smaller than
> 0.
> > >        * @throws NullPointerException  if threadFactory is null.
> > >        */
> > >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > > threadFactory) {
> > >               this(threadCount, threadFactory, null);
> > >       }
> > >
> > >       /**
> > >        * Creates a new ThreadPoolRepeaterService (that is not
> started).
> > >        *
> > >        * @param threadCount   the number of threads to use.
> > >        * @param threadFactory the ThreadFactory to use for
> > > creating threads.
> > >        * @param runnable        the task to repeat (can be null).
> > >        * @throws IllegalArgumentException if threadCount smaller than
> 0.
> > >        * @throws NullPointerException  if threadFactory is null.
> > >        */
> > >       public ThreadPoolRepeaterService(int threadCount, ThreadFactory
> > > threadFactory, Runnable runnable) {
> > >               if (threadFactory == null) throw new
> NullPointerException();
> > >               if (threadCount < 0) throw new
> IllegalArgumentException();
> > >               _threadCount = threadCount;
> > >               _threadFactory = threadFactory;
> > >               try {
> > >                       _awaitableTask.setNewReference(runnable);
> > >               } catch (InterruptedException e) {
> > >                       e.printStackTrace();  //To change body of
> > > catch statement use File
> > > | Settings | File Templates.
> > >               }
> > >       }
> > >
> > >       /**
> > >        * Returns the number of threads. The returned value does not
> > >        * have to correspond with the actual number of running threads:
>
> > >        * if this method is called after the poolsize has been changed
> > >        * but before the change is picked up.
> > >        *
> > >        * @see #getCurrentThreadCount()
> > >        */
> > >       public int getThreadCount() {
> > >               return _threadCount;
> > >       }
> > >
> > >       /**
> > >        * Returns the current number of Running threads. This
> > > value can be stale
> > >        * if the threadcount has been changed in the meanwhile.
> > >        *
> > >        * @see #getThreadCount()
> > >        */
> > >       public int getCurrentThreadCount() {
> > >               return _currentThreadCount.intValue();
> > >       }
> > >
> > >       public void increaseThreadCount(int delta) {
> > >               throw new UnsupportedOperationException();
> > >       }
> > >
> > >       /**
> > >        * Sets the number of threads this ThreadPoolRepeaterService
> uses to
> > >        * execute.
> > >        *
> > >        * @param newThreadCount the number of threads.
> > >        * @throws IllegalArgumentException if newThreadCount < 0.
> > >        */
> > >       public void setThreadCount(int newThreadCount) {
> > >               if (newThreadCount < 0) throw new
> > > IllegalArgumentException();
> > >
> > >               _mainLock.lock();
> > >               try {
> > >                       if (_runstate ==
> RepeaterServiceState.NOT_STARTED) {
> > >                               //the Repeater isn`t running yet,
> > > the number of threads
> > >                               //can be increased because no
> > > threads are created yet.
> > >                               _threadCount = newThreadCount;
> > >                       } else if (_runstate !=
> > > RepeaterServiceState.RUNNING) {
> > >                               //a task can only be repeated if
> > > this RepeaterService is running.
> > >                               String msg = "This RepeaterService
> > > doesn't accept new tasks to repeat";
> > >                               throw new
> > > RejectedExecutionException(msg);  //todo: exception
> > >                       } else {
> > >                               //the Repeater is running.
> > >                               int delta = newThreadCount -
> > > getCurrentThreadCount();
> > >                               if (delta == 0) {
> > >                                       //the poolsize stays the same
> > >                                       return;
> > >                               }
> > >
> > >                               if (delta > 0) {
> > >                                       //additional threads have
> > > to be added
> > >                                       for (int k = 0; k < delta; k++)
> {
> > >                                               Thread thread =
> > > createAndRegisterWorker();
> > >                                               thread.start();
> > >                                       }
> > >                               } else {
> > >                                       //threads have to be destroyed.
> > >                                       Iterator<Worker> workerIt =
> > > _workers.iterator();
> > >                                       for (int k = 0; k < -delta; k++)
> {
> > >                                               Worker worker =
> > > workerIt.next ();
> > >                                               worker.die();
> > >                                               _workers.remove(worker);
> > >                                       }
> > >                               }
> > >                               _threadCount = newThreadCount;
> > >                       }
> > >
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >       }
> > >
> > >
> > >       /**
> > >        * Returns the ThreadFactory this ThreadPoolRepeaterService
> > > uses to create
> > >        * threads.
> > >        */
> > >       public ThreadFactory getThreadFactory() {
> > >               return _threadFactory;
> > >       }
> > >
> > >       public synchronized Runnable getTask() {
> > >               try {
> > >                       return _awaitableTask.lendValue(-1,
> > > TimeUnit.MICROSECONDS);
> > >               } catch (InterruptedException e) {
> > >                       throw new RuntimeException(e);//todo
> > >               }
> > >       }
> > >
> > >       public RepeaterServiceState getState() {
> > >               return _runstate;
> > >       }
> > >
> > >       /**
> > >        * Makes sure that the Repeater is ready for repeating. A
> > > Repeater is
> > >        * ready for repeating if:
> > >        * <ul>
> > >        * <li>if is has not been started yet. If this is the case, it
> is
> > >        * started.</li>
> > >        * <li>if it is started. It doesn't matter if it is started
> > >        * multiple times.
> > >        * </li>
> > >        * <ul>
> > >        * In all other cases, a RejectedExecutionException will be
> thrown.
> > >        * <p/>
> > >        * This call requires the mainlock, but doesn't lock it itself.
> > >        * So the caller is responsible for this task.
> > >        *
> > >        * @throws RejectedExecutionException if this Repeater is
> > > not in the correct
> > >        *                                    state to execute the task.
> > >        */
> > >       private void ensureRepeaterReadyForRepeating() {
> > >               switch (_runstate) {
> > >                       case NOT_STARTED:
> > >                               //the repeater has not been
> > > started, so start it.
> > >                               startRepeater();
> > >                               break;
> > >                       case RUNNING://everything is ok
> > >                               break;
> > >                       default:
> > >                               String msg = "This RepeaterService
> > > doesn't accept new tasks to repeat";
> > >                               throw new
> RejectedExecutionException(msg);
> > >               }
> > >       }
> > >
> > >       public boolean repeat(Runnable task, long timeout, TimeUnit
> unit)
> > > throws InterruptedException {
> > >               if (task == null || unit == null) throw new
> > > NullPointerException();
> > >
> > >               long timeoutNs = unit.toNanos(timeout);
> > >               timeoutNs = LockUtil.tryLockNanos(_mainLock, timeoutNs);
> > >               if (timeoutNs <= 0)
> > >                       return false; //the mainlock could not be
> acquired.
> > >
> > >               try {
> > >                       ensureRepeaterReadyForRepeating();
> > >                       return _awaitableTask.tryPutReference(task,
> > > timeoutNs, TimeUnit.NANOSECONDS);
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >       }
> > >
> > >       public void repeat(Runnable task) throws InterruptedException {
> > >               _mainLock.lockInterruptibly();
> > >               try {
> > >                       ensureRepeaterReadyForRepeating();
> > >                       _awaitableTask.setNewReference(task);
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >       }
> > >
> > >       public void start() {
> > >               try {
> > >                       _mainLock.lock();
> > >                       startRepeater();
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >       }
> > >
> > >       /**
> > >        * Starts this RepeaterService. This method doesn`t lock
> > > the mainLock,
> > >        * that is the responsibility of the caller of this method.
> > >        * <p/>
> > >        * If this method is called while the runstate is different
> > >        * than NOT_STARTED, then an IllegalStateException will be
> thrown.
> > >        */
> > >       private void startRepeater() {
> > >               if (_runstate != RepeaterServiceState.NOT_STARTED) throw
> new
> > > IllegalStateException();
> > >
> > >               _runstate = RepeaterServiceState.RUNNING;
> > >
> > >               for (int k = 0; k < _threadCount; k++) {
> > >                       Thread thread = createAndRegisterWorker();
> > >                       thread.start();
> > >               }
> > >       }
> > >
> > >       /**
> > >        * Creates a new (unstarted) worker and registers it.
> > >        *
> > >        * This method doesn`t lock the required resources, this is
> > >        * the task of the callee.
> > >        *
> > >        * @return the created worker Thread.
> > >        */
> > >       private Thread createAndRegisterWorker() {
> > >               Worker worker = new Worker();
> > >               Thread thread = _threadFactory.newThread(worker);
> > >               worker.setThread(thread);
> > >               _workers.add(worker);
> > >               _currentThreadCount.incrementAndGet();
> > >               return thread;
> > >       }
> > >
> > >       /**
> > >        * Deregisters a Worker. The Worker is removed from the
> > > workers and if it is
> > >        * the last worker, and this ThreadPoolRepeaterService is
> > > terminating, it sets
> > >        * the state to terminated.
> > >        *
> > >        * This method is threadsafe (it locks the mainlock itself).
> > >        *
> > >        * @param worker
> > >        */
> > >       private void deregisterWorker(Worker worker) {
> > >               _mainLock.lock();
> > >
> > >               try {
> > >                       //remove the worker
> > >                       _workers.remove(worker);
> > >
> > >                       boolean isLastWorker = _workers.isEmpty();
> > >                       boolean isTerminating = _runstate ==
> > > RepeaterServiceState.TERMINATING;
> > >
> > >                       //if this ThreadPoolRepeaterService is
> > > shutting down, and there are
> > > no more workers
> > >                       //then this ThreadPoolRepeaterService is
> terminated.
> > >                       if (isLastWorker && isTerminating)
> > >                               _runstate =
> RepeaterServiceState.TERMINATED;
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >
> > >               _currentThreadCount.decrementAndGet();
> > >       }
> > >
> > >       public void shutdown() throws InterruptedException {
> > >               _mainLock.lock();
> > >               try {
> > >                       switch (_runstate) {
> > >                               case NOT_STARTED:
> > >                                       //if the service has not
> > > been started, it can be terminated immediately.
> > >                                       _runstate =
> > > RepeaterServiceState.TERMINATED;
> > >                                       break;
> > >                               case RUNNING:
> > >                                       if (getCurrentThreadCount() ==
> 0) {
> > >                                               //there are not
> > > workers, this service is terminated
> > >                                               _runstate =
> > > RepeaterServiceState.TERMINATED;
> > >                                       } else {
> > >                                               //there are
> > > workers, this service is going in the terminating state
> > >                                               //untill all
> > > workers are terminated.
> > >                                               _runstate =
> > > RepeaterServiceState.TERMINATING;
> > >
> > >                                               //insert a dummy
> > > task in the waitabletask. If a worker
> > >                                               //is waiting for a
> > > task to execute, it gets this dummy task,
> > >                                               //executes it and
> > > terminates.
> > >                                               //If this dummy
> > > task isn`t added, it could happen a worker
> > >                                               //keeps waiting for
> > > a task that is never going to be executed.
> > >                                               //And this means
> > > this ThreadPoolRepeaterService is never going
> > > to shutdown.
> > >
> > > _awaitableTask.setNewReference(DUMMY_TASK);
> > >                                       }
> > >                                       break;
> > >                               case TERMINATING:
> > >                                       //nothing needs to be done,
> > > this ThreadPoolRepeaterService
> > > already is shuttingdown.
> > >                                       break;
> > >                               case TERMINATED:
> > >                                       //nothing needs to be done.
> > > this ThreadPoolRepeaterService
> > > already is shutdown
> > >                                       break;
> > >                               default:
> > >                                       throw new AssertionError();
> > >                       }
> > >               } finally {
> > >                       _mainLock.unlock();
> > >               }
> > >       }
> > >
> > >       //idea:shutdownAndWait
> > >
> > >       private static class DummyTask implements Runnable {
> > >               public void run() {
> > >                       //do nothing
> > >               }
> > >       }
> > >
> > >       private class Worker implements Runnable {
> > >
> > >               //a flag to indicate that this worker should stop
> > >               //This field should be volatile because it is used
> > > in a multithreaded
> > >               //environment.
> > >               private volatile boolean _die = false;
> > >               //the Thread that executes the Worker. This field
> > > is 'final'.
> > >               private Thread _thread;
> > >
> > >               /**
> > >                * Sets the threads that executes this Worker.
> > >                *
> > >                * @param thread
> > >                */
> > >               private void setThread(Thread thread) {
> > >                       _thread = thread;
> > >               }
> > >
> > >               /**
> > >                * Signals the Worker it should stop working.
> > >                */
> > >               private void die() {
> > >                       _die = true;
> > >                       _thread.interrupt();
> > >               }
> > >
> > >               public void run() {
> > >                       try {
> > >                               Runnable task = null;
> > >                               _awaitableTask.lendValue();
> > >                               while (_runstate ==
> > > RepeaterServiceState.RUNNING && !_die) {
> > >                                       try {
> > >                                               task.run();
> > >                                       } catch (RuntimeException ex) {
> > >                                               //todo: error handeling
> > >                                               ex.printStackTrace();
> > >                                       } finally {
> > >
> > > _awaitableTask.returnValue(task);
> > >                                       }
> > >
> > >                                       task =
> _awaitableTask.lendValue();
> > >                               }
> > >
> > >                               _awaitableTask.returnValue(task);
> > >                       } catch (InterruptedException e) {
> > >                               //todo
> > >                       } finally {
> > >                               deregisterWorker(this);
> > >                       }
> > >               }
> > >       }
> > > }
> > >
> > > On 7/10/06, Dhanji R. Prasanna <dhanji at gmail.com> wrote:
> > > > >From what I gathered from Peter, the specific use of a ThreadPool
> > > > "Repeating" executor was that increasing the corePoolSize would
> > > > automagically increase the number of tasks being executed
> > > ("repeated"). This
> > > > is semantically quite different from what a TPE does or is
> > > meant to do and
> > > > perhaps that is where the confusion has arisen.
> > > >
> > > > Repeating tasks (sequentially requeing them) was not the direct
> > > impedence as
> > > > that can easily be solved with the task resubmitting itself or
> > > a specialized
> > > > impl of a blocking queue which keeps queuing the same task instance.
>
> > > >
> > > > This is what I gathered from the aether... =)
> > > >
> > > >
> > > > On 7/10/06, David Holmes <dcholmes at optusnet.com.au > wrote:
> > > > >
> > > > > That may well be, but so far I don't have a complete view of
> > > either all
> > > > your
> > > > > requirements or all of Peter's, so it is hard to offer
> suggestions.
> > > > >
> > > > > Cheers,
> > > > > David
> > > > >
> > > > > _______________________________________________
> > > > > Concurrency-interest mailing list
> > > > > Concurrency-interest at altair.cs.oswego.edu
> > > > >
> > > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > > >
> > > >
> > > >
> > > > _______________________________________________
> > > > Concurrency-interest mailing list
> > > > Concurrency-interest at altair.cs.oswego.edu
> > > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > >
> > > >
> > > >
> > > _______________________________________________
> > > Concurrency-interest mailing list
> > > Concurrency-interest at altair.cs.oswego.edu
> > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
> >
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20060710/cc951dc4/attachment-0001.html 


More information about the Concurrency-interest mailing list