[concurrency-interest] Creating a Pipeline Using Executor Framework

Thejo Kote thejo at kote.in
Mon Jun 15 15:13:48 EDT 2009


The Cassandra Project (from Facebook, now in incubation at Apache) has a
nice implementation of SEDA (http://www.eecs.harvard.edu/~mdw/proj/seda/)
which may give you ideas about implementing such a system.

Thejo

On Mon, Jun 15, 2009 at 11:08 PM, Stephen Duncan Jr <
stephen.duncan at gmail.com> wrote:

> I have an existing design & implementation of a sort of pipeline of tasks
> that are run for each of a series of items.  Following my understanding of
> advice in JCiP, I used the Executor and related classes as the
> consumer-producer framework for this pipeline.  During the implementation I
> made some decisions that, while they worked for this application, didn't
> seem to be going with the grain of the framework.  Since I have several
> other similar pipelines to be implemented, I was hoping to get some
> feedback/critiquing/suggestions-for-improvement on the design.
>
> I'll try to provide enough detail to be useful without getting in the way,
> and would be happy to provide more if necessary.  The primary goal is a
> flexible design that scales dynamically where possible, and is easily
> configurable to match scaling needs where necessary.  Basically, I want to
> use this design repeatedly and be fairly confident that the concurrency
> mechanism is unlikely to be a bottleneck (so far that's certainly true;
> the actual processing has always been the bottleneck).
>
> In this existing procedure, the pipeline of tasks mirrors a work flow of an
> external web-service to call.  Each task is essentially broken up by an I/O
> boundary such as a call to a web-service or to a database: AddToQueue,
> CheckStatus, GetResults, StoreResults (in future pipelines, the more typical
> pipeline will be: check out records, process records, store results).
> CheckStatus may be run multiple times for an item until success.
>
> So, the flow of an item through the pipeline is:
> AddToQueue->CheckStatus->...->CheckStatus->GetResults->StoreResults
>
> The implementation has a ThreadPoolExecutor for each of these pipeline
> stages.  The CheckStatus one is a ScheduledThreadPoolExecutor so that if
> CheckStatus comes back with "queued", then it schedules another check after
> some delay.  Generally the executor framework matched up well: scheduling,
> using a bounded queue with a CallerRunsPolicy, and the shutdown mechanism.
> However, several design questions came up, and my chosen solution didn't
> always match up to the features I expected.
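
[Editor's sketch, not from the original post.] The stage setup described above could look roughly like the following in Java. The class name, method names, thread counts, and the counter that stands in for the web-service status reply are all hypothetical. Note that the bounded queue and CallerRunsPolicy apply to the plain ThreadPoolExecutor stages; a ScheduledThreadPoolExecutor always uses its own unbounded delay queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StageExecutorsSketch {

    // A stage with a fixed thread count and a bounded queue. When the queue
    // is full, CallerRunsPolicy makes the submitting thread run the task
    // itself, which slows the upstream stage instead of dropping work.
    static ThreadPoolExecutor boundedStage(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Simulates CheckStatus: the (hypothetical) service answers "queued" for
    // the first queuedReplies checks, then "done". Returns the number of
    // checks that were run.
    static int pollUntilDone(final int queuedReplies, final long delayMillis) {
        final ScheduledThreadPoolExecutor checkStatus =
                new ScheduledThreadPoolExecutor(1);
        final AtomicInteger checks = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        Runnable check = new Runnable() {
            public void run() {
                int n = checks.incrementAndGet();
                if (n <= queuedReplies) {
                    // Still "queued": schedule another check after the delay.
                    checkStatus.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                } else {
                    // A real task would submit to the GetResults stage here.
                    done.countDown();
                }
            }
        };
        checkStatus.execute(check);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            checkStatus.shutdown();
        }
        return checks.get();
    }
}
```

Because the task reschedules itself, the scheduled executor can hold pending work even when no thread is running, which is the property that complicates shutdown-based completion detection later in this message.
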
>
> The first question was how to get the item from stage to stage in the
> pipeline.  In other words, how does each task know about the executor that
> it should submit to upon completion of its task.  The path I chose was to
> have an ExecutionContext object that had getters for each executor in the
> pipeline, and a reference to the executionContext was passed to the
> constructor of each task.  So the pseudo-code for a task's run method would
> be:
>
> public void doTask() {
>     doProcess();
>     // Hand the item to the next stage, passing the shared context along.
>     this.executionContext.getNextTaskExecutor().submit(
>         new NextTask(data, this.executionContext));
> }
>
> This approach seemed to work well, but I'm not sure if there are other
> design alternatives that I didn't think of.
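
[Editor's sketch, not from the original post.] A self-contained version of the hand-off described above might look like this. Only the idea of an ExecutionContext with a getter per executor comes from the message; the two stage names, the ProcessTask and StoreTask classes, the pool sizes, and the upper-casing "work" are placeholders. The sketch waits for completion by shutting the stages down in order, which is safe here only because no stage resubmits work to itself.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandOffSketch {

    // Holds one executor per stage so each task can find the next stage.
    static class ExecutionContext {
        private final ExecutorService processExecutor = Executors.newFixedThreadPool(2);
        private final ExecutorService storeExecutor = Executors.newFixedThreadPool(2);
        private final List<String> stored = new CopyOnWriteArrayList<String>();

        ExecutorService getProcessExecutor() { return processExecutor; }
        ExecutorService getStoreExecutor() { return storeExecutor; }
        List<String> getStored() { return stored; }
    }

    static class ProcessTask implements Runnable {
        private final String data;
        private final ExecutionContext executionContext;

        ProcessTask(String data, ExecutionContext executionContext) {
            this.data = data;
            this.executionContext = executionContext;
        }

        public void run() {
            String processed = data.toUpperCase(); // stand-in for the real work
            // Submit to the next stage, passing the same context along.
            executionContext.getStoreExecutor()
                    .submit(new StoreTask(processed, executionContext));
        }
    }

    static class StoreTask implements Runnable {
        private final String data;
        private final ExecutionContext executionContext;

        StoreTask(String data, ExecutionContext executionContext) {
            this.data = data;
            this.executionContext = executionContext;
        }

        public void run() {
            executionContext.getStored().add(data); // last stage: record result
        }
    }

    static List<String> runAll(List<String> items) {
        ExecutionContext context = new ExecutionContext();
        for (String item : items) {
            context.getProcessExecutor().submit(new ProcessTask(item, context));
        }
        try {
            // Drain the stages in pipeline order.
            context.getProcessExecutor().shutdown();
            context.getProcessExecutor().awaitTermination(1, TimeUnit.MINUTES);
            context.getStoreExecutor().shutdown();
            context.getStoreExecutor().awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return context.getStored();
    }
}
```

Passing the context through each constructor keeps the tasks free of static state, at the cost of every task knowing the name of its successor stage.
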
>
> The second question was how to know when everything has either made it
> through the pipeline or failed along the way.  This is the part I'm most
> concerned with.  My design makes no use of Future objects, nor an
> ExecutorCompletionService.  I couldn't figure out a way to use these
> effectively.  Instead, as part of the ExecutionContext, I added a
> BlockingQueue of finished items, so that upon either a failure along the way
> or completion of the last task the item is added to the queue.  Then the
> main thread, knowing how many items it submitted, loops and waits to take
> each item off this queue:
>
>         for (int i = 0; i < items.size(); i++)
>         {
>             try
>             {
>                 this.executionContext.getFinishedItems().take();
>             }
>             catch (final InterruptedException e)
>             {
>                 Thread.currentThread().interrupt();
>                 throw new RuntimeException(
>                     "Interrupted while waiting for items to finish.", e);
>             }
>         }
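
[Editor's sketch, not from the original post.] The finished-items queue can be put together end to end as below. The Outcome class, the two-stage layout, and the rule that an empty string counts as a failed item are assumptions made for illustration. A failing item is reported straight to the queue and never submitted to the later stage, so the main thread still sees exactly one entry per submitted item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class FinishedQueueSketch {

    // Hypothetical record of how an item left the pipeline.
    static class Outcome {
        final String item;
        final boolean succeeded;

        Outcome(String item, boolean succeeded) {
            this.item = item;
            this.succeeded = succeeded;
        }
    }

    static List<Outcome> runAll(List<String> items) {
        final ExecutorService firstStage = Executors.newFixedThreadPool(2);
        final ExecutorService lastStage = Executors.newFixedThreadPool(2);
        final BlockingQueue<Outcome> finishedItems =
                new LinkedBlockingQueue<Outcome>();

        for (final String item : items) {
            firstStage.submit(new Runnable() {
                public void run() {
                    if (item.isEmpty()) {
                        // Failure: report it and skip the later stage.
                        finishedItems.add(new Outcome(item, false));
                        return;
                    }
                    lastStage.submit(new Runnable() {
                        public void run() {
                            // Completion of the last task: report success.
                            finishedItems.add(new Outcome(item, true));
                        }
                    });
                }
            });
        }

        List<Outcome> outcomes = new ArrayList<Outcome>();
        try {
            // One take() per submitted item, as in the loop above.
            for (int i = 0; i < items.size(); i++) {
                outcomes.add(finishedItems.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "Interrupted while waiting for items to finish.", e);
        } finally {
            firstStage.shutdown();
            lastStage.shutdown();
        }
        return outcomes;
    }
}
```

The counting loop only terminates if every item reaches the queue exactly once. A task that throws an unexpected exception without reporting would leave take() blocked, so real tasks would likely need a try/catch around their whole body that reports the failure.
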
>
> One early alternative I had was to have the main method try to shut down
> and then wait for completion of each pipeline stage in succession.  However,
> the fact that the CheckStatus stage submits tasks back to itself made it
> impossible to safely identify when that stage was completely done.
>
> Also, the fact that failure can happen at any stage along the way, and you
> don't want to submit a task to stage 2, then 3, then 4 if it fails at stage
> 1, prevented me from just having a completion service on the last stage.
>
> So, are there alternate approaches out there that I should look at?  I'm
> open to suggestions from small changes to whole new ways of looking at the
> problem.  As I said, if you need more detail to be able to answer, just let
> me know, and I'd be happy to provide it.
>
> Thanks in advance for your help.
>
> --
> Stephen Duncan Jr
> www.stephenduncanjr.com
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>