[concurrency-interest] Special Conditions and PriorityBlockingQueue
David Walend
david at walend.net
Sat Sep 23 10:47:23 EDT 2006
In the SomnifugiJMS project, I'm getting close to implementing the
full JMS specification. The last big technical hurdle is implementing
message selectors for JMS Queues. (Message selectors are these odd
bits of SQL92 that make sure a particular receiver only gets
messages with particular properties represented as key/value pairs.)
Currently, JMS Queues are wrapped around BlockingQueues, usually
PriorityBlockingQueues. I just looked at the PriorityBlockingQueue
code, which wraps a ReentrantLock and a notEmpty Condition around a
PriorityQueue.
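
For readers who have not looked at that code, the structure described above can be sketched roughly as follows. This is a simplified illustration, not the JDK source; the class name SimplePriorityBlockingQueue is hypothetical and only offer(), poll() and take() are shown.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration only: a lock and one Condition around a PriorityQueue.
class SimplePriorityBlockingQueue<E extends Comparable<E>> {
    private final PriorityQueue<E> queue = new PriorityQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // The queue is unbounded, so inserting never blocks.
    boolean offer(E element) {
        lock.lock();
        try {
            queue.offer(element);
            notEmpty.signal(); // wake one thread waiting in take()
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking: returns null when the queue is empty.
    E poll() {
        lock.lock();
        try {
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until an element is available.
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // releases the lock while waiting
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

The single notEmpty Condition is what makes selectors awkward here: every waiting thread is treated as interchangeable.
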
Message selectors in Topics were fairly easy. Each subscriber to a
topic gets its own BlockingQueue. If a subscriber's message selector
doesn't match the message, the message doesn't drop into that
subscriber's BlockingQueue.
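
A minimal sketch of that per-subscriber filtering, assuming a java.util.function.Predicate stands in for a parsed MessageSelector (real selectors are SQL92-style strings, and parsing them is out of scope here). The class SelectorTopic and its method names are hypothetical, not SomnifugiJMS code.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;

// Hypothetical illustration: each subscriber owns a queue guarded by its selector.
class SelectorTopic<M> {
    private static final class Subscription<M> {
        final Predicate<M> selector; // stand-in for a parsed MessageSelector
        final BlockingQueue<M> queue = new LinkedBlockingQueue<>();

        Subscription(Predicate<M> selector) {
            this.selector = selector;
        }
    }

    private final List<Subscription<M>> subscriptions = new CopyOnWriteArrayList<>();

    // Returns the new subscriber's private queue.
    BlockingQueue<M> subscribe(Predicate<M> selector) {
        Subscription<M> subscription = new Subscription<>(selector);
        subscriptions.add(subscription);
        return subscription.queue;
    }

    // A message only drops into the queues whose selector matches it.
    void publish(M message) {
        for (Subscription<M> subscription : subscriptions) {
            if (subscription.selector.test(message)) {
                subscription.queue.offer(message);
            }
        }
    }
}
```

This works because every subscriber gets its own copy of a matching message, so no two subscribers compete for the same entry.
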
That approach won't work for JMS Queues. All QueueReceivers currently
work off the same BlockingQueue, and I'd like to keep that abstraction.
I have two vague ideas for how to get message selectors working for
JMS Queues, neither of which is great. I'd like to get some feedback
before trying either.
Multiple Conditions: Write a custom BlockingQueue with a Condition
for each message selector. Start with the code for
PriorityBlockingQueue. Add custom code to create a
Map<QueueReceiver,Condition> that holds waiting receivers, each with
its own MessageSelector. When a message enters the queue, check the waiting
receivers to see if any message selectors match the message; if so,
signal that receiver's take() or poll() call. When a receiver first
take()s or poll()s or is signaled by its Condition, iterate through
the queue for a matching message. If there's a message, return it and
(if needed) remove the entry from the Map. If no message matches, add
the receiver to the Map.
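
The Multiple Conditions idea might look something like the sketch below. It is only one possible reading: a Predicate stands in for the MessageSelector, the receiver is keyed by a plain Object, and the class and method names are hypothetical. It signals every waiting receiver whose selector matches, which is simpler than picking one but can wake more threads than necessary. Because PriorityQueue does not iterate in priority order, the scan tracks the best match explicitly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical sketch: one Condition per waiting receiver, created on demand.
class SelectorConditionQueue<E extends Comparable<E>> {
    private static final class Waiter<E> {
        final Predicate<E> selector; // stand-in for a MessageSelector
        final Condition condition;

        Waiter(Predicate<E> selector, Condition condition) {
            this.selector = selector;
            this.condition = condition;
        }
    }

    private final PriorityQueue<E> queue = new PriorityQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<Object, Waiter<E>> waiting = new HashMap<>();

    void put(E message) {
        lock.lock();
        try {
            queue.offer(message);
            // Wake every waiting receiver whose selector matches.
            for (Waiter<E> waiter : waiting.values()) {
                if (waiter.selector.test(message)) {
                    waiter.condition.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    // Non-blocking: returns null if nothing matches right now.
    E poll(Predicate<E> selector) {
        lock.lock();
        try {
            return removeBestMatch(selector);
        } finally {
            lock.unlock();
        }
    }

    // Blocks until a matching message can be removed.
    E take(Object receiver, Predicate<E> selector) throws InterruptedException {
        lock.lockInterruptibly();
        Waiter<E> waiter = null;
        try {
            while (true) {
                E match = removeBestMatch(selector);
                if (match != null) {
                    return match;
                }
                if (waiter == null) {
                    waiter = new Waiter<>(selector, lock.newCondition());
                    waiting.put(receiver, waiter);
                }
                waiter.condition.await();
            }
        } finally {
            if (waiter != null) {
                waiting.remove(receiver); // lock is held here, even after an interrupt
            }
            lock.unlock();
        }
    }

    // Linear scan for the highest-priority match; must be called with the lock held.
    private E removeBestMatch(Predicate<E> selector) {
        E best = null;
        for (E candidate : queue) {
            if (selector.test(candidate) && (best == null || candidate.compareTo(best) < 0)) {
                best = candidate;
            }
        }
        if (best != null) {
            queue.remove(best);
        }
        return best;
    }
}
```

Creating a Condition per waiter with lock.newCondition() is legal at any time, so the dynamic creation itself is not a problem; the linear scan on every wake-up is the more noticeable cost.
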
Multiple BlockingQueues: Use a Map<MessageSelector,BlockingQueue>,
with one entry for each MessageSelector plus one for all messages:
When a new QueueReceiver with a new MessageSelector is created, add a
new entry to the Map. When a message is sent, check all the
MessageSelectors, and place the message in all the corresponding
BlockingQueues. When a QueueReceiver take()s a message successfully,
remove() it from all the BlockingQueues. Fold all this into a class
that implements BlockingQueue.
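
A rough sketch of the Multiple BlockingQueues idea, under several assumptions: a Predicate stands in for the MessageSelector, the "all messages" entry is just a selector that always returns true, and each message is wrapped in a hypothetical Envelope carrying a claimed flag. The flag is an addition not described above; without something like it, two receivers with different selectors could each take the same message from their own queue before either remove() ran. The sketch does not implement the full BlockingQueue interface, and messages sent before a selector is registered are not backfilled.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical sketch: one queue per selector, with a claim flag to avoid double delivery.
class MultiQueue<E extends Comparable<E>> {
    private static final class Envelope<E extends Comparable<E>>
            implements Comparable<Envelope<E>> {
        final E message;
        final AtomicBoolean claimed = new AtomicBoolean(false);

        Envelope(E message) {
            this.message = message;
        }

        @Override
        public int compareTo(Envelope<E> other) {
            return message.compareTo(other.message);
        }
    }

    private final Map<Predicate<E>, BlockingQueue<Envelope<E>>> queues =
            new ConcurrentHashMap<>();

    // The same Predicate instance must be passed to register() and take()/poll().
    void register(Predicate<E> selector) {
        queues.putIfAbsent(selector, new PriorityBlockingQueue<Envelope<E>>());
    }

    // Place the message in every queue whose selector matches.
    void send(E message) {
        Envelope<E> envelope = new Envelope<>(message);
        for (Map.Entry<Predicate<E>, BlockingQueue<Envelope<E>>> entry : queues.entrySet()) {
            if (entry.getKey().test(message)) {
                entry.getValue().offer(envelope);
            }
        }
    }

    // Non-blocking: returns null if the selector is unknown or nothing is available.
    E poll(Predicate<E> selector) {
        BlockingQueue<Envelope<E>> own = queues.get(selector);
        if (own == null) {
            return null;
        }
        Envelope<E> envelope;
        while ((envelope = own.poll()) != null) {
            if (envelope.claimed.compareAndSet(false, true)) {
                removeEverywhere(envelope);
                return envelope.message;
            }
            // Another receiver already claimed it; try the next entry.
        }
        return null;
    }

    // Blocks until this selector's queue yields an unclaimed message.
    E take(Predicate<E> selector) throws InterruptedException {
        BlockingQueue<Envelope<E>> own = queues.get(selector);
        if (own == null) {
            throw new IllegalArgumentException("selector not registered");
        }
        while (true) {
            Envelope<E> envelope = own.take();
            if (envelope.claimed.compareAndSet(false, true)) {
                removeEverywhere(envelope);
                return envelope.message;
            }
        }
    }

    // The "traffic jam": each successful take touches every queue.
    private void removeEverywhere(Envelope<E> envelope) {
        for (BlockingQueue<Envelope<E>> queue : queues.values()) {
            queue.remove(envelope);
        }
    }
}
```

Each remove() on a PriorityBlockingQueue is a linear search under that queue's lock, so the cost of a take grows with both the number of selectors and the queue lengths.
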
Neither approach is without problems. The first requires creating
Conditions dynamically, which the example code doesn't do, plus
iterating through the queue (which could be mitigated by holding onto
the matching message). The second has this traffic jam of remove()ing
from multiple BlockingQueues. I'll take any advice on where to start.
Thanks,
Dave