[concurrency-interest] Special Conditions and PriorityBlockingQueue

David Walend david at walend.net
Sat Sep 23 10:47:23 EDT 2006

In the SomnifugiJMS project, I'm getting close to implementing the  
full JMS specification. The last big technical hurdle is implementing  
message selectors for JMS Queues. (Message selectors are these odd
bits of SQL92 conditional syntax that make sure a particular consumer
only receives messages whose properties, represented as key/value
pairs, match the selector.)

Currently, JMS Queues are wrapped around BlockingQueues, usually  
PriorityBlockingQueues. I just looked at the PriorityBlockingQueue
code, which wraps a ReentrantLock and a notEmpty Condition around a
PriorityQueue.

Message selectors in Topics were fairly easy. Each subscriber to a  
topic gets its own BlockingQueue. If a subscriber's message selector  
doesn't match the message, the message doesn't drop into that  
subscriber's BlockingQueue.
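
As a rough illustration of the Topic arrangement described above, here is a minimal sketch. The class name TopicSketch is hypothetical, and java.util.function.Predicate (Java 8 or later) is only a stand-in for a parsed JMS message selector; none of this is SomnifugiJMS code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

/** Hypothetical sketch: one BlockingQueue per subscriber, filtered on publish. */
class TopicSketch<E> {
    private static final class Subscription<E> {
        final Predicate<E> selector;   // stand-in for a JMS message selector
        final BlockingQueue<E> inbox = new LinkedBlockingQueue<>();
        Subscription(Predicate<E> selector) { this.selector = selector; }
    }

    private final List<Subscription<E>> subscriptions = new CopyOnWriteArrayList<>();

    /** Registers a subscriber and returns the queue it should read from. */
    public BlockingQueue<E> subscribe(Predicate<E> selector) {
        Subscription<E> s = new Subscription<>(selector);
        subscriptions.add(s);
        return s.inbox;
    }

    /** Drops the message into every inbox whose selector matches it. */
    public void publish(E message) {
        for (Subscription<E> s : subscriptions) {
            if (s.selector.test(message)) {
                s.inbox.add(message);
            }
        }
    }
}
```

Each subscriber blocks only on its own inbox, so no coordination between subscribers is needed. That is exactly the property that is lost once all receivers share a single queue.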

That approach won't work for JMS Queues. All QueueReceivers currently  
work off the same BlockingQueue, and I'd like to keep that abstraction.

I have two vague ideas for how to get message selectors working for  
JMS Queues, neither of which is great. I'd like to get some feedback  
before trying either.

Multiple Conditions: Write a custom BlockingQueue with a Condition  
for each message selector. Start with the code for  
PriorityBlockingQueue. Add custom code to create a
Map<QueueReceiver,Condition> that holds the waiting receivers, each
QueueReceiver carrying its own MessageSelector. When a message enters
the queue, check the waiting
receivers to see if any message selectors match the message; if so,  
signal that receiver's take() or poll() call. When a receiver first  
take()s or poll()s or is signaled by its Condition, iterate through  
the queue for a matching message. If there's a message, return it and  
(if needed) remove the entry from the Map. If no message matches, add  
the receiver to the Map.
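
The Multiple Conditions idea could be sketched roughly as follows. This is a sketch under assumptions, not a drop-in BlockingQueue: SelectorQueue and Waiter are hypothetical names, a Predicate (Java 8 or later) stands in for a message selector, and a List of waiters replaces the Map for brevity. It also signals every matching waiter rather than exactly one, which costs some extra wakeups but avoids having to reason about lost signals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/** Hypothetical sketch: one lock, one Condition per waiting receiver. */
class SelectorQueue<E extends Comparable<E>> {
    private static final class Waiter<E> {
        final Predicate<E> selector;   // stand-in for a JMS message selector
        final Condition condition;
        Waiter(Predicate<E> selector, Condition condition) {
            this.selector = selector;
            this.condition = condition;
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> queue = new PriorityQueue<>();
    private final List<Waiter<E>> waiters = new ArrayList<>();

    public void put(E message) {
        lock.lock();
        try {
            queue.add(message);
            // Wake only the receivers whose selector matches the new message.
            for (Waiter<E> w : waiters) {
                if (w.selector.test(message)) {
                    w.condition.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until a message matching the selector is available. */
    public E take(Predicate<E> selector) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            Waiter<E> me = new Waiter<>(selector, lock.newCondition());
            while (true) {
                E match = removeBestMatch(selector);
                if (match != null) {
                    return match;
                }
                waiters.add(me);           // register, then sleep
                try {
                    me.condition.await();
                } finally {
                    waiters.remove(me);    // deregister before rescanning
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /** Non-blocking variant: returns null when nothing matches. */
    public E poll(Predicate<E> selector) {
        lock.lock();
        try {
            return removeBestMatch(selector);
        } finally {
            lock.unlock();
        }
    }

    // PriorityQueue iteration is not in priority order, so scan everything
    // and keep the smallest matching element. Caller must hold the lock.
    private E removeBestMatch(Predicate<E> selector) {
        E best = null;
        for (E e : queue) {
            if (selector.test(e) && (best == null || e.compareTo(best) < 0)) {
                best = e;
            }
        }
        if (best != null) {
            queue.remove(best);
        }
        return best;
    }
}
```

The linear scan in removeBestMatch is the iteration cost that comes with this approach. Conditions are created dynamically with lock.newCondition(), one per blocked call.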

Multiple BlockingQueues: Use a Map<MessageSelector,BlockingQueue>,  
with one entry for each MessageSelector plus one for all messages.
When a new QueueReceiver with a new MessageSelector is created, add a
new entry to the Map. When a message is sent, check all the
MessageSelectors, and place the message in all the corresponding
BlockingQueues. When a QueueReceiver take()s a message successfully,
remove() it from all the BlockingQueues. Fold all this into a class  
that implements BlockingQueue.
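
A rough sketch of the Multiple BlockingQueues idea follows. FanOutQueue and Envelope are hypothetical names, and a Predicate (Java 8 or later) again stands in for a MessageSelector. Two things here are assumptions added for the sketch and are not part of the description above. First, each message is wrapped in an envelope with an AtomicBoolean "claimed" flag, so that two receivers that pull the same message from different queues cannot both deliver it. Second, LinkedBlockingQueue is used instead of PriorityBlockingQueue to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

/** Hypothetical sketch: one BlockingQueue per selector plus a catch-all. */
class FanOutQueue<E> {
    private static final class Envelope<E> {
        final E message;
        final AtomicBoolean claimed = new AtomicBoolean(false);
        Envelope(E message) { this.message = message; }
    }

    private final Map<Predicate<E>, BlockingQueue<Envelope<E>>> queues =
            new ConcurrentHashMap<>();
    private final Predicate<E> all = m -> true;   // the "all messages" entry

    FanOutQueue() {
        register(all);
    }

    /** Selector to pass to take()/poll() for receivers without a selector. */
    public Predicate<E> all() {
        return all;
    }

    /** Adds an entry for a new selector; messages already queued are not replayed. */
    public void register(Predicate<E> selector) {
        queues.putIfAbsent(selector, new LinkedBlockingQueue<>());
    }

    /** Places the message in every queue whose selector matches it. */
    public void put(E message) {
        Envelope<E> env = new Envelope<>(message);
        for (Map.Entry<Predicate<E>, BlockingQueue<Envelope<E>>> entry : queues.entrySet()) {
            if (entry.getKey().test(message)) {
                entry.getValue().add(env);
            }
        }
    }

    /** Blocks until this selector's queue yields a message nobody else claimed. */
    public E take(Predicate<E> selector) throws InterruptedException {
        BlockingQueue<Envelope<E>> own = queues.get(selector);
        if (own == null) {
            throw new IllegalArgumentException("selector not registered");
        }
        while (true) {
            Envelope<E> env = own.take();
            if (claim(env)) {
                return env.message;
            }
        }
    }

    /** Non-blocking variant: returns null when nothing is available. */
    public E poll(Predicate<E> selector) {
        BlockingQueue<Envelope<E>> own = queues.get(selector);
        if (own == null) {
            return null;
        }
        Envelope<E> env;
        while ((env = own.poll()) != null) {
            if (claim(env)) {
                return env.message;
            }
        }
        return null;
    }

    // The winner of the compareAndSet removes the envelope everywhere else.
    private boolean claim(Envelope<E> env) {
        if (!env.claimed.compareAndSet(false, true)) {
            return false;   // another receiver delivered it first
        }
        for (BlockingQueue<Envelope<E>> q : queues.values()) {
            q.remove(env);  // identity-based, since Envelope keeps Object.equals
        }
        return true;
    }
}
```

The remove() loop in claim() is the multi-queue cleanup cost of this approach. The claim flag only makes a late removal harmless, because a stale envelope is skipped; it does not make the removal cheaper.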

Neither approach is without problems. The first requires creating  
Conditions dynamically, which the example code doesn't do, plus  
iterating through the queue (could be mitigated by holding onto the  
matching message). The second has this traffic jam of remove()ing  
from multiple Queues. I'll take any advice on where to start.



David Walend
david at walend.net
