[concurrency-interest] TransferQueue batch processing

Carfield Yim carfield at carfield.com.hk
Thu Jul 14 11:56:51 EDT 2016


I have code like this as a feed handler:

  // Drains the queue in batches on a dedicated thread; sleeps briefly when idle.
  private def consume(name: String, callback: (List[Base]) => Array[Array[_]],
                      queue: BlockingQueue[Base]) = {
    val listener = new Thread(new Runnable {
      def run() {
        while (true) {
          val list = new ArrayList[Base]()
          try {
            // Move everything currently queued into the batch without blocking.
            queue.drainTo(list)
            if (list.size() > 0) {
              val converted = callback(list.toList)
              batchInsert(name, converted)
            } else {
              // Nothing queued: back off before polling again.
              Thread.sleep(100)
            }
          } catch {
            case e: Exception =>
              logger.error(list.toString(), e)
          }
        }
      }
    }, name)
    listener.start()
  }

  def consume(x: Trade) = {
    tradeQueue.put(x)
  }

It works reasonably well, but there are still times when too many updates
from upstream cause messages to back up because they are not processed fast
enough. Recently I came across this article
(http://php.sabscape.com/blog/?p=557) saying that TransferQueue may help to
reduce the blocking time. However, after I switched to a TransferQueue and
changed put() to transfer(), processing actually became slower.

I believe the reason is that TransferQueue.transfer() forces the handler to
take each message as soon as possible, so queue.drainTo(list) always drains
a single element into the list and there is no batch processing.
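
To make that reasoning concrete, here is a minimal sketch, assuming a
LinkedTransferQueue and a single producer thread. The object and method
names (TransferVsPut, drainAfterPut, batchSizesWithTransfer) are made up
for illustration and are not part of the feed handler above. With put()
the elements accumulate, so one drainTo() picks up all of them; with
transfer() the producer waits for each element to be received, so the
batches the consumer sees tend to be very small (exact sizes depend on
thread timing).

```scala
import java.util.ArrayList
import java.util.concurrent.LinkedTransferQueue

// Hypothetical helper object, for illustration only.
object TransferVsPut {
  // put() never waits on an unbounded queue, so elements pile up
  // and a later drainTo() sees all of them at once.
  def drainAfterPut(n: Int): Int = {
    val queue = new LinkedTransferQueue[Integer]()
    (1 to n).foreach(i => queue.put(i))
    val batch = new ArrayList[Integer]()
    queue.drainTo(batch) // returns the number of elements moved
  }

  // transfer() waits until a consumer has received the element, so a
  // single producer can have at most one element waiting at a time.
  // Returns the size of each non-empty batch the consumer drained.
  def batchSizesWithTransfer(n: Int): List[Int] = {
    val queue = new LinkedTransferQueue[Integer]()
    val producer = new Thread(new Runnable {
      def run(): Unit = (1 to n).foreach(i => queue.transfer(i))
    })
    producer.start()
    var sizes = List.empty[Int]
    var received = 0
    while (received < n) {
      val batch = new ArrayList[Integer]()
      val drained = queue.drainTo(batch)
      if (drained > 0) {
        sizes = drained :: sizes
        received += drained
      } else {
        Thread.sleep(1) // nothing available yet, try again shortly
      }
    }
    producer.join()
    sizes.reverse
  }
}
```

Calling TransferVsPut.drainAfterPut(5) drains all five elements in one
batch, while TransferVsPut.batchSizesWithTransfer(20) returns a list of
batch sizes that sum to 20 but are individually small.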

So does that mean TransferQueue doesn't work for my case? Similarly, is the
Disruptor also not targeted at this kind of batch processing?