[concurrency-interest] Creating a Set of pings

robert lazarski robertlazarski at gmail.com
Sat May 27 10:28:44 EDT 2006


Tim, thanks for your very insightful help - I've learned a lot. I'll be
looking for your book here in Brazil, and Joshua's too ;-) . I've also taken
Dawid's advice about doing scans and have alerted the requirements guys.

I've coded both versions you mentioned: one with two invokeAll() calls as you
showed, and one with two ExecutorCompletionService instances. I won't clutter
this post with the invokeAll() version, but anyone who cares to see both in
their entirety can find them here:

invokeAll() version:
http://www.braziloutsource.com/random/CanSnmp.java

ExecutorCompletionService version:
http://www.braziloutsource.com/random/PerfSnmpScan.java

I'm still a little concerned about performance. The invokeAll() version
takes 32 seconds on a class C subnet (for example, 192.168.10.1 to
192.168.10.254), while under the same circumstances the
ExecutorCompletionService version takes 25 seconds. I'm running a 3 GHz P4
machine with just one NIC under SUSE Linux 9.3. Both runs use a timeout of
1 second for isReachable() and for snmp.
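
As a rough sanity check on those timings, here is a back-of-the-envelope
sketch. It assumes that nearly every address in the range is unreachable, so
each isReachable() call blocks for the full timeout, and it uses the pool
size of 10 from the code later in this message. The class and method names
are made up for illustration. Under those assumptions the estimate is close
to the measured 25 seconds, which would suggest the scan is bound by the
timeout rather than by the CPU.

```java
public class ScanTimeEstimate {

    /**
     * Estimated wall-clock seconds when every probe waits for the full
     * timeout: the pool works through the hosts in "rounds" of poolSize.
     */
    static long estimateSeconds(int hosts, int poolSize, int timeoutSeconds) {
        long rounds = (hosts + poolSize - 1) / poolSize; // ceiling division
        return rounds * timeoutSeconds;
    }

    public static void main(String[] args) {
        // 254 hosts, 10 threads, 1 second timeout
        System.out.println(estimateSeconds(254, 10, 1)); // prints 26
        // same scan with a hypothetical pool of 25 threads
        System.out.println(estimateSeconds(254, 25, 1)); // prints 11
    }
}
```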

The other concern I have is that isReachable() with more than 5 threads
starts throwing "java.net.ConnectException: No buffer space available" when
scanning multiple subnets. For example, on a class A network with
255.255.248.0 as the subnet mask, I get this error after about 1000
isReachable() calls.
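
For a sense of scale, the number of addresses behind a netmask can be
computed from its bit count. This is a minimal sketch, assuming a contiguous
IPv4 mask in dotted-quad form; the class and method names are illustrative
and not part of the posted code. A single block under 255.255.248.0 holds
2046 usable host addresses, so about 1000 calls is roughly half of one such
block.

```java
public class SubnetSize {

    /** Number of 1 bits in a dotted-quad mask, e.g. 24 for 255.255.255.0. */
    static int prefixLength(String mask) {
        int bits = 0;
        for (String octet : mask.split("\\.")) {
            bits += Integer.bitCount(Integer.parseInt(octet));
        }
        return bits;
    }

    /** Host addresses in the block, excluding network and broadcast. */
    static long usableHosts(String mask) {
        int hostBits = 32 - prefixLength(mask);
        return hostBits < 2 ? 0 : (1L << hostBits) - 2;
    }

    public static void main(String[] args) {
        System.out.println(usableHosts("255.255.248.0")); // prints 2046
        System.out.println(usableHosts("255.255.255.0")); // prints 254
    }
}
```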

Anyway, here's what I came up with. Any suggestions are highly appreciated.
Thanks!

    // autodescoberta == auto discovery

    public static List<String> autodescobertaSCAN3(final String mascara, final String ip,
            final String comunidadeLeitura, final String comunidadeEscrita,
            final Integer timeout, final Integer porta, final Long idMapa)
            throws FramexException, TimeoutException {
        try {
            long scanStart = System.currentTimeMillis();
            ExecutorService reachableExec = Executors.newFixedThreadPool(10);
            ExecutorService snmpConnectExec = Executors.newFixedThreadPool(5);

            List<String> ips = getValidIPs(mascara, ip);
            List<InetAddress> addrs = new ArrayList<InetAddress>();
            for (Iterator iter = ips.iterator(); iter.hasNext();) {
                addrs.add(InetAddress.getByName((String) iter.next()));
            }

            Collection<Callable<InetAddress>> testReachableTasks =
                new ArrayList<Callable<InetAddress>>();
            for (final InetAddress addr : addrs) {
                testReachableTasks.add(new Callable<InetAddress>() {
                    public InetAddress call() {
                        try {
                            return addr.isReachable(timeout) ? addr : null;
                        } catch (Exception ex) {
                            ex.printStackTrace();
                        }
                        return null;
                    }
                });
            }

            // Manages a blocking queue of testReachableTasks
            // objects, passing their results to the Snmp connector as they become available.
            ExecutorCompletionService reachableService =
                new ExecutorCompletionService(reachableExec);
            // Manage a blocking queue of trySnmpConnectTasks
            // objects, adding their results to the snmpManagebleResults as they become available.
            ExecutorCompletionService snmpService =
                new ExecutorCompletionService(snmpConnectExec);

            List<Callable<InetAddress>> trySnmpConnectTasks =
                new ArrayList<Callable<InetAddress>>();
            for (Callable<InetAddress> reachableTask : testReachableTasks) {
                reachableService.submit(reachableTask);
            }

            int snmpTryConnects = 0;
            for (int i = 0; i < testReachableTasks.size(); i++) {
                try {
                    final InetAddress reachableAddr =
                        (InetAddress) reachableService.take().get();
                    // address was not reachable
                    if (reachableAddr == null) {
                        continue;
                    }
                    // initialize the snmp connect queue
                    Callable snmpCallable = new Callable<InetAddress>() {
                        public InetAddress call() throws java.lang.Exception {
                            // facade.getByName() throws an exception
                            try {
                                return (getSnmpFacade().getByName(
                                            reachableAddr.getHostName(), porta, timeout,
                                            comunidadeLeitura,
                                            MibConstants.SYS_OBJECT_ID + ".0") != null)
                                        ? reachableAddr : null;
                            } catch (TimeoutException ex) {
                                // this error is expected to happen a lot
                            } catch (Exception ex) {
                                ex.printStackTrace();
                            }
                            return null;
                        }
                    };
                    snmpService.submit(snmpCallable);
                    snmpTryConnects++;
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

            // shut down the 'reachableExec' pool after all registered tasks have completed,
            // and prevent new tasks from being assigned
            reachableExec.shutdown();

            Set<String> snmpManagebleResults = new HashSet<String>();
            for (int i = 0; i < snmpTryConnects; i++) {
                try {
                    final InetAddress snmpAddr = (InetAddress) snmpService.take().get();
                    if (snmpAddr != null) {
                        snmpManagebleResults.add(snmpAddr.getHostAddress());
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

            // shut down the 'snmpConnectExec' pool after all registered tasks have completed,
            // and prevent new tasks from being assigned
            snmpConnectExec.shutdown();

            long scanEnd = System.currentTimeMillis();
            out.println("snmp scan completed!!! , Execution time was "
                + (scanEnd - scanStart) / 1000 + " seconds ");
            // compare result to previous result
        } catch (Exception ex) {
            ex.printStackTrace();
            throw new FramexException(ex.getMessage(), ex);
        }
        return null;
    }

Cheers,
Robert

On 5/23/06, Tim Peierls <tim at peierls.net> wrote:
>
> If you don't mind doing all step 1 work before all step 2/3 work, use
> ExecutorService.invokeAll on a collection of Callable<InetAddress> tasks
> that test isReachable.
>
> For each of these that succeeds, create a new Callable<InetAddress> that
> tries to connect on port 161, and pass a collection of these new tasks to
> another invocation of ExecutorService.invokeAll.
>
> For each of these that succeeds, put the resulting InetAddress into a
> plain HashSet -- CopyOnWriteArrayList is for lists that are frequently read
> by different threads but rarely written to. This set need only be accessed
> by one thread, the thread that creates it and compares it to some previous
> value -- this previous value could be a thread-safe set if other threads
> need to access it.
>
> Here's an uncompiled, untested, and possibly completely wrong-headed
> sketch:
>
> ExecutorService exec = ...;
> Collection<InetAddress> addrs = ...;
>
> List<Callable<InetAddress>> testReachableTasks =
>     new ArrayList<Callable<InetAddress>>();
> for (final InetAddress addr : addrs) {
>     testReachableTasks.add(new Callable<InetAddress>() {
>         public InetAddress call() {
>             return isReachable(addr) ? addr : null;
>         }
>     });
> }
> List<Future<InetAddress>> reachables =
>     exec.invokeAll(testReachableTasks);
>
> List<Callable<InetAddress>> tryConnectTasks =
>     new ArrayList<Callable<InetAddress>>();
> for (Future<InetAddress> reachable : reachables) {
>     // need to handle exceptions thrown by Future.get()
>     final InetAddress addr = reachable.get();
>     if (addr == null) continue;
>
>     tryConnectTasks.add(new Callable<InetAddress>() {
>         public InetAddress call() {
>             return (canConnectUdp (addr)
>                  || canConnectSnmp(addr)) ? addr : null;
>         }
>     });
> }
> List<Future<InetAddress>> connectables =
>     exec.invokeAll(tryConnectTasks);
>
> Set<InetAddress> result = new HashSet<InetAddress>();
> for (Future<InetAddress> connectable : connectables) {
>     // need to handle exceptions thrown by Future.get()
>     InetAddress addr = connectable.get();
>     if (addr != null) result.add(addr);
> }
> // compare result to previous result
>
> If you want to start doing step 2/3 work before all step 1 work is
> complete, use 2 instances of ExecutorCompletionService, one corresponding to
> each call to invokeAll, and pull completed tasks off of one completion queue
> to insert it into the next. It's a little trickier, since you have to count
> submissions to know when you're done with a completion queue.
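
The two-completion-service arrangement described above can be sketched with
typed CompletionService instances, so no casts are needed. This is only a
sketch under stated assumptions: the two Predicate parameters are
placeholders (not from the posted code) standing in for the isReachable()
test and the SNMP probe, the class and method names are made up, lambdas
require Java 8 or later, and a task that throws is simply treated as a
failed probe.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

public class TwoStagePipeline {

    /** Returns the items that pass stage1 and then stage2. */
    static <T> Set<T> run(List<T> items, Predicate<T> stage1, Predicate<T> stage2,
                          int stage1Threads, int stage2Threads) {
        ExecutorService exec1 = Executors.newFixedThreadPool(stage1Threads);
        ExecutorService exec2 = Executors.newFixedThreadPool(stage2Threads);
        try {
            CompletionService<T> done1 = new ExecutorCompletionService<T>(exec1);
            CompletionService<T> done2 = new ExecutorCompletionService<T>(exec2);

            // Stage 1: one task per item; a task yields the item or null.
            for (T item : items) {
                done1.submit(() -> stage1.test(item) ? item : null);
            }

            // Feed stage 2 as stage 1 results arrive, counting submissions
            // so we know how many stage 2 results to wait for.
            int stage2Submitted = 0;
            for (int i = 0; i < items.size(); i++) {
                T passed = resultOrNull(done1.take());
                if (passed == null) continue;
                done2.submit(() -> stage2.test(passed) ? passed : null);
                stage2Submitted++;
            }

            // Only this thread touches the result set, so a plain HashSet is enough.
            Set<T> result = new HashSet<T>();
            for (int i = 0; i < stage2Submitted; i++) {
                T passed = resultOrNull(done2.take());
                if (passed != null) result.add(passed);
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            throw new IllegalStateException("interrupted while scanning", e);
        } finally {
            // Shut both pools down even if something above failed.
            exec1.shutdown();
            exec2.shutdown();
        }
    }

    /** A task that threw is treated the same as a failed probe. */
    private static <T> T resultOrNull(Future<T> f) throws InterruptedException {
        try {
            return f.get();
        } catch (ExecutionException e) {
            return null;
        }
    }
}
```

In a real scan the first predicate would wrap addr.isReachable(timeout) and
the second the SNMP lookup, each catching its own checked exceptions.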
>
> I can't advise on how to size the pool. How many network interfaces are
> there? How many processors are there? Seems like something you'd want to be
> able to tune.
>
> --tim
>
> On 5/22/06, robert lazarski <robertlazarski at gmail.com> wrote:
>
> > Could someone please give some advice on these requirements?
>
> 1) Find all IP's on a given subnet that responds to ICMP  (ping). So for
> say 192.168.10.0 with a netmask of 255.255.255.0, attempt a ping to 255
> addresses. In java  I believe I can use InetAddress.isReachable(timeout) .
>
>
> 2) On a valid IP, try to connect on port 161 via UDP and snmp.
>
> 3) Put the valid IP's inside an implementation of java.util.Set .
>
> 4) Compare new Set with a previous Set and show updates.
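
Step 4 can be sketched with plain set differences. This is a minimal
example, not taken from the posted code: the class and method names are made
up, and the addresses are sample values. Both sets are only read here, and
the results are fresh copies, so no concurrent collection is involved.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class ScanDiff {

    /** Addresses present in the current scan but not in the previous one. */
    static Set<String> appeared(Set<String> previous, Set<String> current) {
        Set<String> added = new TreeSet<String>(current);
        added.removeAll(previous);
        return added;
    }

    /** Addresses present in the previous scan but missing from the current one. */
    static Set<String> disappeared(Set<String> previous, Set<String> current) {
        Set<String> removed = new TreeSet<String>(previous);
        removed.removeAll(current);
        return removed;
    }

    public static void main(String[] args) {
        Set<String> previous = new TreeSet<String>(Arrays.asList("192.168.10.1", "192.168.10.7"));
        Set<String> current = new TreeSet<String>(Arrays.asList("192.168.10.1", "192.168.10.9"));
        System.out.println(appeared(previous, current));    // prints [192.168.10.9]
        System.out.println(disappeared(previous, current)); // prints [192.168.10.7]
    }
}
```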
>
> My questions are:
>
> 1) Perform steps 1, 2, and possibly 3 in a Callable or Runnable. On
> typical intel hardware does an ExecutorService with a size of 5 sound about
> right?
> 2) Should I pass in a Callable and get the results back 255 times, or
> should I use Runnable and have the threads write the result to a common
> area, ie, an object with a synchronized method ?
> 3) Should I use one of the concurrent collections to store the results?
> CopyOnWriteArrayList seems to fit.
>
> Thanks for any help,
> Robert
>
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>
>
>
>