[concurrency-interest] Migrating DNS problem to j.u.c

robert lazarski robertlazarski at gmail.com
Thu Dec 14 10:08:12 EST 2006


Indeed, I was submitting tasks to a shutdown pool, thanks. I now have
a latch, which seems to correctly signal when to shut down the pool.

My current problem is that the future times out correctly, but
DirContext.list() isn't being cancelled of course, as it throws
"java.net.ConnectException: Connection timed out" long after
Future.cancel() . Page 148 of jcip states that closing the underlying
socket could help, but in this case I'm not sure how I could do that.
The program calls shutdown after about 90 seconds, but hangs for 5 and
a half more minutes. Any ideas?

Here's my latest code - not pretty but I'm learning a lot ;-) .
Robert

package org;

import java.util.concurrent.*;

/**
 * One-shot completion signal backed by a {@link CountDownLatch} with a
 * count of one. Once released it stays released; it cannot be reset.
 */
public class RecursionLatch {
    private final CountDownLatch done = new CountDownLatch(1);

    /**
     * Reports whether the signal has already been raised.
     *
     * @return {@code true} when the underlying latch count has reached zero
     */
    public boolean isSet() {
        final long remaining = done.getCount();
        return 0L == remaining;
    }

    /**
     * Raises the signal, releasing every thread blocked in
     * {@link #taskCompleted()}. Calling it again afterwards has no effect.
     */
    public synchronized void setCompleted() {
        if (isSet()) {
            return;
        }
        done.countDown();
    }

    /**
     * Blocks the calling thread until {@link #setCompleted()} has been invoked.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void taskCompleted() throws InterruptedException {
        done.await();
    }
}

package org;

import static java.lang.System.out;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Hashtable;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import javax.naming.NameClassPair;
import javax.naming.NamingEnumeration;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.NamingException;

public class DNS
{
  private static final AtomicInteger taskCount = new AtomicInteger(0);
  private static final RecursionLatch solution = new RecursionLatch();

  public static void main(String[] args) throws Exception {
      helloDNS();
  }

  /*******************************************************************
  Use this method to test dns
  *******************************************************************/
  private static void helloDNS() throws Exception {
       // calculateDNS("mailsrv2.atlantico.com.br", "atlantico.com.br");
       calculateDNS("bee.uspnet.usp.br", "usp.br");
  }

  private static List<String> calculateDNS(final String ipDNS, final
String domain)
          throws Exception {

      final Hashtable<String, String> env = new Hashtable<String, String>();
      env.put("java.naming.factory.initial",
"com.sun.jndi.dns.DnsContextFactory");
      env.put("java.naming.provider.url", "dns://" + ipDNS + "/");

      final DirContext ictx;
      ictx = new InitialDirContext(env);
      ExecutorService exec = Executors.newCachedThreadPool(
              new ONExceptionThreadFactory(new ONExceptionHandler()));

      NamingEnumeration hostEnumeration = null;
      boolean gotList = false;
      Callable <NamingEnumeration> listTask =
          new Callable<NamingEnumeration>() {
              public NamingEnumeration call() {
                  try {
                      return ictx.list(domain);
                  } catch (Exception ex) {
                      ex.printStackTrace();
                  }
                  return null;
              }
          };
      Future <NamingEnumeration> future = exec.submit(listTask);
      try {
          hostEnumeration = future.get(10L, TimeUnit.SECONDS);
          if (hostEnumeration != null) {
              gotList = true;
          }
      } catch (Exception ex) {
          ex.printStackTrace();
      } finally {
          future.cancel(true);
      }

      try {
          if (!gotList) {
              throw new Exception("Can't connect to DNS server");
          }
          // skip those already found
          Queue <String> domainsVisitedQueue = new
ConcurrentLinkedQueue<String>();
          domainsVisitedQueue.add(domain);

          Queue <String> resultQueue = new ConcurrentLinkedQueue<String>();
          parallelRecursiveDNS(exec, ictx, hostEnumeration,
domainsVisitedQueue, resultQueue );
          // wait for latch
          solution.taskCompleted();
          out.println("latch returned, waiting for shutdown");

          for (String s : resultQueue) {
              out.println("WTF: " + s);
          }
          return new ArrayList<String>(resultQueue);
      } catch (Exception ex) {
          ex.printStackTrace();
          throw new Exception (ex);
      } finally {
          exec.shutdown();
          out.println("awaiting termination");
          exec.awaitTermination(100L, TimeUnit.SECONDS);
          out.println("termination complete");
          if (ictx != null) {
            try {
                ictx.close();
            } catch (NamingException ex) {
                ex.printStackTrace();
            }
        }
      }

  }	

