[concurrency-interest]Queues that support destroy
Constantine Sapuntzakis
csapuntz@stanford.edu
Thu, 29 Aug 2002 00:40:23 -0700
>
>
>
>You want to deal with threads that are blocked waiting when the queue
>is closed. Okay so now there are two policies that need to be
>specified:
>
>- what to do with items in the queue when it is closed/destroyed
>
You're right. I hadn't considered that there were two policies here.
>
>- what to do with threads blocked waiting when it is closed/destroyed
>
What applications would require threads to continue to block? I don't
see any use for an implementation that doesn't deal with blocked threads.
I have included diffs to the BoundedBuffer class in util.concurrent that
implement both policies. The patch uses emptySlots_ < 0 to indicate that
puts are not allowed and usedSlots_ < 0 to indicate that takes are not
allowed.
-Costa
--- concurrent/BoundedBuffer.java Sat May 15 16:35:46 1999
+++ projects/collective2/src/edu/stanford/suif/util/BoundedBuffer.java Thu Aug 29 00:37:21 2002
@@ -105,13 +108,18 @@
if (Thread.interrupted()) throw new InterruptedException();
synchronized(putMonitor_) {
- while (emptySlots_ <= 0) {
+ while (emptySlots_ == 0) {
try { putMonitor_.wait(); }
catch (InterruptedException ex) {
putMonitor_.notify();
throw ex;
}
}
+ if (emptySlots_ < 0) {
+ // Probably should throw exception to indicate put didn't
+ // succeed
+ return;
+ }
insert(x);
}
incUsedSlots();
@@ -124,7 +132,7 @@
synchronized(putMonitor_) {
long start = (msecs <= 0)? 0 : System.currentTimeMillis();
long waitTime = msecs;
- while (emptySlots_ <= 0) {
+ while (emptySlots_ == 0) {
if (waitTime <= 0) return false;
try { putMonitor_.wait(waitTime); }
catch (InterruptedException ex) {
@@ -133,6 +141,9 @@
}
waitTime = msecs - (System.currentTimeMillis() - start);
}
+ if (emptySlots_ < 0) {
+ return false;
+ }
insert(x);
}
incUsedSlots();
@@ -145,13 +156,16 @@
if (Thread.interrupted()) throw new InterruptedException();
Object old = null;
synchronized(this) {
- while (usedSlots_ <= 0) {
+ while (usedSlots_ == 0) {
try { wait(); }
catch (InterruptedException ex) {
notify();
throw ex;
}
}
+ if (usedSlots_ < 0) {
+ return null;
+ }
old = extract();
}
incEmptySlots();
@@ -165,7 +179,7 @@
long start = (msecs <= 0)? 0 : System.currentTimeMillis();
long waitTime = msecs;
- while (usedSlots_ <= 0) {
+ while (usedSlots_ == 0) {
if (waitTime <= 0) return null;
try { wait(waitTime); }
catch (InterruptedException ex) {
@@ -175,12 +189,35 @@
waitTime = msecs - (System.currentTimeMillis() - start);
}
+ if (usedSlots_ < 0) {
+ return null;
+ }
old = extract();
}
incEmptySlots();
return old;
}
+ /**
+ * Shuts the buffer down in both directions: after this call the
+ * insertion paths in this patch see emptySlots_ < 0 and give up without
+ * inserting, and the removal paths see usedSlots_ < 0 and return null.
+ * Items still held in the buffer are not handed out.
+ */
+ public void destroy() {
+ synchronized (putMonitor_) {
+ // Make sure it stays negative: starting from Integer.MIN_VALUE,
+ // later increments of the counter should not bring it back to >= 0.
+ emptySlots_ = Integer.MIN_VALUE;
+ // notifyAll rather than notify: a woken producer returns without
+ // notifying anyone else, so a single notify would leave every
+ // other blocked producer waiting.
+ putMonitor_.notifyAll();
+ }
+
+ synchronized (this) {
+ usedSlots_ = Integer.MIN_VALUE;
+ // Same reasoning for blocked consumers waiting on this object.
+ notifyAll();
+ }
+ }
+
+ /**
+ * Closes the buffer to new insertions only: the insertion paths in this
+ * patch see emptySlots_ < 0 and give up without inserting. usedSlots_
+ * is left untouched, so items already in the buffer can still be taken.
+ * NOTE(review): a consumer that blocks once the buffer has drained is
+ * not woken by this method -- confirm whether callers rely on timed
+ * takes or a later destroy() to release it.
+ */
+ public void destroyButAllowDraining() {
+ synchronized (putMonitor_) {
+ // Make sure it stays negative: starting from Integer.MIN_VALUE,
+ // later increments of the counter should not bring it back to >= 0.
+ emptySlots_ = Integer.MIN_VALUE;
+ // notifyAll rather than notify: a woken producer returns without
+ // notifying anyone else, so a single notify would leave every
+ // other blocked producer waiting.
+ putMonitor_.notifyAll();
+ }
+ }
}