[concurrency-interest] Advice for execute( Runnable task, long timeout )

David Holmes dholmes at dltech.com.au
Tue Apr 26 19:39:23 EDT 2005


Given the scheduling guarantees you don't have in the JVM you have very
little chance of this working anywhere close to what you hope. You are
relying on your interrupter executing in preference to the workers to be
interrupted. This presumes that you can even come up with sensible numbers
for the allowed execution time, given you don't even know when each worker
will be able to run.

The basic structure is about the best you can do - from a cursory
examination. But without scheduling guarantees I'd be very surprised if you
actually get a satisfactory result.

David Holmes

> -----Original Message-----
> From: concurrency-interest-bounces at cs.oswego.edu
> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of Jan
> Nielsen
> Sent: Wednesday, 27 April 2005 9:12 AM
> To: concurrency-interest at altair.cs.oswego.edu
> Subject: [concurrency-interest] Advice for execute( Runnable task,long
> timeout )
>
>
> Hello all:
>
> I have a need in JVM 1.3 and above (Solaris, Windows, RedHat, and HPUX)
> for the following API in dl.util.concurrent.PooledExecutor.execute(
> Runnable command, long msecs) which interrupts a job if it's not
> complete after msecs. Snippits are shown below and a modified version
> PooledExecutor is attached. The implementation seems work but it feels
> like a hack, in particular the Worker, Work, and Interrupt class
> interaction seems less than ideal. Is there a better way to implement
> this in dl.util.concurrent? Any thoughts, suggestions, insights and/or
> critique are greatly appreciated.
>
> Many thanks!
>
> -Jan
>
>     /**
>      * Complete the given command within the specified time. If the
>      * command cannot be completed within the specified time, the thread
>      * executing the command is interrupted. If the command has not yet
>      * been assigned a thread the command will be marked as interrupted
>      * and will not be assigned a thread of execution.
>      *
>      * @param command work to execute
>      *
>      * @param msecs milliseconds in future when work should complete
>      **/
>     public void execute(
>         Runnable command,
>         long msecs
>         )
>         throws InterruptedException
>     {
>         for (;;) {
>             synchronized(this) {
>                 if (!shutdown_) {
>                     int size = poolSize_;
>
>                     Work work = new Work(command);
>
>                     // Ensure minimum number of threads
>                     if (size < minimumPoolSize_) {
>                         if( 0 < msecs )
>                         {
>                             Interrupt interrupt = new Interrupt( work );
>                             interrupter_.executeAfterDelay( msecs,
> interrupt );
>                         }
>                         addThread(work);
>                         return;
>                     }
>
>                     // Try to give to existing thread
>                     if (handOff_.offer(work, 0)) {
>                         if( 0 < msecs )
>                         {
>                             Interrupt interrupt = new Interrupt( work );
>                             interrupter_.executeAfterDelay( msecs,
> interrupt );
>                         }
>                         return;
>                     }
>
>                     // If cannot handoff and still under maximum, create
> new thread
>                     if (size < maximumPoolSize_) {
>                         if( 0 < msecs )
>                         {
>                             Interrupt interrupt = new Interrupt( work );
>                             interrupter_.executeAfterDelay( msecs,
> interrupt );
>                         }
>                         addThread(work);
>                         return;
>                     }
>                 }
>             }
>
>             // Cannot hand off and cannot create -- ask for help
>             if (getBlockedExecutionHandler().blockedAction(command)) {
>                 return;
>             }
>         }
>     }
>
>     /**
>      * Class defining the basic element of work for worker threads.
>      **/
>     protected class Work implements Runnable {
>         protected Runnable command_;
>         protected Thread thread_;
>         protected boolean threadInterrupted_;
>
>         /**
>          * Constructs a element of work which should be complete by a
>          * certain time, otherwise the work should be interrupted.
>          *
>          * @param command work to be executed
>          **/
>         protected Work(Runnable command)
>         {
>             command_ = command;
>         }
>
>         /**
>          * Sets the thread assigned from the executor, and unsets it
>          * when complete.
>          *
>          * @param thread executor's thread assigned to this work
>          **/
>         public synchronized void setThread(
>             Thread thread
>             )
>         {
>             thread_ = thread;
>         }
>
>         public void run()
>         {
>             if( !threadInterrupted_ )
>             {
>                 command_.run();
>             }
>             else
>             {
>                 thread_.interrupt();
>             }
>         }
>
>         /**
>          * Interrupts the scheduled work. If the
>          **/
>         public synchronized void interrupt()
>         {
>             if( null != thread_ )
>                 thread_.interrupt();
>             else
>                 threadInterrupted_ = true;
>         }
>     }
>
>     /**
>      * Class to hold the work which should be interrupted if the
>      * submitted work is not completed in time.
>      **/
>     protected class Interrupt
>         implements Runnable
>     {
>         /** Work to interrupt. */
>         private Work work_;
>
>         protected Interrupt( Work work )
>         {
>             work_ = work;
>         }
>
>         public void run()
>         {
>             work_.interrupt();
>
>             work_ = null;
>         }
>     }
>
>     /**
>      * Class defining the basic run loop for pooled threads.
>      **/
>     protected class Worker implements Runnable {
>         protected Work firstTask_;
>
>         protected Worker(Work firstTask) { firstTask_ = firstTask; }
>
>         public void run() {
>             try {
>                 Work task = firstTask_;
>                 firstTask_ = null; // enable GC
>
>                 if (task != null) {
>                     task.setThread( Thread.currentThread() );
>                     task.run();
>                     task.setThread( null );
>                     task = null;
>                 }
>
>                 while ( (task = getTask()) != null) {
>                     task.setThread( Thread.currentThread() );
>                     task.run();
>                     task.setThread( null );
>                     task = null;
>                 }
>             }
>             catch (InterruptedException ex) { } // fall through
>             finally {
>                 workerDone(this);
>             }
>         }
>     }
>
>     /**
>      * Single thread of execution which interrupts jobs submitted to the
>      * executor which do not complete in time.
>      **/
>     private ClockDaemon interrupter_ = new ClockDaemon();
>
>
> To implement this API I followed a suggestion by Joe Bowbeer to a
> similar question for JDK 5. In my case
>
>
>
> Joe Bowbeer wrote:
>
> >Here's a simple approach:
> >
> >When you submit your FutureTask to the executor service, also schedule a
> >TimerTask to cancel that task.
> >
> >If your FutureTask needs to act at the moment it is cancelled,
> override the
> >"done" method and perform the desired action if isCancelled returns true.
> >
> >
> >----- Original Message -----
> >From: "Minnie Haridasa" <minnieh at corp.earthlink.net>
> >To: <concurrency-interest at altair.cs.oswego.edu>
> >Sent: Sunday, April 03, 2005 9:51 PM
> >Subject: [concurrency-interest] Cancelling Tasks returning immediately..
> >
> >
> >Hello,
> >
> >We have unique situation where I thought I may seek you help for
> some useful
> >hints.
> >
> >My thread pool has an unbounded queue associated to it. However, while a
> >task is waiting on the queue to be
> >
> >Processed, it cannot wait indefinitely, after a certain period
> of time, it
> >needs to cancel out and return a different result.
> >
> >Is it possible to achieve this using the concurrent utilities of JDK1.5.
> >
> >My queue has to be unbounded, and the task (callable) that gets
> put on the
> >queue needs to be time bound. How can I make the cancelled task return
> >immediately as opposed to the result that can only be retrieved
> using method
> >get when the computation has completed or the task gets its turn to be
> >executed by the pool.
> >
> >Appreciate your help.
> >
> >Thanks
> >
> >MH
> >
> >
> >
> >_______________________________________________
> >Concurrency-interest mailing list
> >Concurrency-interest at altair.cs.oswego.edu
> >http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
> >
> >
>
>



More information about the Concurrency-interest mailing list