[concurrency-interest] producer-consumer POISON msg

Satyendra Gurjar sg at sgurjar.com
Wed Dec 1 13:42:00 EST 2010


Later on, I have realized my problem was due to following code. I was using
find command to get the list of file (as File.listFiles was very slow, and
no built-in support for glob). StreamReader stdout puts File on the filequeue
that it reads from stdout of Process. Process.waitFor returns while
StreamReader
stdout still putting Files on the filequeue and not finished, causing Producer
thread to end before all produced files are put on the filequeue,
causing POISION msg to put ahead of real msg.

class Producer implements Runnable {
  String basedir, filenameglob;
  BlockingQueue<File> filequeue;

  Producer(String basedir, String filenameglob, BlockingQueue<File> filequeue){
        this.basedir      = basedir;
        this.filenameglob = filenameglob;
        this.filequeue    = filequeue;
  }

  public void run(){
    try{
      String cmd = "find " + basedir + " -name " + filenameglob;
      Process proc = Runtime.getRuntime().exec(cmd);

      StreamReader stderr = new StreamReader(proc.getErrorStream());
      StreamReader stdout = new StreamReader(proc.getInputStream(), filequeue);
      new Thread(stderr).start();
      new Thread(stdout).start();

      int exitValue = proc.waitFor();

    } catch(InterruptedException ex) {
      ex.printStackTrace(System.err);
      Thread.currentThread().interrupt();
    } catch(Exception ex) {
      ex.printStackTrace(System.err);
    }
  }
}

I fixed run method as following

      String cmd = "find " + basedir + " -name " + filenameglob;
      Process proc = Runtime.getRuntime().exec(cmd);

      Thread stderr_th = new Thread(new StreamReader(proc.getErrorStream()));
      Thread stdout_th = new Thread(new
StreamReader(proc.getInputStream(), filequeue));
      stderr_th.start();
      stdout_th.start();

      int exitValue = proc.waitFor();

      stderr_th.join();
      stdout_th.join();

Thanks.

On Wed, Dec 1, 2010 at 2:44 AM, David Holmes <davidcholmes at aapt.net.au> wrote:
> Just a clarification:
>
>> Hello, I'm trying producer-consumer pattern as described in jcip 5.3
>> I'm putting POISON MSG for consumers to stop after finish
>> consuming real msgs.
>> But what I'm seeing is consumers consumes POISON MSG before real msgs and
>> stops. Following is my code, please help me understand the behavior.
>
> How do you observe this? Are there still real messages in the queue after
> the consumers are done - how do you detect that? Or is it just that some
> consumers report real messages after other report that they have been
> poisoned? This latter case is quite possible because a consumer can take a
> real message and then get descheduled and only resume after other consumers
> have been poisoned.
>
> David Holmes
>
>> Thanks.
>>
>>
>> static final File POISON_MSG = new File("");
>>
>> static void main(String[] args) throws Exception {
>>
>>  int N_CONSUMERS = 10, BOUND = 1000;
>>  BlockingQueue<File> blockingqueue = new LinkedBlockingQueue<File>(BOUND);
>>
>>  ArrayList<Thread> producers = new ArrayList<Thread>();
>>  for(String[] file : logfiles) { // producer threads
>>      Thread t = new Thread(new Producer(file[0], file[1], blockingqueue));
>>      t.start();
>>      producers.add(t);
>>  }
>>
>>  Crawler crawler = new Crawler(); // Crawler is immutable
>>
>>  for(int i=0; i < N_CONSUMERS; i++) { // consumer threads
>>      new Thread(new Consumer(blockingqueue, crawler)).start();
>>  }
>>
>>  // wait for all producers to finish
>>  for(Thread p : producers) p.join();
>>
>>  // then put N_CONSUMERS POISON_MSG for consumers to stop
>>  for(int i=0; i < N_CONSUMERS; i++) blockingqueue.put(POISON_MSG);
>>
>> }
>> _______________________________________________
>> Concurrency-interest mailing list
>> Concurrency-interest at cs.oswego.edu
>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>
>



More information about the Concurrency-interest mailing list