[concurrency-interest] Please comment on this approach

Donnie Hale donnie at haleonline.net
Wed May 4 22:50:36 EDT 2005


David,

Thanks for the comments. Follow-ups:

1. The docs for defaultThreadFactory and the ThreadFactory interface read as
though they are available general purpose usage. That's why I picked it.
I'll look into SingleThreadExecutor to see how that would alter what I've
put together.

2. Back in my native Win32 days, I'd use a sentinel Event and
WaitForMultipleObjects to be notified that I should fall out the bottom of a
thread. There's probably something like that in the concurrent API, but I'm
just getting familiar with it and have likely missed the "canonical" way to
do that, so to speak. Perhaps you can point me a little further in the right
direction.

Thanks again,

Donnie
 

-----Original Message-----
From: David Holmes [mailto:dholmes at dltech.com.au] 
Sent: Wednesday, May 04, 2005 9:11 PM
To: Donnie Hale
Cc: concurrency-jsr
Subject: RE: [concurrency-interest] Please comment on this approach

Your basic approach seems sound enough. Basically the main thread pushes the
objects to be processed into a LinkedBlockingQueue and you create a thread
that pulls them from the queue and processes them and pushes them out to
JMS.

If you intend to use this a lot then encapsulating the thread and queue
together makes good sense, but for one-off usage it isn't necessary to make
it so complicated.

Two minor points:

1.  thread_ = Executors.defaultThreadFactory().newThread(r);

The default thread factory wasn't intended for general purpose thread
construction outside of the pool. As this is your thread, that you control,
just construct it. It won't do any harm, but it is an odd usage in my view
and your thread will get an odd name. :)

That said, you might consider using a SingleThreadExecutor just in case you
want to crank up the concurrency at a later stage.

2. Rather than stopping the thread by interupting it, consider just putting
a sentinel object into the queue. You can clear the queue before inserting
the sentinel if you want to stop the current contents from being cleared. Of
course if you want to terminate the currently processing object then
interrupt is your only chance - assuming the processing code checks for
interrupts. The interrupt isn't wrong, I just prefer a sentinel for this
kind of situation, and interrupts always require you know how all the code
you will execute responds to interruption.

Cheers,
David Holmes

> -----Original Message-----
> From: concurrency-interest-bounces at cs.oswego.edu
> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of Donnie 
> Hale
> Sent: Thursday, 5 May 2005 8:46 AM
> To: concurrency-interest at altair.cs.oswego.edu
> Subject: [concurrency-interest] Please comment on this approach
>
>
> I'll try to make this as brief as possible. I have an app where the 
> main thread will do all file I/O, generating some items that need 
> processed. The result of that processing will be some other items that 
> will be posted to a message queue using JMS (another somewhat I/O 
> intensive operation). I want those two processing stages to be handled 
> each in their own dedicated thread reading from a LinkedBlockingQueue. 
> The reason for a dedicated thread to handle queued items is due to the 
> high overhead of creating the resources needed to process the queued 
> items (e.g. a JMS connection) - I don't want to either recreate those 
> every time or create lots of them for all the threads in a pool. BTW, 
> I'm using the backport library, as I must use JDK 1.4.
>
> Below is a (hopefully :) reusable approach I've come up with. My main 
> questions are:
>
> 1) Am I missing something in the library that already does this?
> 2) If not, does my approach have any fundamental design or 
> implementation flaws?
>
> // fairly obvious interface for handling items pulled from queue 
> public interface GenericThreadQueueHandler {
> 	void initialize(GenericThreadQueue q);
> 	void itemDequeued(Object o);
> 	void cleanup();
> }
>
> /**
>  * The envisioned usage is in situations where long-lived, relatively 
> high-
>  * overhead, possibly thread-specific resources are required to 
> process the
>  * items as they are dequeued (e.g. a JMS connection). In those 
> situations, the
>  * resources can't (or shouldn't) be recreated for every item that is 
> dequeued.
>  */
> public abstract class GenericThreadQueue {
>
> 	public GenericThreadQueue(GenericThreadQueueHandler h) {
> 		queue_ = new LinkedBlockingQueue();
> 		handler_ = h;
> 	}
>
> 	public void start() {
> 		Runnable r = new GenericRunnable(this, handler_);
> 		thread_ = Executors.defaultThreadFactory().newThread(r);
> 		thread_.setDaemon(true);
> 		thread_.start();
> 	}
>
> 	public void stop() {
> 		thread_.interrupt();
> 		try {
> 			thread_.join();
> 		}
> 		catch (InterruptedException xcpt) {
> 			// do nothing
> 		}
> 	}
>
> 	private static class GenericRunnable implements Runnable {
> 		public GenericRunnable(GenericThreadQueue q, 
> GenericThreadQueueHandler h) {
> 			queue_ = q;
> 			handler_ = h;
> 		}
>
> 		public void run() {
> 			handler_.initialize(queue_);
>
> 			try {
> 				while (true) {
>
> 					// get next item from queue
> 					Object o;
>
> 					try {
> 						o = queue_.dequeue();
> 					}
> 					catch (InterruptedException xcpt) {
> 						// signal to end thread
> 						break;
> 					}
>
> 					// process item
> 					handler_.itemDequeued(o);
> 				}
> 			}
> 			finally {
> 				handler_.cleanup();
> 			}
> 		}
>
> 		GenericThreadQueue queue_;
> 		private GenericThreadQueueHandler handler_;
> 	}
>
> 	protected final void enqueue(Object o) throws InterruptedException {
> 		queue_.put(o);
> 	}
>
> 	protected final Object dequeue() throws InterruptedException {
> 		return queue_.take();
> 	}
>
> 	private LinkedBlockingQueue queue_;
> 	private Thread thread_;
> 	private GenericThreadQueueHandler handler_; }
>
> Thanks, and sorry for the length.
>
> Donnie
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>





More information about the Concurrency-interest mailing list