[concurrency-interest] Single producer, single consumer: unexpected delays for producer

Daniel Harvey dharvey at tachyoncm.com
Fri Aug 8 11:10:38 EDT 2008


Hi,
I'm writing a program that receives a very high message rate  
(~100,000 per second) over a TCP socket, filters out the useful  
information (about 1/5 of the above), and forwards that information  
over another TCP socket. I have been having trouble keeping up with  
the incoming messages, and I think its an oddity of the  
LinkedBlockingQueue I am using to queue the useful updates for later  
sending. The code looks something like this:

public class MessageForwarder {
	SendThread	sendThread = new SendThread();
	 ...

	public void onMessage(RapidMessage message) {
		long start = System.nanoTime();

		// Do a bunch of checks to decide if useful...
		boolean useful = ...
	
		if (useful) {
			sendThread.send(throttled.toMessageString())
		}
		processingTime += (System.nanoTime() - start);
	}

	// inner class
	public class SendThread extends Thread {
	
		private final BlockingQueue<String> queued = new  
LinkedBlockingQueue<String>(10000);
		private final CharsetEncoder              encoder = Charset.forName 
("ISO-8859-1").newEncoder();

		public void send(String message) {
			long start = System.nanoTime();
	 		boolean success = queued.offer(message);
			sendingTime +=  (System.nanoTime() - start);

			if (!success) {
				log.error("Failed to send message: queue is full");
			}
		}

		public void run() {
			while (true) {
				String message = queued.take();
				ByteBuffer buffer = encoder.encode(CharBuffer.wrap(message));
				socketChannel.write(buffer);
			}
		}
	}
...
}

When I print out periodically the value of the "processingTime" and  
the "sendingTime", they are similar: the majority of the processing  
time is taking place in the call to sendThread.send(). The checks to  
see if the message is useful are fast enough that it could run  
100,000 messages per second, but the calls to "send" are much slower,  
only 20,000 messages per second at best. Note that the queue is not  
filling up (not seeing the error message anywhere). What I really  
don't understand is that if I have the consumer (Sending) thread  
throw away the message after it takes it from the queue (ie just do  
repeated "take"s), then the call to sendThread.send() speeds up by a  
factor of around 6. I don't understand how if the queue isn't filling  
up, why would the actions of the consumer matter to the speed of the  
producer?

Any help or advice you can give would be much appreciated!

Thanks,
Dan Harvey



More information about the Concurrency-interest mailing list