[concurrency-interest] Please comment on this approach

Donnie Hale donnie at haleonline.net
Wed May 4 18:46:25 EDT 2005


I'll try to make this as brief as possible. I have an app where the main
thread will do all file I/O, generating some items that need to be
processed. The result of that processing will be some other items that will
be posted to a message queue using JMS (another somewhat I/O-intensive
operation). I want each of those two processing stages to be handled in its
own dedicated thread reading from a LinkedBlockingQueue. I want a dedicated
thread per stage because of the high overhead of creating the resources
needed to process the queued items (e.g. a JMS connection): I don't want to
either recreate those every time or create lots of them for all the threads
in a pool. BTW, I'm using the backport library, as I must use JDK 1.4.
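
To make the intended arrangement concrete, here is a minimal sketch of the
two stages, each with its own thread and its own LinkedBlockingQueue. It is
only an illustration under some assumptions: it is written against
java.util.concurrent so that it compiles on its own (the backport library
offers the same classes under its own package), upper-casing a string
stands in for the real processing, a list stands in for the JMS send, and
the PipelineSketch class and its DONE end-of-input marker are hypothetical
names, not part of the class I describe further down (which ends its thread
by interruption instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: two dedicated worker threads, each draining its own queue.
class PipelineSketch {

    // Hypothetical end-of-input marker, compared by identity.
    private static final Object DONE = new Object();

    // Feeds the lines through both stages and returns what the second
    // stage "sent", in order.
    static List<String> runPipeline(List<String> lines) {
        final BlockingQueue<Object> toProcess = new LinkedBlockingQueue<Object>();
        final BlockingQueue<Object> toSend = new LinkedBlockingQueue<Object>();
        final List<String> sent =
                Collections.synchronizedList(new ArrayList<String>());

        // Stage 1: processes raw items and hands the results to stage 2.
        Thread processor = new Thread(new Runnable() {
            public void run() {
                try {
                    for (Object o = toProcess.take(); o != DONE; o = toProcess.take()) {
                        toSend.put(((String) o).toUpperCase()); // stand-in for real work
                    }
                    toSend.put(DONE); // pass the end marker downstream
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // Stage 2: the expensive resource (e.g. a JMS connection) would be
        // created once before this loop and released once after it.
        Thread sender = new Thread(new Runnable() {
            public void run() {
                try {
                    for (Object o = toSend.take(); o != DONE; o = toSend.take()) {
                        sent.add((String) o); // stand-in for the JMS send
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        processor.start();
        sender.start();
        try {
            // The calling thread plays the role of the file-reading main thread.
            for (String line : lines) {
                toProcess.put(line);
            }
            toProcess.put(DONE);
            processor.join();
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running pipeline", e);
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(Arrays.asList("a", "b", "c"))); // prints [A, B, C]
    }
}
```

Because each stage has exactly one thread and the queues are FIFO, items
come out in the order they went in.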

Below is a (hopefully :) reusable approach I've come up with. My main
questions are:

1) Am I missing something in the library that already does this?
2) If not, does my approach have any fundamental design or implementation
flaws?

// fairly obvious interface for handling items pulled from queue
public interface GenericThreadQueueHandler {
	void initialize(GenericThreadQueue q);
	void itemDequeued(Object o);
	void cleanup();
}

// backport-util-concurrent equivalents of the java.util.concurrent classes
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;

/**
 * The envisioned usage is in situations where long-lived, relatively
 * high-overhead, possibly thread-specific resources are required to process
 * the items as they are dequeued (e.g. a JMS connection). In those
 * situations, the resources can't (or shouldn't) be recreated for every
 * item that is dequeued.
 */
public abstract class GenericThreadQueue {

	public GenericThreadQueue(GenericThreadQueueHandler h) {
		queue_ = new LinkedBlockingQueue();
		handler_ = h;
	}
	
	public void start() {
		Runnable r = new GenericRunnable(this, handler_);
		thread_ = Executors.defaultThreadFactory().newThread(r);
		thread_.setDaemon(true);
		thread_.start();
	}
	
	public void stop() {
		thread_.interrupt();
		try {
			thread_.join();
		}
		catch (InterruptedException xcpt) {
			// restore the caller's interrupt status instead of swallowing it
			Thread.currentThread().interrupt();
		}
	}
	
	private static class GenericRunnable implements Runnable {
		public GenericRunnable(GenericThreadQueue q,
				GenericThreadQueueHandler h) {
			queue_ = q;
			handler_ = h;
		}
		
		public void run() {
			handler_.initialize(queue_);

			try {
				while (true) {
					
					// get next item from queue
					Object o;
					
					try {
						o = queue_.dequeue();
					}
					catch (InterruptedException xcpt) {
						// signal to end thread
						break;
					}
					
					// process item
					handler_.itemDequeued(o);
				}
			}
			finally {
				handler_.cleanup();
			}
		}
		
		private GenericThreadQueue queue_;
		private GenericThreadQueueHandler handler_;
	}
	
	protected final void enqueue(Object o) throws InterruptedException {
		queue_.put(o);
	}
	
	protected final Object dequeue() throws InterruptedException {
		return queue_.take();
	}
	
	private LinkedBlockingQueue queue_;
	private Thread thread_;
	private GenericThreadQueueHandler handler_;
}
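
For illustration, this is roughly how I picture the class being used. It is
a sketch under assumptions: MessageQueue, RecordingHandler and UsageSketch
are hypothetical names, the handler records its calls instead of talking to
JMS, and the interface and class from above are repeated in condensed form
(against java.util.concurrent rather than the backport) only so that the
example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Condensed stand-ins for the interface and class above.
interface GenericThreadQueueHandler {
    void initialize(GenericThreadQueue q);
    void itemDequeued(Object o);
    void cleanup();
}

abstract class GenericThreadQueue {
    private final LinkedBlockingQueue<Object> queue_ = new LinkedBlockingQueue<Object>();
    private final GenericThreadQueueHandler handler_;
    private Thread thread_;

    GenericThreadQueue(GenericThreadQueueHandler h) { handler_ = h; }

    public void start() {
        thread_ = new Thread(new Runnable() {
            public void run() {
                handler_.initialize(GenericThreadQueue.this);
                try {
                    while (true) {
                        handler_.itemDequeued(queue_.take());
                    }
                } catch (InterruptedException e) {
                    // interruption is the signal to end the thread
                } finally {
                    handler_.cleanup();
                }
            }
        });
        thread_.setDaemon(true);
        thread_.start();
    }

    public void stop() throws InterruptedException {
        thread_.interrupt();
        thread_.join();
    }

    protected final void enqueue(Object o) throws InterruptedException {
        queue_.put(o);
    }
}

// Hypothetical subclass: exposes a typed, public way to enqueue.
class MessageQueue extends GenericThreadQueue {
    MessageQueue(GenericThreadQueueHandler h) { super(h); }
    public void post(String msg) throws InterruptedException { enqueue(msg); }
}

// Hypothetical handler: records lifecycle calls instead of using JMS.
class RecordingHandler implements GenericThreadQueueHandler {
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());
    final CountDownLatch processed;
    RecordingHandler(int expected) { processed = new CountDownLatch(expected); }
    public void initialize(GenericThreadQueue q) { events.add("init"); }
    public void itemDequeued(Object o) { events.add("item:" + o); processed.countDown(); }
    public void cleanup() { events.add("cleanup"); }
}

class UsageSketch {
    static List<String> demo() {
        RecordingHandler h = new RecordingHandler(2);
        MessageQueue q = new MessageQueue(h);
        try {
            q.start();
            q.post("a");
            q.post("b");
            // Wait for both items first: stop() interrupts the worker, so
            // anything still sitting in the queue may be left unprocessed.
            h.processed.await();
            q.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
        return h.events; // [init, item:a, item:b, cleanup]
    }
}
```

The handler's initialize() and cleanup() each run exactly once on the worker
thread, which is where the JMS connection would be opened and closed.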

Thanks, and sorry for the length.

Donnie


