[concurrency-interest] Thread pools threads exhausted?

Joe Bowbeer joe.bowbeer at gmail.com
Mon Jan 19 16:13:11 EST 2009


Note that submit throws the exception, which will throw an exception to the
caller of readUrl.  You won't receive the future if the submission is
rejected.

I would strictly avoid manipulating the executor's queue while the executor
is running.

I would catch RejectedExecution in readUrl for starters in order to verify
and diagnose the problem.  Then I would consider selecting a different
rejected execution handler, if that solves the problem.

On Mon, Jan 19, 2009 at 12:59 PM, robert lazarski wrote:

> On Mon, Jan 19, 2009 at 4:36 PM, Joe Bowbeer wrote:
> > Note that executorService.submit can throw RejectedExecutionException but
> > this is not handled in readUrl.
> >
> > Recommend:
> >
> > Configure the rejected execution handler on your executor service to log
> > rejections.
> >
> > And/or handle rejected executions in readUrl.
> >
> > Also, are you only submitting to one host?  I notice
> maxConnectionsPerHost
> > is the same as the fixed size of the thread pool.
> >
> > Joe
>
> Thanks for the help Joe, Yes, I am always submitting to one host.
> Since I was catching Exception and  RejectedExecutionHandler inherits
> it, wouldn't I have caught it even though it also inherits
> RuntimeException? Anyways, I explicitly catch it now.
>
> I've got some new code shown below. The immediate question I have is
> can I do this in the RejectedExecutionHandler?
>
> executor.getQueue().put(runnable);
>
> Here's the new code, thanks for the help.
>
> public class URLReader {
>
>        private static final ThreadFactory factory = new
> ONExceptionThreadFactory(new ONExceptionHandler());
>         // Create a ThreadPoolExecutor using a bounded buffer
>        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 30000,
> TimeUnit.MILLISECONDS,
>                        new ArrayBlockingQueue<Runnable>(10), factory);
>
>        // See the HttpClient options if need be in HttpConnectionManager
> the
> associated Parameters class.
>        private HttpClient objHttp;
>
>        /** commons logging declaration. */
>    private static Log logger = LogFactory.getLog(
>                URLReader.class);
>
>    private int timeout_in_seconds;
>    private int max_tries;
>        private String appInstance;
>
>        /**
>     * Connect to given URL and return response, or timeout. See
> http://commons.apache.org/httpclient/threading.html
>     * for programming guidelines and default settings.
>     *
>     * The behavior of this method is to encode the url.
>     */
>    public String readURL(final String url) throws Exception {
>
>        // lazy init
>        if (objHttp == null) {
>                 pool.setRejectedExecutionHandler(new
> ONExceptionRejectedExecutionHandler());
>                 MultiThreadedHttpConnectionManager mgr = new
> MultiThreadedHttpConnectionManager();
>                HttpConnectionManagerParams params = new
> HttpConnectionManagerParams();
>                params.setDefaultMaxConnectionsPerHost(10);
>                mgr.setParams(params);
>                objHttp = new HttpClient(mgr);
>        }
>                Callable<String> task = new Callable<String>() {
>                     public String call() {
>                         logger.debug("appInstance: "+appInstance+ " , says
> ...
> readURL call() invoked with url: " + url);
>                     // Send the POST request.
>                     PostMethod objPost = new PostMethod(url);
>                         try {
>                             objHttp.executeMethod(objPost);
>                         // Read the response.
>                             byte[] byteResponse =
> objPost.getResponseBody();
>                             return new String(byteResponse);
>                         } catch (Exception ex) {
>                                 logger.error(ex.getMessage(), ex);
>                         } finally {
>                                 objPost.releaseConnection();
>                         }
>                 // couldn't connect!
>                         return null;
>                     }
>                };
>
>                 Future<String> future = pool.submit(task);
>                 try {
>                        // wait and timeout or return result
>                        return future.get(timeout_in_seconds,
> TimeUnit.SECONDS);
>                 } catch (RejectedExecutionException ex) {
>             // We don't need the result, so cancel the task
>            future.cancel(true);
>                 logger.error("readURL() thread rejected with message: " +
> ex.getMessage(), ex);
>                 } catch (InterruptedException ex) {
>            // Re-assert the thread's interrupted status
>            Thread.currentThread().interrupt();
>            // We don't need the result, so cancel the task too
>            future.cancel(true);
>            logger.error("Thread interrupted, connection to url: " +
> url + " is cancelled: " + ex.getMessage(), ex);
>        } catch (Exception ex) {
>            // We don't need the result, so cancel the task
>            future.cancel(true);
>                logger.error(ex.getMessage(), ex);
>        }
>        // couldn't connect!
>        return null;
>        }
>    }
>
> public class ONExceptionRejectedExecutionHandler implements
>                RejectedExecutionHandler {
>
>           /** commons logging declaration. */
>    private static Log logger = LogFactory.getLog(
>                 ONExceptionRejectedExecutionHandler.class);
>
>        @Override
>        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor
> executor) {
>                try {
>                        logger.error("rejectedExecution() invoked with
> executor.isShutdown() value: " + executor.isShutdown() +
>                            " , number of active threads: " +
> executor.getActiveCount() +
>                            " , approximate TaskCount: " +
> executor.getTaskCount() +
>                            " , approximate CompletedTaskCount: " +
> executor.getCompletedTaskCount() +
>                            " , PoolSize: " + executor.getPoolSize());
>                        logger.error("rejectedExecution() adding Runnable
> back onto the Queue");
>                        executor.getQueue().put(runnable);
>                } catch (Exception ex) {
>                         throw new RejectedExecutionException(ex);
>                }
>        }
>
> }
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20090119/8fc559d0/attachment.html>


More information about the Concurrency-interest mailing list