[concurrency-interest] Composing multiple tasks into a single Future/FutureTask

Joe Bowbeer joe.bowbeer at gmail.com
Wed Apr 18 19:27:03 EDT 2012


Below is a sketch using a periodic TimerTask to do the fsync() and set the
results.  Cancellation is implemented by checking whether future.isDone()
at opportune times.

public class Put {

    final Executor worker = Executors.newSingleThreadExecutor();

    final BlockingQueue<Written> queue = new LinkedBlockingQueue<>();

    final Timer fsyncTimer = new Timer();

    public interface Result { }

    public Future<Result> put(final byte[] key, final byte[] value) {
        final SettableFuture<Result> future = new SettableFuture<>();
        final long start = System.currentTimeMillis();
        worker.execute(new Runnable() {

            public void run() {
                try {
                    if (future.isDone()) return;
                    byte[] data = compress(key, value);
                    if (future.isDone()) return;
                    write(data);
                    if (future.isDone()) return;
                    queue.put(new Written(start, value.length, data.length,
future));
                } catch (InterruptedException ex) {
                    throw new IllegalStateException(ex); // TODO
                } catch (Throwable t) {
                    future.setException(t);
                }
            }
        });
        return future;
    }

    public void start() {
        TimerTask fsyncTask = new TimerTask() {

            public void run() {
                List<Written> list = new ArrayList<>();
                if (queue.drainTo(list) > 0) {
                    fsync();
                    long now = System.currentTimeMillis();
                    for (Written w : list) {
                        w.future.set(makeResult(now - w.start, w.length,
w.compressed));
                    }
                }
            }
        };

        fsyncTimer.schedule(fsyncTask, 5000, 5000);
    }

    public void stop() {
        fsyncTimer.cancel();
        // TODO: drain queue?
    }


    static class Written {
        final long start;
        final long length;
        final long compressed;
        final SettableFuture<Result> future;
        /* ... */
    }

    /* ... */
}


On Wed, Apr 18, 2012 at 9:57 AM, Ariel Weisberg wrote:

