[concurrency-interest] Single producer, single consumer: unexpected delays for producer

Daniel Harvey dharvey at tachyoncm.com
Sat Aug 9 11:05:07 EDT 2008


David,
I have timed the entire onMessage() processing for the producer  
(which is called very often) and also just the time the producer  
spends in calling queue.offer() (via sendThread.send, called about  
1/5 as often). What is so odd to me is that 90% of the total time  
spent in onMessage() comes from time spent in queue.offer() despite  
the fact that this is only called for 20% of the messages. If I have  
the sendThread skip the socketChannel.write() command, then the
situation changes greatly: time spent in queue.offer() drops by
literally a factor of 6 or so.

I haven't tracked the queue size over time, but do know that it isn't  
filling up because I'm not seeing any error messages from
queue.offer() returning false. As long as the queue is below its
fixed max size,
should its actual size affect the amount of work needed to perform a  
queue insertion?
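
A minimal sketch of how the queue depth could be tracked alongside the
offer() timing. OfferTimer and its accessors are hypothetical names made
up for illustration; they are not part of the forwarder code quoted
below. It assumes a single producer thread, so the counters are plain
fields.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of the original forwarder: wraps offer()
// and records timing plus a rough picture of the queue depth.
class OfferTimer {
    private final BlockingQueue<String> queue;
    private long offerNanos;    // total time spent inside offer()
    private long offers;        // number of offer() calls
    private long rejected;      // offers refused because the queue was full
    private long foundEmpty;    // offers that saw an empty queue just before inserting
    private int maxDepth;       // largest depth observed right after an offer

    OfferTimer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    // Intended to be called from the single producer thread only,
    // so the counters need no synchronization.
    boolean timedOffer(String message) {
        if (queue.isEmpty()) {
            foundEmpty++;       // racy snapshot, good enough to show a trend
        }
        long start = System.nanoTime();
        boolean accepted = queue.offer(message);
        offerNanos += System.nanoTime() - start;

        offers++;
        if (!accepted) {
            rejected++;
        }
        maxDepth = Math.max(maxDepth, queue.size());
        return accepted;
    }

    long offers() { return offers; }
    long rejected() { return rejected; }
    long foundEmpty() { return foundEmpty; }
    int maxDepth() { return maxDepth; }

    String report() {
        long avg = offers == 0 ? 0 : offerNanos / offers;
        return "offers=" + offers + " rejected=" + rejected
                + " foundEmpty=" + foundEmpty + " maxDepth=" + maxDepth
                + " avgOfferNanos=" + avg;
    }

    public static void main(String[] args) {
        OfferTimer timer = new OfferTimer(new LinkedBlockingQueue<String>(10000));
        for (int i = 0; i < 1000; i++) {
            timer.timedOffer("message-" + i);
        }
        System.out.println(timer.report());
    }
}
```

A foundEmpty count close to the offers count would suggest the queue is
oscillating around empty. In the Sun/OpenJDK LinkedBlockingQueue, an
insertion into an empty queue also has to signal the waiting consumer,
so those offers would be expected to cost more than offers into a
partly filled queue. Whether JRockit's class library behaves the same
way is an assumption.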

Anyway, I will certainly try the Sun JDK, and look into whether CentOS
provides the POSIX APIs.

I'll also give Carfield's suggestion a try (thanks), though in past
experiments I have done I haven't found drainTo() to be _that_ much
more efficient than repeated take() calls.
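
For comparison, a rough sketch of what a drainTo()-based consumer step
might look like. BatchDrain, nextBatch and concat are hypothetical names
for illustration only. The sketch assumes each message string already
carries its own framing (for example a trailing newline), so a batch can
be concatenated and handed to a single write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: block for one message, then grab whatever else
// is already queued, up to maxBatch messages in total (maxBatch >= 1).
class BatchDrain {

    static List<String> nextBatch(BlockingQueue<String> queue, int maxBatch)
            throws InterruptedException {
        List<String> batch = new ArrayList<String>(maxBatch);
        batch.add(queue.take());                  // waits until something arrives
        if (maxBatch > 1) {
            queue.drainTo(batch, maxBatch - 1);   // non-blocking, may add nothing
        }
        return batch;
    }

    // Joins a batch so it can be encoded and written in one call.
    static String concat(List<String> batch) {
        StringBuilder sb = new StringBuilder();
        for (String message : batch) {
            sb.append(message);
        }
        return sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);
        queue.offer("a\n");
        queue.offer("b\n");
        queue.offer("c\n");

        List<String> batch = nextBatch(queue, 100);
        System.out.println(batch.size() + " messages in one batch");
        System.out.print(concat(batch));
    }
}
```

Any gain from this would probably come less from the queue itself than
from issuing one socketChannel.write() per batch instead of one per
message, which is a guess that would need measuring.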

-Dan

On Aug 9, 2008, at 12:45 AM, David Holmes wrote:

> Dan,
>
> Have you actually just timed the queue operations? You indicated  
> that if the
> consumer just did take() in a loop then there was a 6x speed-up in the
> producer. That would seem to indicate that the queue itself is not the
> problem. It could just be an unfortunate harmonic where the  
> producer and
> consumer always collide at the queue, but even so the delay  
> shouldn't result
> in a 6x slowdown. You might try tracking the queue size to see if
> it grows and
> shrinks as expected or whether it oscillates around being empty.
>
> As for whether system calls are involved ... probably, but I don't  
> know how
> anything in JRockit is implemented so I can't say for sure. I'm  
> assuming it
> will use both of your processors.
>
> Can you download JDK 6 and try that for comparison? (You might also  
> try an
> evaluation download of Sun Java Real-time System if CentOS provides  
> the
> real-time POSIX API's - and presuming it's actually a real-time  
> linux. :) )
>
> Cheers,
> David Holmes
>
>> -----Original Message-----
>> From: concurrency-interest-bounces at cs.oswego.edu
>> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of  
>> Daniel
>> Harvey
>> Sent: Saturday, 9 August 2008 12:33 PM
>> To: dholmes at ieee.org
>> Cc: concurrency-interest at cs.oswego.edu
>> Subject: Re: [concurrency-interest] Single producer,single consumer:
>> unexpected delays for producer
>>
>>
>> David, many thanks for your response. I'm running on Linux (CentOS)
>> with a 2 processor machine, though am planning on moving to an 8
>> processor machine in the next few days. I'm currently using jrockit
>> (java 1.6) as the JVM. Today I downloaded jrockit-realtime so perhaps
>> that will help reduce the amount of dropped messages.
>>
>> What confuses me is where the delay is showing up on the producer.
>> It's not showing up as a slowdown in reading from the socket, just in
>> the call to queue.offer(). That queue is really the only shared
>> object that is being used by both threads. I was wondering whether
>> the thread signaling (when the message is put in the queue) is an OS
>> or system level call, and whether that can get held up by the socket
>> writing call? That might explain why the socket sending could hold up
>> the queue insertions. If so, can you think of any way around my  
>> issue?
>>
>> Thanks,
>> Dan
>>
>> On Aug 8, 2008, at 9:49 PM, David Holmes wrote:
>>
>>> Dan,
>>>
>>> The simple answer is that the consumer is using a resource that the
>>> producer
>>> needs. It could be processor cycles, or network access, or
>>> contention on
>>> some shared object at the application, VM or OS level. Assuming  
>>> you're
>>> multi-processor I'd suspect contention somewhere in the TCP/IP
>>> stack. What
>>> OS are you on? How many processors do you have?
>>>
>>> At 100,000 messages per second you only have 10 microseconds to  
>>> fully
>>> process each one. That's a tall order on a non-real-time system.
>>> Even if
>>> your best processing time is within 10us, it isn't a sustainable
>>> rate, so
>>> you should expect to see a lot of variation in the message rate
>>> that you can
>>> handle. When GC kicks in you're going to drop a lot of messages.
>>>
>>> Cheers,
>>> David Holmes
>>>
>>>> -----Original Message-----
>>>> From: concurrency-interest-bounces at cs.oswego.edu
>>>> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of
>>>> Daniel
>>>> Harvey
>>>> Sent: Saturday, 9 August 2008 1:11 AM
>>>> To: concurrency-interest at cs.oswego.edu
>>>> Subject: [concurrency-interest] Single producer,single consumer:
>>>> unexpected delays for producer
>>>>
>>>>
>>>> Hi,
>>>> I'm writing a program that receives a very high message rate
>>>> (~100,000 per second) over a TCP socket, filters out the useful
>>>> information (about 1/5 of the above), and forwards that information
>>>> over another TCP socket. I have been having trouble keeping up with
>>>> the incoming messages, and I think it's an oddity of the
>>>> LinkedBlockingQueue I am using to queue the useful updates for  
>>>> later
>>>> sending. The code looks something like this:
>>>>
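>>>> import java.io.IOException;
>>>> import java.nio.ByteBuffer;
>>>> import java.nio.CharBuffer;
>>>> import java.nio.charset.Charset;
>>>> import java.nio.charset.CharsetEncoder;
>>>> import java.util.concurrent.BlockingQueue;
>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>
>>>> public class MessageForwarder {
>>>> 	SendThread	sendThread = new SendThread();
>>>> 	 ...
>>>>
>>>> 	public void onMessage(RapidMessage message) {
>>>> 		long start = System.nanoTime();
>>>>
>>>> 		// Do a bunch of checks to decide if useful...
>>>> 		boolean useful = ...
>>>>
>>>> 		if (useful) {
>>>> 			sendThread.send(throttled.toMessageString());
>>>> 		}
>>>> 		processingTime += (System.nanoTime() - start);
>>>> 	}
>>>>
>>>> 	// inner class
>>>> 	public class SendThread extends Thread {
>>>>
>>>> 		private final BlockingQueue<String> queued =
>>>> 			new LinkedBlockingQueue<String>(10000);
>>>> 		private final CharsetEncoder encoder =
>>>> 			Charset.forName("ISO-8859-1").newEncoder();
>>>>
>>>> 		// Called on the producer thread; offer() never blocks.
>>>> 		public void send(String message) {
>>>> 			long start = System.nanoTime();
>>>> 			boolean success = queued.offer(message);
>>>> 			sendingTime += (System.nanoTime() - start);
>>>>
>>>> 			if (!success) {
>>>> 				log.error("Failed to send message: queue is full");
>>>> 			}
>>>> 		}
>>>>
>>>> 		// Consumer loop: take a message, encode it, write it out.
>>>> 		public void run() {
>>>> 			try {
>>>> 				while (true) {
>>>> 					String message = queued.take();
>>>> 					ByteBuffer buffer = encoder.encode(CharBuffer.wrap(message));
>>>> 					// A non-blocking channel may accept only part of the buffer.
>>>> 					while (buffer.hasRemaining()) {
>>>> 						socketChannel.write(buffer);
>>>> 					}
>>>> 				}
>>>> 			} catch (InterruptedException e) {
>>>> 				Thread.currentThread().interrupt();
>>>> 			} catch (IOException e) {
>>>> 				log.error("Send failed", e);
>>>> 			}
>>>> 		}
>>>> 	}
>>>> ...
>>>> }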
>>>>
>>>> When I print out periodically the value of the "processingTime" and
>>>> the "sendingTime", they are similar: the majority of the processing
>>>> time is taking place in the call to sendThread.send(). The  
>>>> checks to
>>>> see if the message is useful are fast enough that it could run
>>>> 100,000 messages per second, but the calls to "send" are much  
>>>> slower,
>>>> only 20,000 messages per second at best. Note that the queue is not
>>>> filling up (not seeing the error message anywhere). What I really
>>>> don't understand is that if I have the consumer (Sending) thread
>>>> throw away the message after it takes it from the queue (ie just do
>>>> repeated "take"s), then the call to sendThread.send() speeds up  
>>>> by a
>>>> factor of around 6. I don't understand how if the queue isn't  
>>>> filling
>>>> up, why would the actions of the consumer matter to the speed of  
>>>> the
>>>> producer?
>>>>
>>>> Any help or advice you can give would be much appreciated!
>>>>
>>>> Thanks,
>>>> Dan Harvey
>>>>
>>>> _______________________________________________
>>>> Concurrency-interest mailing list
>>>> Concurrency-interest at cs.oswego.edu
>>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest