[concurrency-interest] implementing a DB write-behind algorithm

Alexandru Popescu the.mindstorm.mailinglist at gmail.com
Mon May 1 13:28:38 EDT 2006


One more question about the code sample: what is the
@GuardedBy("this") annotation responsible for? Does it mark that
any operations on that field must be synchronized?

./alex
--
.w( the_mindstorm )p.


On 4/30/06, Tim Peierls <tim at peierls.net> wrote:
> On 4/30/06, Alexandru Popescu
> <the.mindstorm.mailinglist at gmail.com > wrote:
>
> > I have to confess that when it comes to concurrency-related
> > problems, I get confused quite quickly :-).
>
>
> You're not alone! :-)
>
>
> > Now, the current problem I am trying to solve is: I am trying to
> > figure out how to implement a DB write-behind strategy. Multiple
> > processes will post records to be written to the DB, but the actual
> > writes should happen on a separate process. So far I was thinking
> > about 2 possible approaches:
> > a) continuous write-behind: multiple processes write to a queue which
> > is continuously polled by a separate process. When an element is found
> > on the queue, the writer process removes it from the queue and
> > attempts to write it to the DB.
> >
> > To have this done, I was looking in the direction of ConcurrentLinkedQueue.
> >
> > b) batched write-behind: multiple processes post to a size-bounded
> > queue. When the max size is reached, the original queue is passed to
> > the parallel write process and replaced with a new queue.
> >
> > To have this done, I was looking in the direction of
> > LinkedBlockingQueue with an additional atomic operation of swapping
> > the old queue with the new empty one.
> >
> > My question is: am I looking in the right direction, or am I
> > completely wrong? Any ideas and help are highly appreciated.
> >
>
> The use of BlockingQueue.put makes it possible to implement strategies that
> make the caller block, while still permitting strategies that don't block.
> So I would avoid ConcurrentLinkedQueue here, because it does not implement
> the BlockingQueue interface.
>
> You can use an unbounded LinkedBlockingQueue for continuous write-behind,
> and ArrayBlockingQueue (always bounded) for batched write-behind. Instead of
> swapping in a new queue, the consumer thread could just poll until the batch
> size was reached (using a timeout to avoid the risk of batches never
> completing), and then send the batch. The batch size need not be the same as
> the queue capacity.
>
> Here's an uncompiled, untested fragment that illustrates the idea:
>
>  public interface WriteBehind<T> {
>     void put(T record) throws InterruptedException;
> }
>
> public interface RecordWriter<T> {
>     void write(List<T> records) throws InterruptedException;
>  }
>
> abstract class AbstractWriteBehind<T> implements WriteBehind<T> {
>     private final BlockingQueue<T> queue;
>     private final RecordWriter<T> writer;
>     @GuardedBy("this") private Future<Void> consumer = null;
>
>     protected AbstractWriteBehind(BlockingQueue<T> queue, RecordWriter<T> writer) {
>         this.queue = queue;
>         this.writer = writer;
>     }
>
>     class Consumer implements Callable<Void> {
>         public Void call() throws InterruptedException {
>             consume(queue, writer);
>             return null;
>         }
>     }
>
>     public synchronized void start() {
>         if (consumer == null) {
>             ExecutorService exec = Executors.newSingleThreadExecutor();
>             try {
>                 consumer = exec.submit(new Consumer());
>             } finally {
>                 exec.shutdown();
>              }
>         }
>     }
>
>     public synchronized boolean isRunning() {
>         return consumer != null;
>     }
>
>     public synchronized void stop() {
>         if (consumer != null) {
>             consumer.cancel(true);
>             consumer = null;
>         }
>     }
>
>     public final void put(T record) throws InterruptedException {
>         queue.put(record);
>     }
>
>     protected abstract void consume(BlockingQueue<T> queue, RecordWriter<T> writer)
>         throws InterruptedException;
> }
>
> class ContinuousWriteBehind<T> extends AbstractWriteBehind<T> {
>     ContinuousWriteBehind(RecordWriter<T> writer) {
>         super(new LinkedBlockingQueue<T>(), writer);
>     }
>
>     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
>             throws InterruptedException {
>         for (T rec; (rec = q.take()) != null; )
>             writer.write(Collections.singletonList(rec));
>     }
> }
>
> class BatchedWriteBehind<T> extends AbstractWriteBehind<T> {
>     private final int maxBuf;
>     private final List<T> buf;
>     private final long time;
>     private final TimeUnit unit;
>
>     BatchedWriteBehind(RecordWriter<T> writer, int capacity, int maxBuf,
>                        long time, TimeUnit unit) {
>         super(new ArrayBlockingQueue<T>(capacity), writer);
>         this.maxBuf = maxBuf;
>         this.buf = new ArrayList<T>(maxBuf);
>         this.time = time;
>         this.unit = unit;
>     }
>
>     protected void consume(BlockingQueue<T> q, RecordWriter<T> writer)
>             throws InterruptedException {
>         for (T rec; (rec = q.take()) != null; ) {
>             buf.add(rec);
>             while (buf.size() < maxBuf && (rec = q.poll(time, unit)) != null)
>                 buf.add(rec);
>             writer.write(buf);
>             buf.clear();
>         }
>     }
> }
>
>  --tim
>
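The take-then-poll batching idea Tim describes (block for the first record, then poll with a timeout until the batch fills or the queue goes quiet) can be sketched standalone; the class and method names below are illustrative, not part of the original fragment:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BatchPollDemo {
    // Drain up to maxBatch elements from the queue: take() blocks until at
    // least one record arrives, then poll(timeout) keeps adding records
    // until the batch is full or the timeout expires, so a partial batch
    // still gets written rather than waiting forever.
    static <T> List<T> nextBatch(BlockingQueue<T> q, int maxBatch,
                                 long time, TimeUnit unit)
            throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatch);
        batch.add(q.take());  // wait for at least one record
        T rec;
        while (batch.size() < maxBatch && (rec = q.poll(time, unit)) != null)
            batch.add(rec);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) q.put("rec" + i);
        System.out.println(nextBatch(q, 3, 10, TimeUnit.MILLISECONDS));
        // prints [rec0, rec1, rec2]
        System.out.println(nextBatch(q, 3, 10, TimeUnit.MILLISECONDS));
        // prints [rec3, rec4] -- the timeout closes the partial batch
    }
}
```

Note the batch size (3 here) is independent of the queue capacity, as Tim points out; the timeout only bounds how long a partial batch waits for more records.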



More information about the Concurrency-interest mailing list