[concurrency-interest] Thread pools threads exhausted?

robert lazarski robertlazarski at gmail.com
Mon Jan 19 15:59:15 EST 2009


On Mon, Jan 19, 2009 at 4:36 PM, Joe Bowbeer <joe.bowbeer at gmail.com> wrote:
> Note that executorService.submit can throw RejectedExecutionException but
> this is not handled in readUrl.
>
> Recommend:
>
> Configure the rejected execution handler on your executor service to log
> rejections.
>
> And/or handle rejected executions in readUrl.
>
> Also, are you only submitting to one host?  I notice maxConnectionsPerHost
> is the same as the fixed size of the thread pool.
>
> Joe

Thanks for the help, Joe. Yes, I am always submitting to one host.
Since I was catching Exception, and RejectedExecutionException inherits
from it (by way of RuntimeException), wouldn't I have caught it anyway?
In any case, I explicitly catch it now.
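
A minimal sketch of that point (the class name RejectionCatchDemo is made
up for illustration, and a shut-down pool is used only as an easy way to
force a rejection): a plain catch (Exception) does see the rejection, but
only if the submit() call itself sits inside the try block, since
submit() is the call that throws.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical demo class, not part of the code in this thread.
class RejectionCatchDemo {

    // Returns the simple name of whatever exception submit() raised,
    // or "accepted" if the task was taken.
    static String submitAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.shutdown(); // a shut-down pool rejects every new task
        try {
            // submit() is inside the try, so the rejection is catchable here
            pool.submit(new Runnable() {
                public void run() { }
            });
            return "accepted";
        } catch (Exception ex) {
            // RejectedExecutionException -> RuntimeException -> Exception
            return ex.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown());
    }
}
```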

I've got some new code, shown below. The immediate question I have is:
can I do this in the RejectedExecutionHandler?

executor.getQueue().put(runnable);

Here's the new code, thanks for the help.

public class URLReader {
	
	private static final ThreadFactory factory =
			new ONExceptionThreadFactory(new ONExceptionHandler());
	// Create a ThreadPoolExecutor using a bounded buffer
	ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 30000, TimeUnit.MILLISECONDS,
			new ArrayBlockingQueue<Runnable>(10), factory);

	// See the HttpClient options if need be in HttpConnectionManager and the
	// associated Parameters class.
	private HttpClient objHttp;

	/** commons logging declaration. */
    private static Log logger = LogFactory.getLog(
    		URLReader.class);

    private int timeout_in_seconds;
    private int max_tries;
	private String appInstance;
	
	/**
     * Connect to given URL and return response, or timeout. See
     * http://commons.apache.org/httpclient/threading.html
     * for programming guidelines and default settings.
     *
     * The behavior of this method is to encode the url.
     */
    public String readURL(final String url) throws Exception {

    	// lazy init
    	if (objHttp == null) {
    		pool.setRejectedExecutionHandler(new ONExceptionRejectedExecutionHandler());
    		MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
    		HttpConnectionManagerParams params = new HttpConnectionManagerParams();
    		params.setDefaultMaxConnectionsPerHost(10);
    		mgr.setParams(params);
    		objHttp = new HttpClient(mgr);
    	}
		Callable<String> task = new Callable<String>() {
		     public String call() {
	        	 logger.debug("appInstance: " + appInstance
	        			 + " , says ... readURL call() invoked with url: " + url);
	             // Send the POST request.
	             PostMethod objPost = new PostMethod(url);
		         try {
		             objHttp.executeMethod(objPost);
	                 // Read the response.
		             byte[] byteResponse = objPost.getResponseBody();
		             return new String(byteResponse);
		         } catch (Exception ex) {
		        	 logger.error(ex.getMessage(), ex);
		         } finally {
		        	 objPost.releaseConnection();
		         }
                 // couldn't connect!
		         return null;
		     }
		};

		Future<String> future = null;
		try {
			// submit() is the call that can throw RejectedExecutionException,
			// so it has to be inside the try block
			future = pool.submit(task);
			// wait and timeout or return result
			return future.get(timeout_in_seconds, TimeUnit.SECONDS);
		} catch (RejectedExecutionException ex) {
			// The task was never accepted, so there is nothing to cancel
			logger.error("readURL() thread rejected with message: " + ex.getMessage(), ex);
		} catch (InterruptedException ex) {
			// Re-assert the thread's interrupted status
			Thread.currentThread().interrupt();
			// We don't need the result, so cancel the task too
			future.cancel(true);
			logger.error("Thread interrupted, connection to url: " + url
					+ " is cancelled: " + ex.getMessage(), ex);
		} catch (Exception ex) {
			// We don't need the result, so cancel the task (if it was accepted)
			if (future != null) {
				future.cancel(true);
			}
			logger.error(ex.getMessage(), ex);
		}
        // couldn't connect!
        return null;
	}
    }

public class ONExceptionRejectedExecutionHandler implements
		RejectedExecutionHandler {
	
	   /** commons logging declaration. */
    private static Log logger = LogFactory.getLog(
    		ONExceptionRejectedExecutionHandler.class);

	@Override
	public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
		try {
			logger.error("rejectedExecution() invoked with executor.isShutdown() value: "
					+ executor.isShutdown()
					+ " , number of active threads: " + executor.getActiveCount()
					+ " , approximate TaskCount: " + executor.getTaskCount()
					+ " , approximate CompletedTaskCount: " + executor.getCompletedTaskCount()
					+ " , PoolSize: " + executor.getPoolSize());
			logger.error("rejectedExecution() adding Runnable back onto the Queue");
			executor.getQueue().put(runnable);
		} catch (Exception ex) {
			 throw new RejectedExecutionException(ex);
		}
	}

}
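
For comparison, a self-contained sketch of the blocking put() idea from
the handler above. The names BlockingPutHandler and BlockingPutDemo are
made up for illustration, and the pool and queue sizes are arbitrary. It
assumes it is acceptable for the thread calling execute() to block, and
it refuses tasks once the executor is shut down, since a task placed
directly on the queue after shutdown might never run. On interruption it
restores the interrupt flag before rethrowing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handler: blocks the submitting thread until the queue has room.
class BlockingPutHandler implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (executor.isShutdown()) {
            // After shutdown a queued task might never run, so fail fast.
            throw new RejectedExecutionException("executor is shut down");
        }
        try {
            executor.getQueue().put(r); // blocks the caller of execute()/submit()
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
            throw new RejectedExecutionException("interrupted while waiting for queue space", ex);
        }
    }
}

// Hypothetical driver: pushes more tasks than the pool and queue can hold at once.
class BlockingPutDemo {

    // Submits 'count' short tasks and returns how many actually ran.
    static int runTasks(int count) {
        final AtomicInteger done = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), new BlockingPutHandler());
        for (int i = 0; i < count; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(10); // simulate a little work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    done.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // already queued tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runTasks(20));
    }
}
```

Since put() goes around the executor's own admission logic, this is only
a sketch of the approach; ThreadPoolExecutor.CallerRunsPolicy is the
built-in alternative for slowing down submitters.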

