[concurrency-interest] ConcurrentHashMap NullPointerException

BJ Low notorand at gmail.com
Sun Jan 6 22:04:05 EST 2008


Sorry, it should be this:

    public void onOpen(Conn conn) {

        // create new Member1 and store to Conn-Member1 mapping
        this.mapOfMembers.put(conn, new Member1());
    }

and removed during

    public void onClose(Conn conn) {

        // remove the Conn-Member1 mapping
        Member1 member1 = this.mapOfMembers.remove(conn);
    }

however, when accessing it during

    public void onRead(Conn conn) {

        // get the Conn-Member1 mapping
        Member1 member1 = this.mapOfMembers.get(conn);
         .....
         .....
    }

it returns null instead.

The parameter should be Conn, not Upstream. Conn is the enclosing class of
Upstream; Upstream is an inner class of Conn. However, as I said, it is not
important how the key is constructed, because I do a synchronized(this)
for every event delivery. Therefore even if an object of another class X
were constructed during onOpen() as the key, it should also be visible.
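
To make the Conn-versus-Upstream point concrete, here is a minimal sketch. Only the names Conn, Upstream, Member1 and mapOfMembers come from this thread; the Listener class, the Object-typed key and the demo class are assumptions made up for illustration. It only shows that a lookup with a different key object than the one used in put() returns null, independent of any threading. It is not a diagnosis of the null that was actually observed.

```java
import java.util.concurrent.ConcurrentHashMap;

class KeyIdentityDemo {
    public static void main(String[] args) {
        Conn conn = new Conn();
        Listener listener = new Listener();

        listener.onOpen(conn);                                      // key is the Conn
        System.out.println(listener.onRead(conn) != null);          // true
        System.out.println(listener.onRead(conn.upstream) != null); // false: different key object
    }
}

// Hypothetical stand-ins; the real classes are not shown in full in this thread.
class Member1 {}

class Conn {
    // Inner class as in the post. Neither class overrides equals()/hashCode(),
    // so an Upstream is never equal to its enclosing Conn.
    class Upstream {}

    final Upstream upstream = new Upstream();
}

class Listener {
    // Object-typed key only so that both kinds of key can be tried in the demo.
    private final ConcurrentHashMap<Object, Member1> mapOfMembers =
            new ConcurrentHashMap<Object, Member1>();

    void onOpen(Object key) {
        this.mapOfMembers.put(key, new Member1());
    }

    Member1 onRead(Object key) {
        return this.mapOfMembers.get(key); // null if this exact key was never put
    }

    Member1 onClose(Object key) {
        return this.mapOfMembers.remove(key);
    }
}
```

Whatever object is chosen as the key, the same object (or one that is equal to it) has to be passed to put(), get() and remove().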

class Conn {
   ....
   ....


   class Upstream implements Runnable {

        private Logger logger = Logger.getLogger(Conn.Upstream.class);

        private ReentrantLock lockBuffer = new ReentrantLock();
        private ArrayDeque<EventIO> buffer = new ArrayDeque<EventIO>();

        private boolean streamInProc = false;
        private boolean eof = false;

        private Upstream() {
        }

        private void add(EventIO eventIO) {
            try {
                // prevent case where Event exist in buffer, but not submitted to threadpool
                // - Thread_1, + IOHandlerThreadPool
                //
                // + run()
                // + if (this.buffer.size() != 0) test false
                // - add(EventIO eventIO)
                // - this.buffer.add(eventIO)
                // - if (!this.streamInProc) test false
                // + this.streamInProc = false
                // event in buffer but not submitted
                //
                // prevent concurrent access to buffer as ArrayDeque is not thread-safe
                // - Thread_1, + IOHandlerThreadPool
                //
                // - add(EventIO eventIO)
                // - this.buffer.add(eventIO)
                // + run()
                // + eventIO = this.buffer.poll()
                this.lockBuffer.lock();

                // to prevent out-of-memory by not adding, within lockBuffer to prevent memory consistency errors
                if (!this.eof) {

                    this.buffer.add(eventIO);

                    if (!this.streamInProc) {
                        Conn.this.ioHandlerThreadPool.submit(this);
                        this.streamInProc = true;
                    }

                    // CLOSE, CONNECT_FAIL will be the last event to be processed, therefore any
                    // further add to the back add()/front close() of the buffer is redundant
                    if ((eventIO == EventIO.CLOSE) || (eventIO == EventIO.CONNECT_FAIL)) {
                        this.eof = true;
                    }
                }
            }
            finally {
                this.lockBuffer.unlock();
            }
        }

        @Override
        public void run() {
            try {
                int granularity = 0;

                // stop delivery when granularity is reached, or no more EventIO left in buffer to deliver
                boolean endLoop = false;
                boolean closed = false;
                while (!endLoop && !closed) {

                    EventIO eventIO = null;
                    try {
                        // prevent concurrent access to buffer as ArrayDeque is not thread-safe
                        // - Thread_1, + IOHandlerThreadPool
                        //
                        // - add(EventIO eventIO)
                        // - this.buffer.add(eventIO)
                        // + run()
                        // + eventIO = this.buffer.poll()
                        this.lockBuffer.lock();

                        // remove an Object from buffer
                        eventIO = this.buffer.poll();
                    }
                    finally {
                        this.lockBuffer.unlock();
                    }

                    if (eventIO == null) {
                        endLoop = true;
                    }
                    else {
                        // prevent memory consistency errors whereby values written by current thread
                        // from IOHandlerThreadPool is not flushed, causing the next run() reading
                        // stale values if it is a different thread from this current thread
                        synchronized (this) {

                            // delivery
                            if (eventIO == EventIO.OPEN) {
                                Conn.this.handleOpen();
                            }
                            else if (eventIO == EventIO.READ) {
                                Conn.this.handleRead();
                            }
                            else if ((eventIO == EventIO.IDLE_READ) || (eventIO == EventIO.IDLE_WRITE)) {
                                Conn.this.handleIdle(eventIO);
                            }
                            else if (eventIO == EventIO.CLOSE) {
                                Conn.this.handleClose();
                                closed = true;
                            }
                            else if (eventIO == EventIO.CONNECT_FAIL) {
                                Conn.this.handleConnectFail();
                                closed = true;
                            }
                        }

                        // update granularity
                        granularity++;
                        if (granularity == 10) {
                            endLoop = true;
                        }
                    }
                }

                // if EventIO.CLOSE is reached, then permanently streamInProc stays true,
                // ensuring no more EventIO will be delivered
                if (!closed) {
                    try {
                        // prevent case where Event exist in buffer, but not submitted to threadpool
                        // - Thread_1, + IOHandlerThreadPool
                        //
                        // + run()
                        // + if (this.buffer.size() != 0) test false
                        // - add(EventIO eventIO)
                        // - this.buffer.add(eventIO)
                        // - if (!this.streamInProc) test false
                        // + this.streamInProc = false
                        // event in buffer but not submitted
                        this.lockBuffer.lock();

                        if (this.buffer.size() != 0) {
                            Conn.this.ioHandlerThreadPool.submit(this);
                        }
                        else {
                            this.streamInProc = false;
                        }
                    }
                    finally {
                        this.lockBuffer.unlock();
                    }
                }
            }
            catch (Throwable e) {
                // log FATAL, critical thread dies, node unable to function
                LoggerProxy.fatal(this.logger, "SYSTEM", "Died of unchecked exception, node terminating", e);
                // fatal, terminate the program
                System.exit(1);
            }
        }
    }

}
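
For anyone who wants to experiment with the scheme, here is a stripped-down, self-contained sketch of the same idea: a buffer guarded by a ReentrantLock, a flag that keeps at most one run() scheduled, and delivery outside the lock. It is not the code above. SerialDispatcher, SerialDispatcherDemo, the String events and the Consumer callback are all assumptions made up for illustration, and the eof handling and the synchronized(this) block are left out. One deliberate difference: lock() is called before the try block, which is the usual idiom, so that finally never calls unlock() on a lock that was not acquired.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

class SerialDispatcherDemo {
    // Pushes all events through a 4-thread pool and returns them in delivery order.
    static List<String> deliverAll(List<String> events) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> seen = new ArrayList<String>(); // only touched by one run() at a time
        CountDownLatch done = new CountDownLatch(events.size());
        SerialDispatcher dispatcher = new SerialDispatcher(pool, ev -> {
            seen.add(ev);
            done.countDown();
        });
        try {
            for (String event : events) {
                dispatcher.add(event);
            }
            done.await(); // countDown() happens-before await() returning
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverAll(Arrays.asList("OPEN", "READ", "READ", "CLOSE")));
    }
}

// Hypothetical, simplified variant of Upstream.
class SerialDispatcher implements Runnable {
    private static final int BATCH = 10; // same role as "granularity" above

    private final ExecutorService pool;
    private final Consumer<String> handler;
    private final ReentrantLock lock = new ReentrantLock();
    private final ArrayDeque<String> buffer = new ArrayDeque<String>(); // guarded by lock
    private boolean scheduled = false;                                   // guarded by lock

    SerialDispatcher(ExecutorService pool, Consumer<String> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    void add(String event) {
        lock.lock(); // acquired before try, so finally only unlocks a held lock
        try {
            buffer.add(event);
            if (!scheduled) {
                scheduled = true;
                pool.submit(this);
            }
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        for (int n = 0; n < BATCH; n++) {
            String event;
            lock.lock();
            try {
                event = buffer.poll();
            } finally {
                lock.unlock();
            }
            if (event == null) {
                break;
            }
            handler.accept(event); // delivered outside the lock, one event at a time
        }
        lock.lock();
        try {
            if (buffer.isEmpty()) {
                scheduled = false;  // the next add() will submit again
            } else {
                pool.submit(this);  // more work left, stay scheduled
            }
        } finally {
            lock.unlock();
        }
    }
}
```

Running main prints [OPEN, READ, READ, CLOSE]. In this sketch the unlock at the end of one run() and the lock at the start of the next (or the submit itself) give the happens-before edge between consecutive deliveries, even when they land on different pool threads.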


> On Jan 7, 2008 10:39 AM, David Holmes <dcholmes at optusnet.com.au> wrote:
>
> >  BJ,
> >
> > The upstream parameter is being used as the key in put/get. Where does
> > this come from?
> >
> > David
> >
> > -----Original Message-----
> > *From:* BJ Low [mailto:notorand at gmail.com]
> > *Sent:* Monday, 7 January 2008 12:24 PM
> > *To:* dholmes at ieee.org
> > *Cc:* concurrency-interest at cs.oswego.edu
> > *Subject:* Re: [concurrency-interest] ConcurrentHashMap
> > NullPointerException
> >
> >
> >
> > On Jan 7, 2008 10:05 AM, David Holmes <dcholmes at optusnet.com.au> wrote:
> >
> > >  BJ,
> > >
> > > The original problem involved the construction and publication of the
> > > key object X. I still can not see where/how this key object is created and
> > > published.
> > >
> >
> > The key is created in the listener during onOpen()
> >
> >     public void onOpen(Upstream upstream) {
> >
> >         // create new Member1 and store to Upstream-Member1 mapping
> >         this.mapOfMembers.put(upstream, new Member1());
> >     }
> >
> > and removed during
> >
> >     public void onClose(Upstream upstream) {
> >
> >         // remove the Upstream-Member1 mapping
> >         Member1 member1 = this.mapOfMembers.remove(upstream);
> >     }
> >
> > however during accessing it during
> >
> >     public void onRead(Upstream upstream) {
> >
> >         // get the Upstream-Member1 mapping
> >         Member1 member1 = this.mapOfMembers.get(upstream);
> >          .....
> >          .....
> >     }
> >
> > it returns null instead, even though the ordering onOpen() -> onRead()
> > -> .... -> onRead() -> onClose() is guaranteed.
> > In fact, it is not important how the key is published, because every time I
> > do an event delivery I will synchronized(this) to ensure that any values
> > that are created are written through to memory.
> >
> > > > It is a very rare occurrence. I have run rigorous tests on the code day and
> > > > night and it never happens, there is only once when i am doing simple testing
> > > > (under very insignificant load) that it occurs.
> > >
> > > That is the opposite of what you would expect with a JMM issue. It is
> > > more what you would expect with a "simple" race condition.
> > >
> >
> > I think JMM issues arise less often. I have frequently not declared a
> > boolean variable volatile, and memory consistency issues did not
> > arise at all. I have not done this kind of test under the server-mode
> > JVM, but in client mode this kind of memory consistency issue seldom occurs.
> > In fact, race conditions, deadlocks and the like are easier to detect when the
> > code is put under rigorous day-and-night load tests.
> >
> >
> > > > I am not very sure if it is fixed, but it seems that someone else has also
> > > > encountered the same kind of bug http://osdir.com/ml/java.jsr.166-concurrency/2003-12/msg00018.html
> > >
> > > Unfortunately after having been told that CHM does establish some JMM
> > > happens-before relationships, the original reporter didn't take this any
> > > further, so we have no way to know what actually happened in his case.
> > >
> > >
> >
> > Unfortunately there is no contact for this reporter, but I think it is
> > very hard to reproduce the same error again
> >
> >
> > >  David Holmes
> > >
> > >  -----Original Message-----
> > > *From:* concurrency-interest-bounces at cs.oswego.edu [mailto:concurrency-interest-bounces at cs.oswego.edu
> > > ]*On Behalf Of *BJ Low
> > > *Sent:* Monday, 7 January 2008 11:20 AM
> > > *To:* dholmes at ieee.org
> > > *Cc:* concurrency-interest at cs.oswego.edu
> > >  *Subject:* Re: [concurrency-interest] ConcurrentHashMap
> > > NullPointerException
> > >
> > >
> > >
> > > On Jan 7, 2008 6:32 AM, David Holmes <dcholmes at optusnet.com.au> wrote:
> > >
> > > >  BJ,
> > > >
> > > > I'm still missing two key parts to this:
> > > >
> > > > 1. As Endre asked, how do you know that Thread-1 has completed its
> > > > action
> > > >
> > >
> > > class Upstream implements Runnable {
> > >
> > >         private Logger logger = Logger.getLogger(Conn.Upstream.class);
> > >
> > >         private ReentrantLock lockBuffer = new ReentrantLock();
> > >         private ArrayDeque<EventIO> buffer = new
> > > ArrayDeque<EventIO>();
> > >
> > >         private boolean streamInProc = false;
> > >         private boolean eof = false;
> > >
> > >         private Upstream() {
> > >         }
> > >
> > >         private void add(EventIO eventIO) {
> > >             try {
> > >                 // prevent case where Event exist in buffer, but not
> > > submitted to threadpool
> > >                 // - Thread_1, + IOHandlerThreadPool
> > >                 //
> > >                 // + run()
> > >                 // + if (this.buffer.size() != 0) test false
> > >                 // - add(EventIO eventIO)
> > >                 // - this.buffer.add(eventIO)
> > >                 // - if (!this.streamInProc) test false
> > >                 // + this.streamInProc = false
> > >                 // event in buffer but not submitted
> > >                 //
> > >                 // prevent concurrent access to buffer as ArrayDeque
> > > is not thread-safe
> > >                 // - Thread_1, + IOHandlerThreadPool
> > >                 //
> > >                 // - add(EventIO eventIO)
> > >                 // - this.buffer.add(eventIO)
> > >                 // + run()
> > >                 // + eventIO = this.buffer.poll()
> > >                 this.lockBuffer.lock();
> > >
> > >                 // to prevent out-of-memory by not adding, within
> > > lockBuffer to prevent memory consistency errors
> > >                 if (!this.eof) {
> > >
> > >                     this.buffer.add(eventIO);
> > >
> > >                     if (!this.streamInProc) {
> > >                         Conn.this.ioHandlerThreadPool.submit(this);
> > >                         this.streamInProc = true;
> > >                     }
> > >
> > >                     // CLOSE, CONNECT_FAIL will be the last event to
> > > be processed, therefore any further add to the back add()/front close() of
> > > the buffer is
> > >                     // redundant
> > >                     if ((eventIO == EventIO.CLOSE) || (eventIO ==
> > > EventIO.CONNECT_FAIL)) {
> > >                         this.eof = true;
> > >                     }
> > >                 }
> > >             }
> > >             finally {
> > >                 this.lockBuffer.unlock();
> > >             }
> > >         }
> > >
> > >         @Override
> > >         public void run() {
> > >             try {
> > >                 int granularity = 0;
> > >
> > >                 // stop delivery when granularity is reached, or no
> > > more EventIO left in buffer to deliver
> > >                 boolean endLoop = false;
> > >                 boolean closed = false;
> > >                 while (!endLoop && !closed) {
> > >
> > >                     EventIO eventIO = null;
> > >                     try {
> > >                         // prevent concurrent access to buffer as
> > > ArrayDeque is not thread-safe
> > >                         // - Thread_1, + IOHandlerThreadPool
> > >                         //
> > >                         // - add(EventIO eventIO)
> > >                         // - this.buffer.add(eventIO)
> > >                         // + run()
> > >                         // + eventIO = this.buffer.poll()
> > >                         this.lockBuffer.lock();
> > >
> > >                         // remove an Object from buffer
> > >                         eventIO = this.buffer.poll();
> > >                     }
> > >                     finally {
> > >                         this.lockBuffer.unlock();
> > >                     }
> > >
> > >                     if (eventIO == null) {
> > >                         endLoop = true;
> > >                     }
> > >                     else {
> > >                         // prevent memory consistency errors whereby
> > > values written by current thread from IOHandlerThreadPool is not flushed,
> > > causing the next
> > >                         // run() reading stale values if it is a
> > > different thread from this current thread
> > >                         synchronized (this) {
> > >
> > >                             // delivery
> > >                             if (eventIO == EventIO.OPEN) {
> > >                                 Conn.this.handleOpen();
> > >                             }
> > >                             else if (eventIO == EventIO.READ) {
> > >                                 Conn.this.handleRead ();
> > >                             }
> > >                             else if ((eventIO == EventIO.IDLE_READ) ||
> > > (eventIO == EventIO.IDLE_WRITE)) {
> > >                                 Conn.this.handleIdle(eventIO);
> > >                             }
> > >                             else if (eventIO == EventIO.CLOSE) {
> > >                                 Conn.this.handleClose();
> > >                                 closed = true;
> > >                             }
> > >                             else if (eventIO == EventIO.CONNECT_FAIL)
> > > {
> > >                                 Conn.this.handleConnectFail();
> > >                                 closed = true;
> > >                             }
> > >                         }
> > >
> > >                         // update granularity
> > >                         granularity++;
> > >                         if (granularity == 10) {
> > >                             endLoop = true;
> > >                         }
> > >                     }
> > >                 }
> > >
> > >                 // if EventIO.CLOSE is reached, then permanently
> > > streamInProc stays true, ensuring no more EventIO will be delivered
> > >                 if (!closed) {
> > >                     try {
> > >                         // prevent case where Event exist in buffer,
> > > but not submitted to threadpool
> > >                         // - Thread_1, + IOHandlerThreadPool
> > >                         //
> > >                         // + run()
> > >                         // + if (this.buffer.size() != 0) test false
> > >                         // - add(EventIO eventIO)
> > >                         // - this.buffer.add(eventIO)
> > >                         // - if (!this.streamInProc) test false
> > >                         // + this.streamInProc = false
> > >                         // event in buffer but not submitted
> > >                         this.lockBuffer.lock();
> > >
> > >                         if (this.buffer.size() != 0) {
> > >                             Conn.this.ioHandlerThreadPool.submit(this);
> > >                         }
> > >                         else {
> > >                             this.streamInProc = false;
> > >                         }
> > >                     }
> > >                     finally {
> > >                         this.lockBuffer.unlock ();
> > >                     }
> > >                 }
> > >             }
> > >             catch (Throwable e) {
> > >                 // log FATAL, critical thread dies, node unable to function
> > >                 LoggerProxy.fatal(this.logger, "SYSTEM", "Died of unchecked exception, node terminating", e);
> > >                 // fatal, terminate the program
> > >                 System.exit(1);
> > >             }
> > >         }
> > >     }
> > >
> > > As you can see above, an EventIO is deposited into the buffer, and a
> > > thread from the threadpool is woken to process the event. However, the
> > > streamInProc boolean ensures that only one thread at any instant can be
> > > executing the run() method. Therefore there will only ever be a single
> > > thread processing an Upstream at any one time.
> > >
> > >
> > > > 2. How is the instance of X accessed from two different Runnables
> > > >
> > > > Also, returning to the original issue, the use of the
> > > > ConcurrentHashMap is significant here as well, as it also introduces
> > > > happens-before relationships.
> > > >
> > > >
> > > The thing is that during event delivery, thread-1 from the threadpool
> > > might execute the run() method, and the next time round, thread-2 from
> > > the threadpool might execute it instead. The ConcurrentHashMap is
> > > accessed during event delivery methods like Conn.this.handleOpen(),
> > > Conn.this.handleRead() etc. On one occasion, when the CHM was accessed,
> > > it returned null, when in fact the element had already been put into the
> > > CHM. It is a very rare occurrence. I have run rigorous tests on the code
> > > day and night and it never happens; it occurred only once, while I was
> > > doing simple testing (under very insignificant load). I am not very sure
> > > if it is fixed, but it seems that someone else has also encountered the
> > > same kind of bug http://osdir.com/ml/java.jsr.166-concurrency/2003-12/msg00018.html
> > >
> > >
> > > >  So I still need more detail on the code to comment further.
> > > >
> > > > Cheers,
> > > > David Holmes
> > > >
> > > >  -----Original Message-----
> > > > *From:* concurrency-interest-bounces at cs.oswego.edu [mailto:concurrency-interest-bounces at cs.oswego.edu
> > > > ]*On Behalf Of *BJ Low
> > > > *Sent:* Sunday, 6 January 2008 12:15 AM
> > > > *To:* Endre Stølsvik
> > > > *Cc:* concurrency-interest at cs.oswego.edu; dholmes at ieee.org
> > > > *Subject:* Re: [concurrency-interest] ConcurrentHashMap
> > > > NullPointerException
> > > >
> > > >  I think perhaps I never explained myself properly; Tim Peierls, however, did understand what I am trying to say.
> > > >
> > > > At time t1, Thread-X submits a Runnable to threadpool,
> > > > At time t2, Thread-1 from the threadpool runs the Runnable and call
> > > > changeVal(1);
> > > >
> > > > AFTER changeVal(1) from Thread-1 is completed, then at time t3 ....
> > > >
> > > > At time t3, Thread-X submits another Runnable to threadpool
> > > > At time t4, Thread-2 from the threadpool runs the Runnable and call
> > > > readVal();
> > > >
> > > > .... I missed out the red statement above (the line beginning with
> > > > "AFTER"). I think in the above case the happens-before property is very
> > > > important, because if at t3 it is Thread-1, and not Thread-X, that submits
> > > > the Runnable, then there will not be any memory consistency errors.
> > > >
> > > > On Jan 5, 2008 7:52 PM, Endre Stølsvik <Online at stolsvik.com> wrote:
> > > >
> > > > > BJ Low wrote:
> > > > > > Hi,
> > > > > >
> > > > > > > I am pretty sure of below. It is very hard for me to post the code cause
> > > > > > > the code is proprietary, but perhaps let me ask this in another way
> > > > > >
> > > > > > Suppose I have a class below
> > > > > >
> > > > > > class X {
> > > > > >      int i = 0;
> > > > > >
> > > > > >      void changeVal(int i) {
> > > > > >         this.i = i;
> > > > > >      }
> > > > > >
> > > > > >      int readVal() {
> > > > > >         return this.i;
> > > > > >      }
> > > > > > }
> > > > > >
> > > > > > > At time t1, Thread-X submits a Runnable to threadpool,
> > > > > > > At time t2, Thread-1 from the threadpool runs the Runnable and call
> > > > > > > changeVal(1);
> > > > > > > At time t3, Thread-X submits another Runnable to threadpool
> > > > > > > At time t4, Thread-2 from the threadpool runs the Runnable and call
> > > > > > > readVal();
> > > > > >
> > > > > > The question is, will Thread-2 see the value of i as 0 or 1?
> > > > >
> > > > > "Thread-X" - is that supposed to be the same thread at the two
> > > > > times?
> > > > > Because I personally use "X" as "some".
> > > > >
> > > > > But lets assumes that (it doesn't really matter either - this is
> > > > > broken
> > > > > no matter!).
> > > > >
> > > > > There's no reason to argue complex memory situations and
> > > > > happens-before
> > > > > or anything like that here - this is plain as day: Thread X works
> > > > > on its
> > > > > own, and thus basically submits two jobs right after each other.
> > > > > For the
> > > > > sake of the argument: The first runnable, in Thread-1, takes 3
> > > > > hours to
> > > > > complete, finally doing changeVal(1). The second Runnable runs
> > > > > readVal()
> > > > > about .001 ms after submit. What is the answer?
> > > > >
> > > > > And the time labels "t[1-4]" is of no use what so ever when
> > > > > interleaved
> > > > > between threads that doesn't do explicit synchronization. Assuming
> > > > > again
> > > > > that Thread-X is the same in both instances, there is a "t, t+x"
> > > > > situation between the two submits (the first submit
> > > > > /happens-before/ the
> > > > > second submit, by way of code order).
> > > > >   However, one cannot argue anything about Thread-1 (other than
> > > > > that it
> > > > > happens AFTER the submit of itself, which is at "t"), and Thread-2
> > > > > (other than that is happens AFTER the submit of itself, which is
> > > > > at "t+x").
> > > > >   The order of _when_ those two runnables are run on their
> > > > > respective
> > > > > worker threads, is fully non-deterministic: both the work they do
> > > > > (the
> > > > > time it takes), the inner workings of the thread pool (e.g.
> > > > > whether a
> > > > > new Thread has to be made for one of the submissions, perhaps the
> > > > > first,
> > > > > not the second) - and finally, the operating system: it might
> > > > > decide
> > > > > based on some cosmic reasons, that Thread-1 shouldn't run, or is
> > > > > pre-empted before the changeVal() while a whole bunch of other
> > > > > threads
> > > > > gets time slices, while Thread-2 by the same unfair magic gets
> > > > > scheduled
> > > > > right away.
> > > > >
> > > > > These are really basic multi-threading issues, not having anything
> > > > > to do
> > > > > with complex code-reorderings or anything like that.
> > > > >
> > > > > What you want here, is synchronized code in class X, and some flag
> > > > > that
> > > > > denotes whether the job is done. The flag is set to true when
> > > > > "changeVal()" is run, and then the /this X/ is .notify()ed (within
> > > > >
> > > > > synchronized on /this X/, of course). The readVal() method checks
> > > > > (within synchronzied on /this X/, of course) whether the flag is
> > > > > true.
> > > > > If it isn't it goes into .wait() on /this X/, looping until flag
> > > > > is true
> > > > > (must loop due to "spurious wakeups"). Hey presto.. Also check out
> > > > > Callable vs. Future - it does just this.
> > > > >
> > > > > Endre.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Regards,
> > > > BJ Low
> > > >
> > > >
> > >
> > >
> > > --
> > > Regards,
> > > BJ Low
> > >
> > >
> >
> >
> > --
> > Regards,
> > BJ Low
> >
> >
>
>
> --
> Regards,
> BJ Low




-- 
Regards,
BJ Low