  private static void parallelRecursiveDNS(final ExecutorService exec,
final DirContext ictx,
          final NamingEnumeration hostEnumeration, final
Collection<String> domainsVisitedQueue,
          final Collection<String> results)
      throws Exception {


      while (hostEnumeration.hasMore()) {

          taskCount.incrementAndGet();
          exec.execute(new Runnable() {
              public void run() {
                  long runStart = System.currentTimeMillis();
                  try {
                      String host = null;
                      host = ((NameClassPair) hostEnumeration.next())
                              .getNameInNamespace();
                      if (results.contains(host)) {
                          return;
                      }
                      results.add(host);
                      out.println("Found host: " + host);
                      Attributes aDNS = ictx.getAttributes(host,
                                  new String[] { "NS" });
                      NamingEnumeration allDNS = aDNS.getAll();
                      while (allDNS.hasMore()) {
                          out.println("Entering allDNS: ");
                          Attribute attr = (Attribute) allDNS.next();
                          NamingEnumeration values = attr.getAll();
                          final String dns = values.next().toString();
                          if (domainsVisitedQueue.contains(dns)) {
                              continue;
                          }
                          domainsVisitedQueue.add(dns);

                          NamingEnumeration newEnumeration = null;
                          boolean gotList = false;
                          Callable <NamingEnumeration> listTask =
                              new Callable<NamingEnumeration>() {
                                  public NamingEnumeration call() {
                                      try {
                                          out.println("doing future on
ictx.list()");
                                          return ictx.list(dns);
                                      } catch (Exception ex) {
                                          ex.printStackTrace();
                                      }
                                      return null;
                                  }
                              };
                          Future <NamingEnumeration> future =
exec.submit(listTask);
                          try {
                              newEnumeration = future.get(10L,
TimeUnit.SECONDS);
                              if (newEnumeration != null) {
                                  gotList = true;
                              }
                          } catch (Exception ex) {
                              ex.printStackTrace();
                          } finally {
                              future.cancel(true);
                          }
                          if (!gotList) {
                              continue;
                          }
                          parallelRecursiveDNS(exec, ictx,
newEnumeration, domainsVisitedQueue, results);
                      }
                  } catch (Exception ex) {
                      ex.printStackTrace();
                  } finally {
                      long runEnd = System.currentTimeMillis();
                      out.println("runnable execution time was "
                        + (runEnd - runStart) / 1000 + " seconds ");
                      if (taskCount.decrementAndGet() == 0) {
                          out.println("\n\nparallelRecursiveDNS finished\n\n");
                          solution.setCompleted();
                      }
                  }

              }
          });
      }
  }
}


On 12/13/06, David Holmes <dcholmes at optusnet.com.au> wrote:
> Robert,
>
> You are shutting down the pool while it is still in use. The Runnables you
> submit in the first parallelRecursiveDNS still submit further Runnables in
> their own recursion.
>
> This is likely why you get the RejectedExecutionExceptions - you are
> submitting to a shutdown pool.
>
> You should shutdown the pool after you have completed the task.
>
> David Holmes
>
> > -----Original Message-----
> > From: concurrency-interest-bounces at cs.oswego.edu
> > [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of robert
> > lazarski
> > Sent: Thursday, 14 December 2006 12:24 AM
> > Cc: Concurrency-interest at cs.oswego.edu
> > Subject: Re: [concurrency-interest] Migrating DNS problem to j.u.c
> >
> >
> > OK, I put exec.shutdown() right before exec.awaitTermination().
> >
> > However, I'm unclear on why the program wouldn't know if the recursion
> > is finished - lets assume no timeout. It roughly follows the jcip
> > listing 8.11 and 8.12, which doesn't have a latch or atomic counter.
> > Do I need a latch and atomic counter like the puzzle program for my
> > program to work right, as shown? I ask because each Runnable in my
> > case has the potential to add to the Collection, unlike the puzzler
> > which is looking for one result.
> >
> > My main concern beyond the correct results is time: The program as
> > shown takes 7 minutes to run, though the original broken program was
> > even slower.
> >
> > Thanks,
> > Robert
> >
> > On 12/12/06, Tim Peierls <tim at peierls.net> wrote:
> > > You need to shutdown the ExecutorService before
> > awaitTermination can ever
> > > succeed. See Listings 7.16 and 7.22 in JCiP for examples of
> > awaitTermination
> > > with shutdown and shutdownNow.
> > >
> > > In your case, you don't know when the recursion is finished, so
> > you don't
> > > know when to shut down the pool. See the discussion on p.187 of
> > JCiP about
> > > stopping ConcurrentPuzzleSolver for different ways to deal with this.
> > >
> > > --tim
> > >
> > >
> > > On 12/12/06, robert lazarski < robertlazarski at gmail.com> wrote:
> > > >
> > > > On 12/12/06, Tim Peierls < tim at peierls.net> wrote:
> > > > > The minimal approach
> > > >
> > > > Thanks for the reply Tim. I've decided to start from scratch as the
> > > > original code is broke in several ways. The whole recursion part may
> > > > be a misunderstanding of the original coder. Anyways, while I'm trying
> > > > to figure it out I implemented it using j.u.c and I'm posting it here
> > > > in case someone could review it.
> > > >
> > > > There is one question I have: awaitTermination doesn't do what I
> > > > expect - end after 100 seconds, ie, I get the List returned but the
> > > > main() keeps going. Perhaps something to do with daemon threads. Any
> > > > insight appreciated.
> > > >
> > > > package org;
> > > >
> > > > import static java.lang.System.out;
> > > >
> > > > import java.util.ArrayList;
> > > > import java.util.Collection ;
> > > > import java.util.Hashtable;
> > > > import java.util.List;
> > > > import java.util.Queue;
> > > > import java.util.concurrent.ConcurrentLinkedQueue;
> > > > import java.util.concurrent.Executor;
> > > > import java.util.concurrent.ExecutorService ;
> > > > import java.util.concurrent.Executors;
> > > > import java.util.concurrent.TimeUnit;
> > > >
> > > > import javax.naming.NameClassPair;
> > > > import javax.naming.NamingEnumeration;
> > > > import javax.naming.directory.Attributes;
> > > > import javax.naming.directory.DirContext;
> > > > import javax.naming.directory.InitialDirContext;
> > > > import javax.naming.directory.Attribute;
> > > > import javax.naming.directory.Attributes;
> > > >
> > > > public class DNS
> > > > {
> > > >
> > > >   public static void main(String[] args) throws Exception {
> > > >       helloDNS();
> > > >   }
> > > >
> > > >
> > > /*******************************************************************
> > > >   Use this method to test dns
> > > >
> > > *******************************************************************/
> > > >   private static void helloDNS() throws Exception {
> > > >        calculateDNS("bee.uspnet.usp.br", "usp.br");
> > > >   }
> > > >
> > > >   private static List<String> calculateDNS(String dnsIP,
> > String domain)
> > > >           throws Exception {
> > > >
> > > >       Hashtable<String, String> env = new Hashtable<String, String>();
> > > >       env.put("java.naming.factory.initial",
> > > > "com.sun.jndi.dns.DnsContextFactory ");
> > > >       env.put("java.naming.provider.url",  "dns://" + dnsIP + "/");
> > > >
> > > >       // obter contexto inicial
> > > >       DirContext ictx = null;
> > > >       NamingEnumeration hostEnumeration = null;
> > > >       try {
> > > >           out.println("getting conn: ");
> > > >           ictx = new InitialDirContext(env);
> > > >           out.println("getting conn, getting list ");
> > > >           hostEnumeration = ictx.list (domain);
> > > >           out.println("got list ");
> > > >       } catch (Exception ex) {
> > > >           ex.printStackTrace();
> > > >           throw new Exception(ex);
> > > >       }
> > > >       ExecutorService exec = Executors.newCachedThreadPool (
> > > >               new ONExceptionThreadFactory(new ONExceptionHandler()));
> > > >       // skip those already found
> > > >       Queue <String> domainsVisitedQueue = new
> > > ConcurrentLinkedQueue<String>();
> > > >       domainsVisitedQueue.add (domain);
> > > >       Queue <String> resultQueue = new
> > > ConcurrentLinkedQueue<String>();
> > > >       parallelRecursiveDNS(exec, ictx, hostEnumeration,
> > > > domainsVisitedQueue, resultQueue);
> > > >       exec.awaitTermination (100L, TimeUnit.SECONDS);
> > > >
> > > >       return new ArrayList<String>(resultQueue);
> > > >   }
> > > >
> > > >   private static void parallelRecursiveDNS(final Executor exec, final
> > > > DirContext ictx,
> > > >           final NamingEnumeration hostEnumeration, final
> > > > Collection<String> domainsVisitedQueue,
> > > >           final Collection<String> results)
> > > >       throws Exception {
> > > >
> > > >       while (hostEnumeration.hasMore()) {
> > > >
> > > >           exec.execute(new Runnable() {
> > > >               public void run() {
> > > >                   try {
> > > >                       String host = null;
> > > >                       host = ((NameClassPair) hostEnumeration.next())
> > > >                               .getNameInNamespace();
> > > >                       results.add(host);
> > > >                       out.println("Found host: " + host);
> > > >                       // all 'A records'
> > > >                       Attributes a =
> > > ictx.getAttributes(host,
> > > >                               new String[] { "A" });
> > > >                       if (a.get("A") != null) {
> > > >                           String ip =
> > > a.get("A").get().toString();
> > > >                            results.add(ip);
> > > >                           out.println("Found ip: " + ip);
> > > >                       }
> > > >                       // enter task suitable for recursion?
> > > >                       Attributes aDNS =
> > > ictx.getAttributes(host,
> > > >                                   new String[] { "NS" });
> > > >                       NamingEnumeration allDNS =
> > > aDNS.getAll();
> > > >                       while (allDNS.hasMore()) {
> > > >                            out.println("Entering allDNS: ");
> > > >                           Attribute attr = (Attribute)
> > > allDNS.next();
> > > >                           NamingEnumeration values =
> > > attr.getAll();
> > > >                           String dns =
> > > values.next().toString();
> > > >                           if (domainsVisitedQueue.contains(dns)) {
> > > >                               continue;
> > > >                           }
> > > >                           NamingEnumeration newHost =
> > > ictx.list(dns);
> > > >                           out.println("doing recursion: ");
> > > >                           parallelRecursiveDNS(exec,
> > > ictx, newHost,
> > > > domainsVisitedQueue, results);
> > > >                       }
> > > >                   } catch (Exception ex) { ex.printStackTrace(); }
> > > >
> > > >               }
> > > >
> > > >           });
> > > >       }
> > > >   }
> > > > }
> > > > _______________________________________________
> > > > Concurrency-interest mailing list
> > > > Concurrency-interest at altair.cs.oswego.edu
> > > >
> > > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> > > >
> > >
> > >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at altair.cs.oswego.edu
> > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>


More information about the Concurrency-interest mailing list