[concurrency-interest] Advice for execute( Runnable task, long timeout )

Jan Nielsen jnielsen at sungardsct.com
Tue Apr 26 19:11:55 EDT 2005


Hello all:

On JVM 1.3 and above (Solaris, Windows, RedHat, and HPUX) I need the
following API in dl.util.concurrent: PooledExecutor.execute( Runnable
command, long msecs ), which interrupts a job if it is not complete
after msecs milliseconds. Snippets are shown below and a modified
version of PooledExecutor is attached. The implementation seems to work
but it feels like a hack; in particular, the interaction between the
Worker, Work, and Interrupt classes seems less than ideal. Is there a
better way to implement this in dl.util.concurrent? Any thoughts,
suggestions, insights and/or critique are greatly appreciated.

Many thanks!

-Jan

    /**
     * Complete the given command within the specified time. If the
     * command cannot be completed within the specified time, the thread
     * executing the command is interrupted. If the command has not yet
     * been assigned a thread the command will be marked as interrupted
     * and will not be assigned a thread of execution.
     *
     * @param command work to execute
     *
     * @param msecs milliseconds in future when work should complete
     **/
    public void execute(
        Runnable command,
        long msecs
        )
        throws InterruptedException
    {
        for (;;) {
            synchronized(this) {
                if (!shutdown_) {
                    int size = poolSize_;

                    Work work = new Work(command);

                    // Ensure minimum number of threads
                    if (size < minimumPoolSize_) {
                        if( 0 < msecs )
                        {
                            Interrupt interrupt = new Interrupt( work );
                            interrupter_.executeAfterDelay( msecs, interrupt );
                        }
                        addThread(work);
                        return;
                    }
         
                    // Try to give to existing thread
                    if (handOff_.offer(work, 0)) {
                        if( 0 < msecs )
                        {
                            Interrupt interrupt = new Interrupt( work );
                            interrupter_.executeAfterDelay( msecs, interrupt );
                        }             
                        return;
                    }
         
                    // If cannot handoff and still under maximum, create new thread
                    if (size < maximumPoolSize_) {
                        if( 0 < msecs )
                        {
                            Interrupt interrupt = new Interrupt( work );
                            interrupter_.executeAfterDelay( msecs, interrupt );
                        }
                        addThread(work);
                        return;
                    }
                }
            }

            // Cannot hand off and cannot create -- ask for help
            if (getBlockedExecutionHandler().blockedAction(command)) {
                return;
            }
        }
    }

    /**
     * Class defining the basic element of work for worker threads.
     **/
    protected class Work implements Runnable {
        protected Runnable command_;
        protected Thread thread_;
        protected boolean threadInterrupted_;
     
        /**
         * Constructs an element of work which should be complete by a
         * certain time, otherwise the work should be interrupted.
         *
         * @param command work to be executed
         **/
        protected Work(Runnable command)
        {
            command_ = command;
        }

        /**
         * Sets the thread assigned from the executor, and unsets it
         * when complete.
         *
         * @param thread executor's thread assigned to this work
         **/
        public synchronized void setThread(
            Thread thread
            )
        {
            thread_ = thread;
        }

        public void run()
        {
            if( !threadInterrupted_ )
            {
                command_.run();
            }
            else
            {
                thread_.interrupt();
            }
        }

        /**
         * Interrupts the scheduled work. If the work has not yet been
         * assigned a thread, it is marked as interrupted instead.
         **/
        public synchronized void interrupt()
        {
            if( null != thread_ )
                thread_.interrupt();
            else
                threadInterrupted_ = true;
        }
    }

    /**
     * Class to hold the work which should be interrupted if the
     * submitted work is not completed in time.
     **/
    protected class Interrupt
        implements Runnable
    {
        /** Work to interrupt. */
        private Work work_;
       
        protected Interrupt( Work work )
        {
            work_ = work;
        }

        public void run()
        {
            work_.interrupt();

            work_ = null;
        }       
    }

    /**
     * Class defining the basic run loop for pooled threads.
     **/
    protected class Worker implements Runnable {
        protected Work firstTask_;

        protected Worker(Work firstTask) { firstTask_ = firstTask; }

        public void run() {
            try {
                Work task = firstTask_;
                firstTask_ = null; // enable GC

                if (task != null) {
                    task.setThread( Thread.currentThread() );
                    task.run();
                    task.setThread( null );
                    task = null;
                }
       
                while ( (task = getTask()) != null) {
                    task.setThread( Thread.currentThread() );
                    task.run();
                    task.setThread( null );
                    task = null;
                }
            }
            catch (InterruptedException ex) { } // fall through
            finally {
                workerDone(this);
            }
        }
    }

    /**
     * Single thread of execution which interrupts jobs submitted to the
     * executor which do not complete in time.
     **/
    private ClockDaemon interrupter_ = new ClockDaemon(); 


To implement this API I followed a suggestion by Joe Bowbeer to a 
similar question for JDK 5. In my case



Joe Bowbeer wrote:

>Here's a simple approach:
>
>When you submit your FutureTask to the executor service, also schedule a 
>TimerTask to cancel that task.
>
>If your FutureTask needs to act at the moment it is cancelled, override the 
>"done" method and perform the desired action if isCancelled returns true.
>
>
>----- Original Message ----- 
>From: "Minnie Haridasa" <minnieh at corp.earthlink.net>
>To: <concurrency-interest at altair.cs.oswego.edu>
>Sent: Sunday, April 03, 2005 9:51 PM
>Subject: [concurrency-interest] Cancelling Tasks returning immediately..
>
>
>Hello,
>
>We have a unique situation where I thought I might seek your help for some
>useful hints.
>
>My thread pool has an unbounded queue associated with it. However, while a
>task is waiting on the queue to be processed, it cannot wait indefinitely;
>after a certain period of time, it needs to cancel out and return a
>different result.
>
>Is it possible to achieve this using the concurrent utilities of JDK1.5?
>
>My queue has to be unbounded, and the task (callable) that gets put on the
>queue needs to be time bound. How can I make the cancelled task return
>immediately as opposed to the result that can only be retrieved using method
>get when the computation has completed or the task gets its turn to be
>executed by the pool.
>
>Appreciate your help.
>
>Thanks
>
>MH
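
The approach quoted above (submit a FutureTask, schedule a TimerTask to
cancel it, override done) might look roughly like the following on JDK 5.
This is a minimal sketch, not code from either message: the class name
CancelAfterDelaySketch, the method submitWithDeadline, and the
"computed"/"fallback" strings are made up for illustration. It relies on
FutureTask.get() throwing CancellationException as soon as the task is
cancelled, even if the task is still sitting in the queue.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical sketch of the FutureTask + TimerTask idea.
class CancelAfterDelaySketch {

    /** Submits a sleeping job and cancels it after timeoutMillis; returns its result or a fallback. */
    public static String submitWithDeadline(final long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Timer timer = new Timer(true);              // daemon timer that fires the cancellation

        final FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
            public String call() throws InterruptedException {
                Thread.sleep(workMillis);           // stands in for the real computation
                return "computed";
            }
        }) {
            // Called once when the task completes normally, fails, or is cancelled.
            protected void done() {
                if (isCancelled()) {
                    System.out.println("task cancelled before it completed");
                }
            }
        };

        timer.schedule(new TimerTask() {
            public void run() { future.cancel(true); }  // true: interrupt if already running
        }, timeoutMillis);
        pool.execute(future);

        try {
            return future.get();                    // returns, or throws once cancelled
        } catch (CancellationException e) {
            return "fallback";                      // the "different result" for a timed-out task
        } catch (ExecutionException e) {
            return "failed";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            timer.cancel();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitWithDeadline(2000, 100));
        System.out.println(submitWithDeadline(10, 2000));
    }
}
```

A real application would share one pool and one timer across tasks
rather than creating them per call; they are created inline here only to
keep the sketch self-contained.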

-------------- next part --------------
A non-text attachment was scrubbed...
Name: PooledExecutor.java
Type: java/*
Size: 38302 bytes
Desc: not available
Url : /pipermail/attachments/20050426/17b6cb04/PooledExecutor-0001.bin

