[concurrency-interest] implementing a DB write-behind algorithm

Tim Peierls tim at peierls.net
Mon May 1 17:15:35 EDT 2006


Short answer: Yes.

Longer answer:
<shamelessPlug>
We introduce the @GuardedBy annotation in Java Concurrency in Practice.
@GuardedBy(lock) documents that a field or method should be accessed only
with the specific lock held. In particular, @GuardedBy("this") means that
all accesses must be done while holding the intrinsic lock of the containing
object. Documenting the synchronization policy of a class through such
annotations makes it easier to reason about its correctness and easier to
maintain the class without breaking it.

All non-volatile, non-final fields of a class that can be accessed by
multiple threads must be guarded in this way. In that code example, the
start, stop, and isRunning methods all access the consumer field. I would
have made the field volatile, but the first two methods are check-then-act
sequences that must be performed atomically with respect to each other, so
volatile isn't enough in this case.
</shamelessPlug>
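To make the check-then-act point concrete, here is a minimal, self-contained sketch in the same spirit as the sample code below. The GuardedBy stand-in is declared inline only so the snippet compiles without the JCIP jar (the real annotation lives in net.jcip.annotations), and StartStopService is a made-up name for illustration:

```java
import java.lang.annotation.*;
import java.util.concurrent.*;

// Minimal stand-in for net.jcip.annotations.GuardedBy, so this sketch
// compiles without the JCIP jar on the classpath.
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.CLASS)
@interface GuardedBy { String value(); }

// Hypothetical service showing why volatile isn't enough here: start()
// must check the field and then set it as one atomic step, so both
// accesses happen while holding the same lock (the intrinsic lock,
// per @GuardedBy("this")).
class StartStopService {
    @GuardedBy("this") private Future<?> task = null;

    public synchronized void start(ExecutorService exec, Runnable work) {
        if (task == null)             // check ...
            task = exec.submit(work); // ... then act, atomically
    }

    public synchronized boolean isRunning() {
        return task != null;
    }

    public synchronized void stop() {
        if (task != null) {
            task.cancel(true);
            task = null;
        }
    }
}
```

With a merely volatile field, two threads could both see null in start() and both submit a task; the synchronized check-then-act rules that out.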

Regarding your concern about polling from the database writer thread:

BlockingQueue is generally the appropriate interface to use when dealing
with producer-consumer designs, because it gives you the most flexibility in
dealing with overproduction (producers adding work faster than consumers can
process it) or underproduction (consumers processing work faster than
producers can supply them).  It's perfectly reasonable for a consumer thread
to loop calling take() and timed poll(). You normally don't want to spin in
an untimed poll() loop -- but that's what you'd be forced to do with a
ConcurrentLinkedQueue.
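A small sketch of the contrast (class name PollVsTake is made up for illustration): a BlockingQueue consumer can block in take() or wait in a timed poll(), while ConcurrentLinkedQueue.poll() returns null immediately on an empty queue, which is what forces a spin loop around it.

```java
import java.util.Queue;
import java.util.concurrent.*;

public class PollVsTake {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> bq = new LinkedBlockingQueue<>();
        bq.put("record-1");

        // take() blocks until an element is available -- no busy-waiting.
        String fromTake = bq.take();
        System.out.println(fromTake); // record-1

        // Timed poll(): waits up to the timeout, then gives up and
        // returns null.
        String fromPoll = bq.poll(10, TimeUnit.MILLISECONDS);
        System.out.println(fromPoll); // null (queue was empty)

        // ConcurrentLinkedQueue.poll() never waits; an empty queue
        // yields null right away, so the only way to "wait" is to spin.
        Queue<String> clq = new ConcurrentLinkedQueue<>();
        System.out.println(clq.poll()); // null
    }
}
```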

As Richie Jefts hinted, you should be prepared to handle saturation from
the upstream (producer) end. You have two main choices: use a timed
queue.offer() behind the scenes and throw an application-level checked
exception on timeout -- forcing clients to catch that checked exception --
or simply use an untimed queue.put() and document that WriteBehind.put can
block indefinitely by putting "throws InterruptedException" in the
signature, as in the sample code.
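The first choice above might be sketched like this; the wrapper class and the exception name (WriteBehindFullException) are invented for illustration, and offer(e, time, unit) is the timed cousin of put() that returns false instead of blocking forever:

```java
import java.util.concurrent.*;

// Application-level checked exception surfaced to producers on timeout.
class WriteBehindFullException extends Exception {
    WriteBehindFullException(String msg) { super(msg); }
}

// Hypothetical bounded front end for a write-behind queue: a timed
// offer() behind the scenes, converted into a checked exception so
// callers are forced to deal with saturation.
class BoundedWriteBehind<T> {
    private final BlockingQueue<T> queue;
    private final long timeout;
    private final TimeUnit unit;

    BoundedWriteBehind(int capacity, long timeout, TimeUnit unit) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.timeout = timeout;
        this.unit = unit;
    }

    public void put(T record)
            throws InterruptedException, WriteBehindFullException {
        // Waits up to the timeout for space, then reports saturation.
        if (!queue.offer(record, timeout, unit))
            throw new WriteBehindFullException(
                "queue full after waiting " + timeout + " " + unit);
    }
}
```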

--tim



On 5/1/06, Alexandru Popescu <the.mindstorm.mailinglist at gmail.com> wrote:
>
> One more question about the code sample: what is the
> [code]@GuardedBy("this")[/code] annotation responsible for? Mark that
> any operations on that field must be synched?
>
> ./alex
> --
> .w( the_mindstorm )p.
>
>
> On 4/30/06, Tim Peierls <tim at peierls.net> wrote:
> > On 4/30/06, Alexandru Popescu
> > <the.mindstorm.mailinglist at gmail.com > wrote:
> >
> > > I firstly have to confess that when getting to concurrency related
> > > problems, I am getting confused quite quickly :-).
> >
> >
> > You're not alone! :-)
> >
> >
> > > Now, the current problem I am trying to solve is: I am trying to
> > > figure out how to implement a DB write-behind strategy. Multiple
> > > processes will post records to be written to the DB, but the actual
> > > writes should happen on a separate process. So, far I was thinking
> > > about 2 possible approaches:
> > > a) continuous write-behind: multiple processes write to a queue which
> > > is continuously polled by a separate process. When an element is found
> > > on the queue, then the write process removes it from the queue and
> > > attempts to write it to the DB.
> > >
> > > To have this done, I was looking in the direction of
> > ConcurrentLinkedQueue.
> > >
> > > b) batched write-behind: multiple processes post to a size-bounded
> > > queue. When the max size is reached, the original queue is passed to
> > > the parallel write process and replaced with a new queue.
> > >
> > > To have this done, I was looking in the direction of
> > > LinkedBlockingQueue with an additional atomic operation of swapping
> > > the old queue with the new empty one.
> > >
> > > My question is: am I looking in the right direction or I am completely
> > > wrong. Any ideas and help are highly appreciated.
> > >
> >
> > The use of BlockingQueue.put makes it possible to implement strategies
> that
> > make the caller block, while still permitting strategies that don't
> block.
> > So I would avoid ConcurrentLinkedQueue here, because it does not
> implement
> > the BlockingQueue interface.
> >
> > You can use an unbounded LinkedBlockingQueue for continuous
> write-behind,
> > and ArrayBlockingQueue (always bounded) for batched write-behind.
> Instead of
> > swapping in a new queue, the consumer thread could just poll until the
> batch
> > size was reached (using a timeout to avoid the risk of batches never
> > completing), and then send the batch. The batch size need not be the
> same as
> > the queue capacity.
> >
> > Here's an uncompiled, untested fragment that illustrates the idea:
> >
> >  public interface WriteBehind<T> {
> >     void put(T record) throws InterruptedException;
> > }
> >
> > public interface RecordWriter<T> {
> >     void write(List<T> records) throws InterruptedException;
> >  }
> >
> > abstract class AbstractWriteBehind<T> implements WriteBehind<T> {
> >     private final BlockingQueue<T> queue;
> >     private final RecordWriter<T> writer;
> >     @GuardedBy("this") private Future<Void> consumer = null;
> >
> >     protected AbstractWriteBehind(BlockingQueue<T> queue,
> > RecordWriter<T> writer) {
> >         this.queue = queue;
> >         this.writer = writer;
> >     }
> >
> >     class Consumer implements Callable<Void> {
> >         public Void call() throws InterruptedException {
> >             consume(queue, writer);
> >             return null;
> >         }
> >     }
> >
> >     public synchronized void start() {
> >         if (consumer == null) {
> >             ExecutorService exec =
> > Executors.newSingleThreadExecutor();
> >             try {
> >                 consumer = exec.submit(new Consumer());
> >             } finally {
> >                 exec.shutdown();
> >              }
> >         }
> >     }
> >
> >     public synchronized boolean isRunning() {
> >         return consumer != null;
> >     }
> >
> >     public synchronized void stop() {
> >         if (consumer != null) {
> >             consumer.cancel(true);
> >             consumer = null;
> >         }
> >     }
> >
> >     public final void put(T record) throws InterruptedException {
> >         queue.put(record);
> >     }
> >
> >     protected abstract void consume(BlockingQueue<T> queue,
> RecordWriter<T>
> > writer)
> >         throws InterruptedException;
> > }
> >
> > class ContinuousWriteBehind<T> extends AbstractWriteBehind<T> {
> >     ContinuousWriteBehind(RecordWriter<T> writer) {
> >         super(new LinkedBlockingQueue<T>(), writer);
> >     }
> >
> >     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
> >             throws InterruptedException {
> >         for (T rec; (rec = q.take()) != null; )
> >             writer.write(Collections.singletonList(rec));
> >     }
> > }
> >
> > class BatchedWriteBehind<T> extends AbstractWriteBehind<T> {
> >     private final int maxBuf;
> >     private final List<T> buf;
> >     private final long time;
> >     private final TimeUnit unit;
> >
> >     BatchedWriteBehind(RecordWriter<T> writer, int
> > capacity, int maxBuf,
> >                        long time, TimeUnit unit) {
> >         super(new ArrayBlockingQueue<T>(capacity), writer);
> >         this.maxBuf = maxBuf;
> >         this.buf = new ArrayList<T>(maxBuf);
> >         this.time = time;
> >         this.unit = unit;
> >     }
> >
> >     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
> >             throws InterruptedException {
> >         for (T rec; (rec = q.take()) != null; ) {
> >             buf.add(rec);
> >             while (buf.size() < maxBuf && (rec = q.poll(time, unit)) !=
> > null)
> >                  buf.add(rec);
> >             writer.write(buf);
> >             buf.clear();
> >         }
> >     }
> > }
> >
> >  --tim
> >
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>