>   Hi,
>
>
>  Any others who come after definitely need to learn a few things, that's
> where documentation is nice.
>  A programmer who is not willing to learn is dangerous.
>
>  I agree, but I have to balance the overhead of making other people learn
> a new tool with the value I am going to extract from the tool. Going all in
> on Akka means that I can't use the java.util and Guava's concurrency
> functionality which people are already familiar with and is already used in
> several places.
>
>  After doing some quick cross checking it looks like Scala's
> Future/Promise is roughly equivalent to Java's Future/FutureTask which are
> extended by Guava to add the ListenableFuture functionality.
> ListenableFutures effectively extend the FutureTasks protected done method
> so that it invokes a set of listeners and has some plumbing for maintaining
> the set of listeners.
>
>
>  Can you illustrate where the ListenableFuture is more flexible?
>
>
>  Without actually validating with a compiler and without factoring out
> functions to eliminate the nesting I would do http://pastebin.com/eC5XfeW4
> .
>
>  There are two key difference I see between andThen and ListenableFuture.
> ListenableFuture can accept multiple listeners and doesn't have to be run
> on completion of the Future on which andThen would be invoked.
> ListenableFuture also allows you to explicitly specify what thread will run
> the handler which may allow you to skip some locking if you have a thread
> with exclusive access to state the handler needs.
>
>  However this doesn't handle cancellation. I might be okay with that and
> have cancel always return false.
>
>  Thanks,
>  Ariel
>
>  On Wed, Apr 18, 2012, at 05:12 PM, √iktor Ҡlang wrote:
>
>
>
>  2012/4/18 Ariel Weisberg
>
>    Hi,
>
>  It looks like Akka Futures don't implement the regular Future interface?
> My concern is that I would have to go all in on Akka for my
> Future/ExecutorService needs. I don't mind it, but it means that others who
> come after will have to learn to grok Akka and the javadocs and
> implementation are in Scala.
>
>
>  Implementing java.util.concurrent.Future would be of little value as it
> does not support non-blocking composition.
>
>  Any others who come after definitely need to learn a few things, that's
> where documentation is nice.
>  A programmer who is not willing to learn is dangerous.
>
>
>
>  The first issue which is that it isn't a causal relationship still
> exists. I can't invoke the fsync task once for each put.
>
>    The equivalent of and then with Guava is ListenableFuture which is
> more flexible in that you can add multiple dependencies and it separates
> the concern of when/how the dependencies will be executed.
>
>
>  Can you illustrate where the ListenableFuture is more flexible?
>
>
>
>  I think that Joe's suggestion of exposing the set methods would work. I
> can add listeners to the fsync task that invoke set on the FutureTask
> returned by the put API. Guava has a SettableFuture for this.
>
>
>  Promise is what you're looking for here.
>
>  Cheers,
>>
>
>
>  Thanks,
>  Ariel
>
>  On Wed, Apr 18, 2012, at 08:03 AM, √iktor Ҡlang wrote:
>
>
>
>  2012/4/18 Ariel Weisberg
>
>    Hi,
>
>  I am using Future and ListeningFuture interchangeably.
>
>  There isn't a causal relationship. The fsync task exists at the time the
> overall put Future is created, but it isn't run until some time later.
>
>  The thread running the fsync task has to cause all the put Futures
> depending on it to enter the completed state and then all the listeners
> have to be notified.
>
>  It seems like I want some combination of Guava's ListenableFutureTask and
> maybe extending ForwardingListenableFuture.
>
>  If the fsync action were a ListenableFutureTask I could forward the the
> listener registrations to it. However part of the return value is specific
> to each put (latency, compression ratio etc.). Also cancel and related
> wouldn't go to the fsync Future either, it would go to the write Future. If
> I bind the Future for the fsync task and the Future for the write task and
> then extend ForwardingFuture I could point each method to the correct
> place. That is probably the "simplest" way to do it, but I wanted to avoid
> extending a Future implementation and adding custom behavior.
>
>  I think it is mostly co-incidence that I happen to be able to map the
> various methods of ListenableFuture usefully to the two other
> ListenableFutures (fsync tasks, write task) and it doesn't seem like a
> great design pattern.
>
>  Akka is definitely something that interests me, especially the
> distributed aspects, but it is a  lot to take on to solve this problem.
>
>
>  The core-jar (akka-actor.jar) is completely standalone and has nothing to
> do with distributed aspects. Just use the Future-implementation.
>
>  Cheers,
>>
>
>
>  Thanks,
>  Ariel
>
>  On Wed, Apr 18, 2012, at 12:44 AM, √iktor Ҡlang wrote:
>
> Hi Ariel,
>
>  Akka's got composable Futures:
> http://doc.akka.io/docs/akka/2.0/java/futures.html
>
>  Essentiall, what you're looking for is the "andThen" operation:
>
>  (Scala)
>
>  val resultingFuture = Future { compressAndChecksum(payload) } andThen { _
> => fsync() }
>
>  Cheers,
>>
>  On Tue, Apr 17, 2012 at 5:39 PM, Ariel Weisberg wrote:
>
> Hi all,
>
> I have been looking for the right way to build an API that returns a
> Future that is composed of tasks that will be performed in multiple
> stages, potentially from more than one thread.
>
> My use cases is a disk based persistence store. The initial put API
> call creates a task that compresses and checksums the payload. The
> resulting Future is bound in a Runnable and submitted to a single
> thread ExecutorService that timestamps the entry and appends it to a
> file and it is the Future of the second task that is currently returned
> by the API.
>
>  The trick is that the Future that is returned by the put call should
>  not complete until the write is durable. Writes are made durable by
>  periodically invoking fsync from a dedicated thread or by rotating
>  write threads as each blocks on fsync/force. This allows for group
>  commit at an interval the IO device can handle without killing
>  throughput.
>
> Ideally I could create a FutureTask that is composed of several other
> Futures, some of which will not exist when the FutureTask is initially
> created and returned by the API. I do know how many dependencies there
> will be
> expected up front.
>
> If I had such a thing I would create a task to do the fsync and add it
> as the last dependency to every Future read by the put API call.
>
> This would be a lot easier if I were willing to have many threads
> standing around blocking on Futures, but that rubs me the wrong way. It
> isn't fun to jstack and see all the noise.
>
> I suspect that there is something in Guava
> com.google.common.util.concurrent.Futures to handle this case, but it
> isn't jumping out at me.
>
> Thanks,
> Ariel Weisberg
>
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://cs.oswego.edu/pipermail/concurrency-interest/attachments/20120418/96385780/attachment-0001.html>


More information about the Concurrency-interest mailing list