[concurrency-interest] Backport: Signal Question?

Brian O'Connell broconne at vt.edu
Sun May 27 16:41:39 EDT 2007


I have code in a production system that is deployed on multiple nodes and
has been executing unmodified for over a year.  Within the past few days I
have detected multiple failures across all deployed nodes.  While the code
has not changed the underlying systems are often patched with security
fixes, etc.  The nodes of the system are linux running on powerpc.  A quick
"sum" test tells me I am on backport 2.0_01 with java version "java full
version "J2RE 1.4.2 IBM build cxppc32142-20050609".  I have narrowed the
problem to a bug in my "OrderedQueue.java" which is composed with an
underlying PriorityQueue from the backport library.    The OrderedQueue is
supposed to act as a priority queue, except elements are expected to be
added implementing an interface describing their number in a sequence.
Consumers are blocked from taking an element until the next expected
sequence number is available or the specified wait time elapses at which
point they may return the next lowest sequence number for processing.  (In
retrospect I should name my method something besides take since it in no way
is true to the documentation for the Queue interface).  These queues may
have multiple producers but are guaranteed to only have one consumer
thread.  By adding copious logging I have narrowed it further to an infinite
loop in my take method.  I will attach both the take and processAddition
method (called by offer).

Here is take:
   public Object take(long timeout, TimeUnit timeUnit) throws
InterruptedException {

        final ReentrantLock lock = this.lock;

        logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".  take
Acquiring Lock");

        lock.lock();

        try {
        logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".  take
Acquired Lock");

            for (;;) {
                logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
peeking queue");

                NumberedElement element = (NumberedElement) queue.peek();

                logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
Queue peeked");

                if ( element != null ) {
                    long elementNumber = element.getNumber();

                    logger.log(Level.FINE, "Queue: \"" + this.toString() +
"\".  Element on Queue.  Object " + element.toString() + " Number: " +
elementNumber);

                    if ( (elementNumber <= nextNumber)
                            || (elementNumber ==
NumberedElement.SIGNAL_NUMBER) ) {

                        logger.log(Level.FINE, "Queue: \"" + this.toString()
+ "\".  Element on queue is the expected next element.  Removing and
returning" );

                        queue.remove();
                        removedObject( element );
                        return element;

                    }
                    else {
                        /* Element exists, just not right one */
                        try {

                            logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\".  Element on queue is not the expected next element.
Waiting timeout on newLowest condition for  " + timeout + " for lower
element");

                            boolean signaled = newLowest.await( timeout,
timeUnit );

                            if ( signaled == false ) {
                                /* Timeout occurred. Take this element after
a recheck */
                                logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" not signaled for lower elment.");

                                logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" Checking to make sure element still exists on queue
and wasn't stolen during out signal await.");
                                element = (NumberedElement) queue.poll();
                                if ( element != null ) {
                                    logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" An element still exists.  Removed and returning");
                                    removedObject( element );
                                    return element;
                                }
                                else{
                                    logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" Element disappeared.. WTF?");
                                }
                            }
                            else{
                                    logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" New Lowest Signaled During wait period.
Rechecking.");
                            }
                        }
                        catch(InterruptedException exception) {
                               logger.log(Level.FINE, "Queue: \"" +
this.toString() + "\" Interrupted exception.");
                            notEmpty.signal();
                            throw exception;
                        }
                    }
                }
                else {
                    /* Queue is empty wait until it is not */
                    logger.log(Level.FINE, "Queue: \"" + this.toString() +
"\".  Queue empty awaiting notEmpty signal.");

                    notEmpty.await();

                    logger.log(Level.FINE, "Queue: \"" + this.toString() +
"\".  notEmpty signaled.");
                }
            }
        }
        finally {
            lock.unlock();
            logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
take Released Lock");
        }

    }


Here is processAddition:
private final void processAddition(Object queueElement) {


        NumberedElement element = (NumberedElement) queueElement;
        long number = element.getNumber();

        logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
Processing Addition to queue.  Object " + queueElement.toString() + "
Number: " + number);

        /*
         * Place object on queue. If object is next number up, notify
dequeue If object is not next number up, set timer
         * for dequeue
         *
         */
        logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
processAddition Acquiring Lock");

        final ReentrantLock lock = this.lock;
        lock.lock();

        logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
processAddition Acquired Lock");

        try {
            queue.offer( element );

            logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
Added object to Priority Queue.  Object " + queueElement.toString() + "
Number: " + number);

            notEmpty.signal();

            logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
Condition notEmpty signaled");

            if ( number == Long.MAX_VALUE ) {
                newLowest.signal();
                removeNow.signal();

                logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
Condition newLowest and removeNow signaled");

            }
            else {
                if ( number <= lowestNumber ) {
                    /* Notify to take off */
                    lowestNumber = number;
                    newLowest.signal();
                    logger.log(Level.FINE, "Queue: \"" + this.toString() +
"\".  Condition newLowest signaled");
                }
                if ( number <= nextNumber ) {
                    removeNow.signal();
                    logger.log(Level.FINE, "Queue: \"" + this.toString() +
"\".  Condition removeNow signaled");
                }
            }

            inserted = true;
        }
        finally {
            lock.unlock();
            logger.log(Level.FINE, "Queue: \"" + this.toString() + "\".
processAddition Released Lock");
        }

    }


Please note that the two entries in processAddition are the only place in
which newLowest.signal() is called and they are surrounded by logging calls
indicating such.  The behavior I have found is that occasionally an infinite
loop occurs in take.  I was under the impression that that if a signal
occurs an await(long,TimeUnit) would clear that signal before returning and
therefore the next call to await(long, TimeUnit) would not return true
unless another signal occurs either before or during the await call.  I do
not actually see that specified in the Condition javadoc but based on the
sample BoundedBuffer in the javadoc it seems that is likely the desired
behavior..  Here is the log entries from a failure scenario:

05/27 19:43:32[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:34[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:34[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:34[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:34[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:34[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:36[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:36[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:36[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:36[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:36[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:38[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:38[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:38[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:38[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:38[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:40[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:40[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:40[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:40[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:40[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:42[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:42[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:42[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:42[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:42[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element
05/27 19:43:44[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e" New Lowest Signaled
During wait period.  Rechecking.
05/27 19:43:44[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  peeking queue
05/27 19:43:44[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Queue peeked
05/27 19:43:44[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on Queue.
Object /projects/publish/odd/en_US/about/bNimble-53062.stg Number: 19645
05/27 19:43:44[1010124611](      Fine)[OrderedQueue/take]Queue: "
events.bnimble.plugin.endpoint.OrderedQueue at 36bd2e9e".  Element on queue is
not the expected next element. Waiting timeout on newLowest condition for
2000 for lower element


As you can see every two seconds it pops out of an await thinking the signal
has been thrown without any logging indicating a signal() call.  Do I
misunderstand JUC signals and they don't get reset?  I know spurious wakeups
and signals may occur but this goes beyond spurious into infinite.  Is there
something obvious I am missing?


Thanks,
Brian
-------------- next part --------------
An HTML attachment was scrubbed...
URL: /pipermail/attachments/20070527/36788da4/attachment-0001.html 


More information about the Concurrency-interest mailing list