[concurrency-interest] ExecutorCompletionService is dangerous for critical app!?

yangjs yangjs at alibaba-inc.com
Fri Aug 11 15:05:12 EDT 2006


    Here is my custom implementation of the CompletionService interface.
Can I use this implementation for my work?

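import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingCompletionService<V> implements CompletionService<V> {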
 private final Executor executor;

 private final BlockingQueue<Future<V>> completionQueue;

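 // true: done() waits in put() until the queue has room;
 // false: done() calls add(), which throws if the queue is full
 private boolean isBlocking;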

 /**
  * FutureTask extension to enqueue upon completion
  */
 private class QueueingFuture extends FutureTask<V> {
  QueueingFuture(Callable<V> c) {
   super(c);
  }

  QueueingFuture(Runnable t, V r) {
   super(t, r);
  }

  protected void done() {
   if (isBlocking) {
    try {
     completionQueue.put(this);
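    } catch (InterruptedException e) {
     // Restore the interrupt status. The task has already completed
     // at this point, so cancel(true) has no effect and this future
     // is never enqueued.
     Thread.currentThread().interrupt();
     cancel(true);
    }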
   } else {
    completionQueue.add(this);
   }
  }
 }

 public BlockingCompletionService(Executor executor) {
  if (executor == null)
   throw new NullPointerException();
  this.executor = executor;
  this.completionQueue = new LinkedBlockingQueue<Future<V>>();
 }

 public BlockingCompletionService(Executor executor,
   BlockingQueue<Future<V>> completionQueue, boolean isBlocking) {
  if (executor == null || completionQueue == null)
   throw new NullPointerException();
  this.executor = executor;
  this.completionQueue = completionQueue;
  this.isBlocking = isBlocking;
 }

 public Future<V> submit(Callable<V> task) {
  if (task == null)
   throw new NullPointerException();
  QueueingFuture f = new QueueingFuture(task);
  executor.execute(f);
  return f;
 }

 public Future<V> submit(Runnable task, V result) {
  if (task == null)
   throw new NullPointerException();
  QueueingFuture f = new QueueingFuture(task, result);
  executor.execute(f);
  return f;
 }

 public Future<V> take() throws InterruptedException {
  return completionQueue.take();
 }

 public Future<V> poll() {
  return completionQueue.poll();
 }

 public Future<V> poll(long timeout, TimeUnit unit)
   throws InterruptedException {
  return completionQueue.poll(timeout, unit);
 }

}
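
One detail of the interrupt path in done() above can be checked in isolation. done() runs only after the task has reached a terminal state, so the cancel(true) call there should return false and change nothing. The sketch below is a minimal illustration of that point, not part of the class above; the names CancelInDoneDemo, SelfCancelling and runOnce are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: shows what cancel(true) does when called from done().
class CancelInDoneDemo {

    /** A FutureTask that tries to cancel itself from done(), like the interrupt path above. */
    static class SelfCancelling extends FutureTask<String> {
        // Starts as true so the test can see that done() overwrote it.
        volatile boolean cancelReturned = true;

        SelfCancelling(Callable<String> c) {
            super(c);
        }

        @Override
        protected void done() {
            // The task is already complete here, so this is expected to return false.
            cancelReturned = cancel(true);
        }
    }

    /** Runs the task on the calling thread and returns it for inspection. */
    static SelfCancelling runOnce() {
        SelfCancelling task = new SelfCancelling(() -> "result");
        task.run();
        return task;
    }

    public static void main(String[] args) throws Exception {
        SelfCancelling task = runOnce();
        System.out.println("cancel returned: " + task.cancelReturned);
        System.out.println("isCancelled:     " + task.isCancelled());
        System.out.println("value:           " + task.get());
    }
}
```

If this holds for the class above as well, an interrupt during put() leaves a completed future that never reaches completionQueue, so a consumer waiting in take() would not see it. What to do in that case (retry, log, hand the future to a fallback) is a policy choice the caller has to make.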


----- Original Message ----- 
From: "Doug Lea" <dl at cs.oswego.edu>
To: "yangjs" <yangjs at alibaba-inc.com>
Cc: <concurrency-interest at cs.oswego.edu>
Sent: Saturday, August 12, 2006 2:16 AM
Subject: Re: [concurrency-interest] ExecutorCompletionService is dangerous for critical app!?


> yangjs wrote:
>>
>>    I use ExecutorCompletionService in my app, which follows a
>> producer-consumer pattern. The code looks like this:
>>  ExecutorCompletionService<Document> jobQueue = new
>>     ExecutorCompletionService<Document>(
>>        getThreadExecutor(), getBlockingQueue(100));
>>      When the queue is full, completionQueue.add(this) throws an exception,
>>     but it is swallowed by the Future itself.
>
> Sorry that the javadoc did not make clear that the supplied queue
> should be unbounded. This should/will be clarified. The rationale
> for not using BlockingQueue.put here is that there is no
> policy control in this simple class for dealing with either
> InterruptedExceptions that might occur with put, or the potential
> for saturation and lockup.
>
> Unless/until we get good suggestions for dealing with
> such things in a generic way, I'm afraid that in those cases
> where you have to deal with them, you'll need a custom
> implementation of the CompletionService interface.
>
> -Doug
> 
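
The failure mode described in the quoted message can be reproduced with a short sketch. It passes a bounded queue to ExecutorCompletionService, submits more tasks than the queue can hold, and never consumes from it. The class name BoundedCompletionQueueDemo and the method delivered are hypothetical, and the pool size and timeout are arbitrary choices. How the exception from add() surfaces appears to depend on the JDK version (it may be swallowed, or it may reach the worker thread and be printed as an uncaught exception), so the sketch only counts how many futures reached the queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: counts completions that survive a bounded queue.
class BoundedCompletionQueueDemo {

    /** Runs `tasks` trivial jobs and returns how many completed futures reached the queue. */
    static int delivered(int capacity, int tasks) {
        BlockingQueue<Future<Integer>> queue = new ArrayBlockingQueue<>(capacity);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Integer> service = new ExecutorCompletionService<>(pool, queue);

        for (int i = 0; i < tasks; i++) {
            final int n = i;
            service.submit(() -> n);
        }

        // Nothing consumes from the queue, so once it is full every later add() fails.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("delivered " + delivered(2, 5) + " of 5");
    }
}
```

With a capacity of 2 and 5 tasks, only 2 futures end up in the queue; the other 3 tasks finish, but their completions are lost to anyone calling take() or poll(). With a capacity at least as large as the number of tasks, all of them are delivered, which matches the advice that the supplied queue should be unbounded.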

