[concurrency-interest]Queues that support destroy

Constantine Sapuntzakis csapuntz@stanford.edu
Thu, 29 Aug 2002 00:40:23 -0700


>
>
>
>You want to deal with threads that are blocked waiting when the queue
>is closed. Okay so now there are two policies that need to be
>specified:
>
>- what to do with items in the queue when it is closed/destroyed
>
You're right. I hadn't considered that there were two policies here.

>
>- what to do with threads blocked waiting when it is closed/destroyed
>
What applications would require threads to continue to block?  I don't 
see any use for an implementation that doesn't deal with blocked threads.

I have included diffs to the BoundedBuffer class in util.concurrent that
implement both policies. The patch sets emptySlots_ below zero to indicate
that puts are not allowed, and usedSlots_ below zero to indicate that takes
are not allowed.

-Costa

--- concurrent/BoundedBuffer.java    Sat May 15 16:35:46 1999
+++ projects/collective2/src/edu/stanford/suif/util/BoundedBuffer.java    Thu Aug 29 00:37:21 2002
@@ -105,13 +108,18 @@
     if (Thread.interrupted()) throw new InterruptedException();
 
     synchronized(putMonitor_) {
-      while (emptySlots_ <= 0) {
+      while (emptySlots_ == 0) {
     try { putMonitor_.wait(); }
         catch (InterruptedException ex) {
           putMonitor_.notify();
           throw ex;
         }
       }
+      if (emptySlots_ < 0) {
+      // Probably should throw exception to indicate put didn't
+      // succeed
+      return;
+      }
       insert(x);
     }
     incUsedSlots();
@@ -124,7 +132,7 @@
     synchronized(putMonitor_) {
       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
       long waitTime = msecs;
-      while (emptySlots_ <= 0) {
+      while (emptySlots_ == 0) {
         if (waitTime <= 0) return false;
     try { putMonitor_.wait(waitTime); }
         catch (InterruptedException ex) {
@@ -133,6 +141,9 @@
         }
         waitTime = msecs - (System.currentTimeMillis() - start);
       }
+      if (emptySlots_ < 0) {
+      return false;
+      }
       insert(x);
     }
     incUsedSlots();
@@ -145,13 +156,16 @@
     if (Thread.interrupted()) throw new InterruptedException();
     Object old = null;
     synchronized(this) {
-      while (usedSlots_ <= 0) {
+      while (usedSlots_ == 0) {
         try { wait(); }
         catch (InterruptedException ex) {
           notify();
           throw ex;
         }
       }
+      if (usedSlots_ < 0) {
+      return null;
+      }
       old = extract();
     }
     incEmptySlots();
@@ -165,7 +179,7 @@
       long start = (msecs <= 0)? 0 : System.currentTimeMillis();
       long waitTime = msecs;
      
-      while (usedSlots_ <= 0) {
+      while (usedSlots_ == 0) {
         if (waitTime <= 0) return null;
         try { wait(waitTime); }
         catch (InterruptedException ex) {
@@ -175,12 +189,35 @@
         waitTime = msecs - (System.currentTimeMillis() - start);
 
       }
+      if (usedSlots_ < 0) {
+      return null;
+      }
       old = extract();
     }
     incEmptySlots();
     return old;
   }
 
+  public void destroy() {
+      synchronized (putMonitor_) {
+      // Make sure it stays negative
+      emptySlots_ = Integer.MIN_VALUE;
+      putMonitor_.notify();
+      }
+
+      synchronized (this) {
+      usedSlots_ = Integer.MIN_VALUE;
+      notify();
+      }
+  }
+
+  public void destroyButAllowDraining() {
+      synchronized (putMonitor_) {
+      // Make sure it stays negative
+      emptySlots_ = Integer.MIN_VALUE;
+      putMonitor_.notify();
+      }
+  }
 }