[concurrency-interest] Adding interfaces Sink and Source?

Tim Peierls tim@peierls.net
Sat, 02 Oct 2004 21:38:10 -0400


Doug Lea wrote:
> The way we left it, it is still possible for individual developers
> to unify usages themselves by adding a bit of glue. For example:
> 
> interface Puttable<E> { void put(E x) throws Exception; }
> class MyLinkedQueue<E> extends LinkedQueue<E> implements Puttable<E> {
>   ...
> }

Expanding on this a bit, it might be convenient to use wrapper factories
that convert BlockingQueues to custom Puttable and Takable interfaces.

class QUtil {
   // <E> must be declared on each static method; a static method
   // cannot use a type parameter of the enclosing class.
   static <E> Puttable<E> asPuttable(final BlockingQueue<E> q) {
     return new Puttable<E>() {
       public void put(E e) throws InterruptedException { q.put(e); }
     };
   }
   static <E> Takable<E> asTakable(final BlockingQueue<E> q) {
     return new Takable<E>() {
       public E take() throws InterruptedException { return q.take(); }
     };
   }
}
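
To make the wrapper idea concrete, here is a minimal self-contained sketch.
The shapes of Puttable and Takable (each declaring InterruptedException) and
the class name QueueViews are assumptions made for illustration; none of them
is part of java.util.concurrent.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Assumed shapes for the two custom interfaces (not in java.util.concurrent).
interface Puttable<E> { void put(E e) throws InterruptedException; }
interface Takable<E> { E take() throws InterruptedException; }

// Hypothetical utility class, named QueueViews only for this sketch.
final class QueueViews {
    private QueueViews() {}

    // Exposes only the producer end of the backing queue.
    static <E> Puttable<E> asPuttable(final BlockingQueue<E> q) {
        return new Puttable<E>() {
            public void put(E e) throws InterruptedException { q.put(e); }
        };
    }

    // Exposes only the consumer end of the backing queue.
    static <E> Takable<E> asTakable(final BlockingQueue<E> q) {
        return new Takable<E>() {
            public E take() throws InterruptedException { return q.take(); }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<String>();
        Puttable<String> producerEnd = asPuttable(q);
        Takable<String> consumerEnd = asTakable(q);

        producerEnd.put("hello");
        // producerEnd.take();  // would not compile: Puttable has no take()
        System.out.println(consumerEnd.take());
    }
}
```

With this variant the restriction is enforced by the compiler: code holding
only a Puttable has no way to name take() at all.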

But if the only reason you miss Puttable and Takable is the way that they
prevented consumers from accidentally or maliciously accessing producer
methods and vice versa, then a simple way to achieve the same effect without
introducing new interfaces is via a factory that creates wrapped blocking
queues that throw UnsupportedOperationException (UOE) on all but a subset of
the queue methods:

class QUtil {
   static <E> BlockingQueue<E> putOnlyBlockingQueue(final BlockingQueue<E> q) {
     return new BlockingQueue<E>() {
       public void put(E e) throws InterruptedException { q.put(e); }
       // similarly for all "put"-like methods that you want to support

       public E take() { throw new UnsupportedOperationException(); }
       // similarly for all "take"-like methods and anything else you
       // don't want to support
     };
   }

   static <E> BlockingQueue<E> takeOnlyBlockingQueue(final BlockingQueue<E> q) {
     return new BlockingQueue<E>() {
       public E take() throws InterruptedException { return q.take(); }
       // similarly for all "take"-like methods that you want to support

       public void put(E e) { throw new UnsupportedOperationException(); }
       // similarly for all "put"-like methods and anything else you
       // don't want to support
     };
   }
}
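
The anonymous classes above elide most of the BlockingQueue methods, so they
will not compile as written. One way to fill in the put-only side without
spelling out every Collection method is to extend AbstractQueue, which derives
add, remove, element, clear and friends from a handful of primitives. The
following is only a sketch under that assumption; the names PutOnlyQueues and
PutOnlyView are hypothetical, and the choice of which methods count as
"put"-like (here put, both offers, and remainingCapacity) is mine.

```java
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical utility class for this sketch.
final class PutOnlyQueues {
    private PutOnlyQueues() {}

    static <E> BlockingQueue<E> putOnlyBlockingQueue(final BlockingQueue<E> q) {
        return new PutOnlyView<E>(q);
    }

    private static final class PutOnlyView<E> extends AbstractQueue<E>
            implements BlockingQueue<E> {
        private final BlockingQueue<E> q;

        PutOnlyView(BlockingQueue<E> q) { this.q = q; }

        // Supported: producer-side operations delegate to the backing queue.
        // AbstractQueue.add and addAll are built on offer, so they work too.
        @Override public void put(E e) throws InterruptedException { q.put(e); }
        @Override public boolean offer(E e) { return q.offer(e); }
        @Override public boolean offer(E e, long timeout, TimeUnit unit)
                throws InterruptedException { return q.offer(e, timeout, unit); }
        @Override public int remainingCapacity() { return q.remainingCapacity(); }

        // Unsupported: everything that reads or removes elements. The inherited
        // remove, element, clear, contains, toArray, etc. are built on these
        // primitives, so they fail with UOE as well.
        @Override public E take() { throw new UnsupportedOperationException(); }
        @Override public E poll() { throw new UnsupportedOperationException(); }
        @Override public E poll(long timeout, TimeUnit unit) {
            throw new UnsupportedOperationException();
        }
        @Override public E peek() { throw new UnsupportedOperationException(); }
        @Override public int size() { throw new UnsupportedOperationException(); }
        @Override public Iterator<E> iterator() {
            throw new UnsupportedOperationException();
        }
        @Override public int drainTo(Collection<? super E> c) {
            throw new UnsupportedOperationException();
        }
        @Override public int drainTo(Collection<? super E> c, int maxElements) {
            throw new UnsupportedOperationException();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> backing = new LinkedBlockingQueue<String>();
        BlockingQueue<String> producerView = putOnlyBlockingQueue(backing);

        producerView.put("job");
        try {
            producerView.take();
        } catch (UnsupportedOperationException expected) {
            System.out.println("take() rejected");
        }
        System.out.println(backing.take());
    }
}
```

A take-only view would mirror this, delegating take, poll and peek while
rejecting put and offer.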

Use this utility to wrap a queue before handing it over to producers and
consumers. For example,

   interface Producer {
     public void produceTo(BlockingQueue<Foo> putOnlyQueue);
   }

   interface Consumer {
     public void consumeFrom(BlockingQueue<Foo> takeOnlyQueue);
   }

   void start(Collection<Producer> producers,
              Collection<Consumer> consumers,
              BlockingQueue<Foo> fooq) {

     for (Producer producer : producers)
       producer.produceTo(QUtil.putOnlyBlockingQueue(fooq));

     for (Consumer consumer : consumers)
       consumer.consumeFrom(QUtil.takeOnlyBlockingQueue(fooq));
   }

Now malicious or broken producers and consumers can't touch the "wrong"
end of the queue. This approach trades away compile-time type information
in favor of a smaller set of interfaces and run-time checking, roughly
analogous to the design tradeoff whose resolution in the Collections
framework resulted in the Collections.unmodifiableXXX static methods.
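
For comparison, here is the same tradeoff as it appears with
Collections.unmodifiableList: the returned view still has static type List, so
a mutating call compiles and is rejected only when it runs. The class name
UnmodifiableDemo and the helper addIsRejected are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class for this sketch.
final class UnmodifiableDemo {
    private UnmodifiableDemo() {}

    // Returns true if the view rejects add() at run time.
    static boolean addIsRejected(List<String> view) {
        try {
            view.add("x");   // compiles: the static type is still List<String>
            return false;
        } catch (UnsupportedOperationException e) {
            return true;     // rejected only at run time
        }
    }

    public static void main(String[] args) {
        List<String> backing = new ArrayList<String>();
        backing.add("a");
        List<String> readOnly = Collections.unmodifiableList(backing);

        System.out.println(addIsRejected(readOnly));
        System.out.println(readOnly.get(0));
    }
}
```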

--tim