[concurrency-interest] Creating a Pipeline Using Executor Framework

Stephen Duncan Jr stephen.duncan at gmail.com
Mon Jun 15 13:38:29 EDT 2009


I have an existing design & implementation of a sort of pipeline of tasks
that are run for each of a series of items.  Following my understanding of
advice in JCiP, I used the Executor and related classes as the
consumer-producer framework for this pipeline.  During the implementation I
made some decisions that, while they worked for this application, didn't
seem to be going with the grain of the framework.  Since I have several
other similar pipelines to be implemented, I was hoping to get some
feedback/critiquing/suggestions-for-improvement on the design.

I'll try to provide enough detail to be useful without getting in the way,
and would be happy to provide more if necessary.  The primary goal is a
flexible design that scales dynamically where possible, and is easily
configurable to match scaling needs where necessary.  Basically, I want to
use this design repeatedly and be fairly confident that the concurrency
mechanism is unlikely to be the bottleneck (and so far that has held: the
actual processing has always been the bottleneck).

In this existing procedure, the pipeline of tasks mirrors a work flow of an
external web-service to call.  Each task is essentially broken up by an I/O
boundary such as a call to a web-service or to a database: AddToQueue,
CheckStatus, GetResults, StoreResults (in future pipelines, the more typical
pipeline will be: check out records, process records, store results).
CheckStatus may be run multiple times for an item until success.

So, the flow of an item through the pipeline is:
AddToQueue->CheckStatus->...->CheckStatus->GetResults->StoreResults

The implementation has a ThreadPoolExecutor for each of these pipeline
stages.  The CheckStatus one is a ScheduledThreadPoolExecutor so that if
CheckStatus comes back with "queued", then it schedules another check after
some delay.  Generally, the executor framework matched up well: scheduling,
a bounded queue with a CallerRunsPolicy, and the shutdown mechanism all fit.
However, several design questions came up, and my chosen solution didn't
always match up to the features I expected.
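To make the CheckStatus behavior concrete, here is a minimal sketch of a task
that re-schedules itself on the stage's ScheduledThreadPoolExecutor while the
item is still "queued".  The names and the canned pollStatus() are hypothetical
stand-ins for the real web-service call, not my actual code:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a CheckStatus task that re-submits itself to the
// stage's scheduled executor after a delay until the item is no longer queued.
class CheckStatusTask implements Runnable {
    private final ScheduledThreadPoolExecutor statusExecutor;
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);

    CheckStatusTask(ScheduledThreadPoolExecutor statusExecutor) {
        this.statusExecutor = statusExecutor;
    }

    public void run() {
        if ("queued".equals(pollStatus())) {
            // Still pending: check again after a delay instead of spinning.
            statusExecutor.schedule(this, 50, TimeUnit.MILLISECONDS);
        } else {
            // Finished: this is where the GetResults task would be submitted.
            done.countDown();
        }
    }

    // Stand-in for the real web-service status call: "queued" twice, then "done".
    private String pollStatus() {
        return attempts.incrementAndGet() < 3 ? "queued" : "done";
    }

    // Blocks until the status comes back non-queued; returns how many polls ran.
    int awaitAttempts() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return attempts.get();
    }
}
```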

The first question was how to get the item from stage to stage in the
pipeline.  In other words, how does each task know which executor it should
submit to upon completion?  The path I chose was to have an ExecutionContext
object with a getter for each executor in the pipeline, and a reference to
the executionContext was passed to the constructor of each task.  So the
pseudo-code for a task's run() method would be:

public void doTask() {
    doProcess();
    this.executionContext.getNextTaskExecutor().submit(
        new NextTask(data, this.executionContext));
}

This approach seemed to work well, but I'm not sure if there's other design
alternatives that I didn't think of.
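For reference, a bare-bones sketch of what that ExecutionContext might look
like — one getter per stage's executor, plus the finished-items queue I
describe below.  The field and getter names here are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical sketch: an immutable holder passed to every task's constructor
// so each stage can find the executor for the next stage.
class ExecutionContext {
    private final ExecutorService addToQueueExecutor;
    private final ScheduledExecutorService checkStatusExecutor;
    private final ExecutorService getResultsExecutor;
    private final ExecutorService storeResultsExecutor;
    private final BlockingQueue<Object> finishedItems;

    ExecutionContext(ExecutorService addToQueueExecutor,
                     ScheduledExecutorService checkStatusExecutor,
                     ExecutorService getResultsExecutor,
                     ExecutorService storeResultsExecutor,
                     BlockingQueue<Object> finishedItems) {
        this.addToQueueExecutor = addToQueueExecutor;
        this.checkStatusExecutor = checkStatusExecutor;
        this.getResultsExecutor = getResultsExecutor;
        this.storeResultsExecutor = storeResultsExecutor;
        this.finishedItems = finishedItems;
    }

    ExecutorService getAddToQueueExecutor() { return addToQueueExecutor; }
    ScheduledExecutorService getCheckStatusExecutor() { return checkStatusExecutor; }
    ExecutorService getGetResultsExecutor() { return getResultsExecutor; }
    ExecutorService getStoreResultsExecutor() { return storeResultsExecutor; }
    BlockingQueue<Object> getFinishedItems() { return finishedItems; }
}
```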

The second question was how to know when everything has either made it
through the pipeline or failed along the way.  This is the part I'm most
concerned with.  My design makes no use of Future objects, nor an
ExecutorCompletionService.  I couldn't figure out a way to use these
effectively.  Instead, as part of the ExecutionContext, I added a
BlockingQueue of finished items, so that upon either a failure along the way
or completion of the last task, the item is added to the queue.  Then the
main thread, knowing how many items it submitted, loops and waits to take
each item off this queue:

        for (int i = 0; i < items.size(); i++)
        {
            try
            {
                this.executionContext.getFinishedItems().take();
            }
            catch (final InterruptedException e)
            {
                Thread.currentThread().interrupt();
                throw new RuntimeException(
                    "Interrupted while waiting for items to finish.", e);
            }
        }

One early alternative I had was to have the main method try to shut down and
then wait for completion of each pipeline stage in succession.  However, the
fact that the CheckStatus stage submits tasks back to itself made it
impossible to safely identify when that stage was completely done.

Also, because failure can happen at any stage along the way, and you don't
want to submit a task to stage 2, then 3, then 4 if the item already failed
at stage 1, I couldn't just put a completion service on the last stage.
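The failure short-circuit I use instead looks roughly like the following
sketch.  On success the task hands off downstream; on any failure it drops the
item straight onto the finished-items queue so the main thread's take() count
still balances.  The names (StageTask, doProcess) and the Runnable stand-in
for "submit the next task" are hypothetical:

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a pipeline-stage task that either advances the item to
// the next stage or, on failure, reports it directly as finished.
class StageTask implements Runnable {
    private final String item;
    private final BlockingQueue<String> finishedItems;
    private final Runnable nextStage;  // stand-in for submitting the next task

    StageTask(String item, BlockingQueue<String> finishedItems, Runnable nextStage) {
        this.item = item;
        this.finishedItems = finishedItems;
        this.nextStage = nextStage;
    }

    public void run() {
        try {
            doProcess(item);  // stand-in for the real stage work
            nextStage.run();  // success: continue down the pipeline
        } catch (Exception e) {
            // Failure: short-circuit so the item never reaches later stages,
            // but the main thread still sees it come off the finished queue.
            finishedItems.add(item);
        }
    }

    // Placeholder stage logic that fails for certain items.
    private void doProcess(String item) {
        if (item.startsWith("bad")) {
            throw new IllegalStateException("stage failed for " + item);
        }
    }
}
```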

So, are there alternate approaches out there that I should look at?  I'm
open to suggestions from small changes to whole new ways of looking at the
problem.  As I said, if you need more detail to be able to answer, just let
me know, and I'd be happy to provide it.

Thanks in advance for your help.

-- 
Stephen Duncan Jr
www.stephenduncanjr.com