[concurrency-interest] Repeatedly completing futures?

Millies, Sebastian Sebastian.Millies at softwareag.com
Thu Sep 11 09:36:24 EDT 2014


Thanks for the pointer. I do think that someday I should look at Akka, for example. Meanwhile, though, I am specifically interested in exploring the out-of-the-box capabilities of Java 8. So here's what I have come up with: I have a ProducerConsumerService, in which a producer puts values into a blocking queue and the consumer(s) remove them. In addition, the consumers get passed a supplier of futures. Each time a value is taken from the queue, a new future is obtained from the supplier and completed with that value, which then triggers the asynchronous processing. Does that sound reasonable?

Sketch of an example:

// start producer and consumers, the producer gets passed a generator function, the consumers get passed the futureSupplier
ProducerConsumerService(UnaryOperator<T> generator, T startValue, Supplier<CompletableFuture<T>> futureSupplier);

// consume values and use them to complete a future
public class Consumer<T> implements Runnable {
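  private final BlockingQueue<T> queue; // shared queue
  private final Supplier<CompletableFuture<T>> futureSupplier;

  public Consumer(BlockingQueue<T> queue, Supplier<CompletableFuture<T>> futureSupplier) {
    this.queue = queue;
    this.futureSupplier = futureSupplier;
  }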

  @Override
  public void run() {
    try {
      while (true) {
        T value = queue.take();
        futureSupplier.get().complete(value);  // <-- HERE
      }
    } catch (InterruptedException e) {
      // terminate; restore the interrupt status so the owner of this thread can see it
      Thread.currentThread().interrupt();
    }
  }
}

// USAGE example:

// return a new future that when completed will trigger some asynchronous processing steps
private CompletableFuture<Long> createProcessor() {
  CompletableFuture<Long> future = new CompletableFuture<>();
  future.thenApplyAsync(this::doSth)
        .thenApplyAsync(this::doMore);
  return future;
}


UnaryOperator<Long> generator = x -> x + 1;
Supplier<CompletableFuture<Long>> processor = this::createProcessor;
ProducerConsumerService<Long> pcs = new ProducerConsumerService<>(generator, 1L, processor);
pcs.start();

The point for me, I guess, is that I can define the asynchronous processing necessary for each incoming event outside the infrastructure that actually deals with producing and receiving these events.
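
For readers who want to try the idea end to end, here is a minimal, self-contained sketch. It is not the actual ProducerConsumerService (which is not shown in this message). The class name PipelineSketch, the pump and run methods, the fixed item count, and the "multiply by 10" step standing in for doSth/doMore are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class PipelineSketch {

  // Hypothetical stand-in for ProducerConsumerService: one producer and one
  // consumer, both stopping after `count` values so that the example terminates.
  static <T> void pump(UnaryOperator<T> generator, T startValue, int count,
                       Supplier<CompletableFuture<T>> futureSupplier)
      throws InterruptedException {
    BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    Thread producer = new Thread(() -> {
      T value = startValue;
      for (int i = 0; i < count; i++) {
        queue.add(value);               // unbounded queue, so add() cannot fail
        value = generator.apply(value);
      }
    });

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < count; i++) {
          // a fresh future per value; completing it triggers the attached stages
          futureSupplier.get().complete(queue.take());
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    producer.join();
    consumer.join();
  }

  // Wires a processing chain to each supplied future and collects the results.
  static List<Long> run(int count) throws InterruptedException {
    List<Long> results = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(count);

    Supplier<CompletableFuture<Long>> processor = () -> {
      CompletableFuture<Long> future = new CompletableFuture<>();
      future.thenApplyAsync(x -> x * 10)          // placeholder for doSth/doMore
            .thenAcceptAsync(x -> { results.add(x); done.countDown(); });
      return future;
    };

    pump(x -> x + 1, 1L, count, processor);
    done.await(5, TimeUnit.SECONDS);              // wait for the async stages
    return results;
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(3)); // the order of the results is not guaranteed
  }
}
```

Note that the processing stages run asynchronously and independently per value, so results may arrive in a different order than the values were produced. If ordering matters, that would need to be handled separately.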


-- Sebastian



From: Benjamin Sieffert [mailto:benjamin.sieffert at metrigo.de]
Sent: Thursday, September 11, 2014 10:29 AM
To: Millies, Sebastian
Cc: concurrency-interest at cs.oswego.edu
Subject: Re: [concurrency-interest] Repeatedly completing futures?

Hello there,

I think the guarantee concerning the number of times a Future can be completed or failed is very important, because it goes a long way toward preventing multithreading mistakes when using them. E.g. if Future didn't come with this kind of guarantee and one wanted to trigger a callback logic only once, a naive implementation would be "if(! future.isDone()) { future.complete(..) }", which is obviously wrong: another thread can complete the future between the check and the call.
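
To illustrate the complete-at-most-once guarantee, here is a small sketch using CompletableFuture from Java 8. The class name CompleteOnce and the helper raceWinners are made up for this example. It relies on the documented behaviour that complete() returns true only for the call that actually moves the future to the completed state, so no isDone() check is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class CompleteOnce {

  // Lets several threads race to complete the same future and counts how many
  // calls to complete() reported success.
  static int raceWinners(int threads) throws InterruptedException {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    AtomicInteger winners = new AtomicInteger();
    Thread[] pool = new Thread[threads];

    for (int i = 0; i < threads; i++) {
      final int id = i;
      pool[i] = new Thread(() -> {
        // no isDone() check: complete() itself decides atomically who wins
        if (future.complete(id)) {
          winners.incrementAndGet();
        }
      });
      pool[i].start();
    }
    for (Thread t : pool) {
      t.join();
    }
    return winners.get();
  }

  public static void main(String[] args) throws InterruptedException {
    CompletableFuture<String> future = new CompletableFuture<>();
    boolean first = future.complete("first");
    boolean second = future.complete("second");   // has no effect
    System.out.println(first + " " + second + " " + future.join());
    // prints "true false first"

    System.out.println(raceWinners(8));           // prints "1"
  }
}
```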
The mechanism you are looking for, though, seems to be the kind of continuous pushing/polling mechanism that is commonly implemented by using queues. That is, your call to .complete() is rather like a .put() and your call to .get() is rather like a .take().
As for the functions you'd like to apply to the future's result along the way, I think there are a lot of frameworks out there that let you build these kinds of pipelines. You might especially want to look into actor-oriented programming.
Regards
Benjamin


