[concurrency-interest] Weird behavior

jean.morissette666 at videotron.ca jean.morissette666 at videotron.ca
Fri Apr 8 21:42:46 EDT 2005


Hi all,
I have written a very simple test (see below) that consists of one stage 
connected by one input queue and one output queue.  The stage mainly forwards 
elements from its input queue to its output queue.  Elements are time stamped 
and enqueued in ascending order.  Note that the stage is single threaded.  

Here is the output:

Element timestamp = 0
Element timestamp = 1
Element timestamp = 2
Element timestamp = 3
Element timestamp = 4
Element timestamp = 5
Element timestamp = 9
Element timestamp = 6
Exception in thread "Thread-2" java.lang.AssertionError: Ouput elements not 
ordered
	at execution.operator.Test$3.run(Test.java:99)


More weird, the test will pass without error if you (un)comment the two 
indicated lines XXX.


My config: 
- Linux 2.6 
- Sun jdk1.5.0_01
- Javac


Am I missing something?
Thanks,
-Jean


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;

public class Test {

	BlockingQueue outputQueue;
	BlockingQueue inputQueue;

	// Size of the window
	int windowSize;

	// Timestamp of the last output tuple
	long lastOutputTs = 0;

	// MIN (Number of tuples processed so far, windowSize)
	int numProcessed;

	public void process(int numElements) {

		Element inElement;

		// XXX : test will work if you comment this line
		Element outElement = new Element();

		for (int e = 0 ; e < numElements; e++) {
			inElement = (Element) inputQueue.poll();
			if (inElement == null) break;

			assert (inElement.timestamp >= lastOutputTs);
			outputQueue.offer(inElement);
			lastOutputTs = inElement.timestamp;

			assert (numProcessed <= windowSize);
			if (numProcessed == windowSize) {

				// XXX : the test will work if you uncomment this line
//				Element outElement = new Element();
				outElement.timestamp = inElement.timestamp;

				assert (outElement.timestamp >= lastOutputTs);
				outputQueue.offer(outElement);
			}
			else {
				numProcessed++;
			}
		}
	}

	public static class Element {

		public long timestamp;

		public String toString() {
			return "Element timestamp = " + timestamp;
		}

	}


	public static void main(String[] args) throws InterruptedException {
		final Test stage = new Test();
		final BlockingQueue inputQ = new ArrayBlockingQueue(1000);
		final BlockingQueue outputQ = new ArrayBlockingQueue(1000);

		stage.windowSize = 5;
		stage.inputQueue = inputQ;
		stage.outputQueue = outputQ;

		// start producer
		new Thread() {
			public void run() {
				for (int i = 0; i < 100; i++) {
					Element el = new Element();
					el.timestamp = i;
					inputQ.offer(el);
					if (1 % 10 == 0) yield();
				}
			}
		}.start();

		// start stage
		new Thread() {
			public void run() {
				while (true) {
					stage.process(10);
				}
			}
		}.start();

		// start consumer
		new Thread() {
			public void run() {
				long lastTs = 0;
				while (true) {
					Element el = null;
					try {
						el = (Element) outputQ.take();
						System.out.println(el);
						assert (el.timestamp >= lastTs) : "Ouput elements not ordered";
						lastTs = el.timestamp;
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}.start();

	}

}


More information about the Concurrency-interest mailing list