[concurrency-interest] Simple ScheduledFuture problem

Dhanji R. Prasanna dhanji at gmail.com
Tue Aug 22 19:45:10 EDT 2006


Im curious, what is the purpose of declaring flag volatile AND
synchronizing setter access to it?

If it is volatile it will (could) be overwritten concurrently in a
visible manner anyway. Afaik the purpose of synchronizing access to a
field is to prevent diverging concurrent states (i.e. multiple states
of the same variable cached in separate threads and merged back
non-deterministically), afaik this is a moot point since the field is
volatile--a read should yield the same value of "flag" for all threads
should it not?

Since there is no happens-before edge established in setFlag(), I fail
to see how synchronizing it does anything useful?

I may very well be missing something here...

On 8/23/06, robert lazarski <robertlazarski at gmail.com> wrote:
> Thanks Tim. If its any consolation, I did buy the concurrency in
> practice book and I'm reading it every day, though I just started ;-)
> . PingTask is an inner class and can't have static vars , so this is
> the entire new version. Thanks!
>
> package org;
>
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.net.InetAddress;
> import java.net.Socket;
> import java.net.URL;
> import java.net.URLConnection;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.concurrent.Executors;
> import java.util.concurrent.FutureTask;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.ScheduledFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Future;
> import java.util.concurrent.TimeoutException;
> import java.util.concurrent.ExecutionException;
> import java.util.concurrent.CancellationException;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
> public class Ping {
>
>      // for the inner class PingTask
>      static ExecutorService exec = Executors.newCachedThreadPool();
>      private volatile int  flag = 0;
>      private static int STOPPED = new Integer(0);
>      private static int BEGIN_INIT = new Integer(1);
>      private static int RUNNING = new Integer(2);
>          private static Map <Integer, String> map;
>
>      /** commons logging declaration. */
>      private static Log logger = LogFactory.getLog(
>                Ping.class);
>
>      public static void main(String[] args) throws Exception {
>
>              new Ping();
>      }
>
>      Ping () {
>          ScheduledExecutorService ses =
> Executors.newSingleThreadScheduledExecutor();
>          // Do pings, starting now, with a 2 second delay in between
>          ScheduledFuture <?> ping = ses.scheduleWithFixedDelay(new PingTask(),
>              0L, 2000L, TimeUnit.MILLISECONDS);
>      }
>
>      static {
>          map = new HashMap<Integer, String>();
>          map.put(STOPPED, "STOPPED");
>          map.put(BEGIN_INIT, "BEGIN_INIT");
>          map.put(RUNNING, "RUNNING");
>      }
>
>      private synchronized void setFlag(int var) {
>          flag = var;
>      }
>
>      /** Get message via an int.
>       * @param code Integer mapped to message
>       * @return String mapped message
>       */
>      public final String getMessage(int code) {
>          return map.get(code);
>      }
>
>      class PingFuture implements Runnable {
>          private void doConnect(String host, int port, boolean
> on_connect, int state) {
>              try {
>                  logger.debug("connecting to host: " +host+ ", port: " + port);
>                  InetAddress addr = InetAddress.getByName(host);
>                  // will throw an exception if could not connect
>                  Socket s = new Socket(addr, port);
>                  s.close();
>                  if (on_connect) {
>                      logger.debug("Found port: " + port);
>                      setFlag(state);
>                  }
>
>              } catch (Exception ex) {
>                  logger.error("Can't find port: " + port);
>                  logger.error(ex.getMessage(), ex);
>                  if (!on_connect) {
>                      setFlag(state);
>                  }
>              }
>
>          }
>
>          private void doConnect(URL host, boolean on_connect, int
> state) throws IOException {
>              InputStream is = null;
>              BufferedReader in = null;
>              try {
>                  logger.debug("connecting to url: " + host.toString());
>                  URLConnection uc = host.openConnection();
>                  if (uc == null) {
>                      logger.error("Got a null URLConnection object!");
>                      return;
>                  }
>                  is = uc.getInputStream();
>                  if (is == null) {
>                      logger.error("Got a null content object!");
>                      return;
>                  }
>                  in = new BufferedReader(new InputStreamReader(
>                        is));
>                  String inputLine;
>                  // just test that its readable for now
>                  while ((inputLine = in.readLine()) != null)  {
>                        ;
>                  }
>                  if (on_connect) {
>                      logger.debug("Found url: " + host.toString());
>                      setFlag(state);
>                  }
>
>              } catch (Exception ex) {
>                  logger.error("Can't find url: " + host.toString());
>                  logger.error(ex.getMessage(), ex);
>                  if (!on_connect) {
>                      setFlag(state);
>                  }
>              } finally {
>                  if (is != null) {
>                      is.close();
>                  }
>                  if (in != null) {
>                      in.close();
>                  }
>              }
>
>          }
>
>          public void run() {
>
>              try {
>                  if (flag == STOPPED) {
>                      doConnect("localhost", 1099, true, BEGIN_INIT);
>                  }
>                  if (flag == BEGIN_INIT) {
>                      // test state of stopped to prevent endless loop
>                      doConnect("localhost", 1099, false, STOPPED);
>                      doConnect(new
> URL("http://localhost:8080/maragato/"), true, RUNNING);
>
>                  }
>                  if (flag == RUNNING) {
>                      doConnect(new
> URL("http://localhost:8080/maragato/"), false, STOPPED);
>                  }
>
>              } catch (Exception ex) {
>                    logger.error(ex.getMessage(), ex);
>              }
>          }
>      } // end inner class PingFuture
>
>      class PingTask implements Runnable {
>
>
>          public void run() {
>
>              Future f = null;
>              boolean interrupted = false;
>              try {
>                  f = exec.submit(new PingFuture());
>                  f.get(5000, TimeUnit.MILLISECONDS);
>
>              } catch (ExecutionException ex) {
>                    interrupted = true;
>                    logger.error("Future threw an error: \n" +
> ex.getCause(), ex);
>              } catch (CancellationException ex) {
>                    interrupted = true;
>                    logger.error("Future cancelled: \n" + ex.getCause(), ex);
>              } catch (TimeoutException ex) {
>                    interrupted = true;
>                    logger.error("Future timed out: \n" + ex.getCause(), ex);
>              } catch (InterruptedException ex) {
>                    interrupted = true;
>                    logger.error("Future interrupted: \n" + ex.getCause(), ex);
>              } catch (Exception ex) {
>                    logger.error("Unexpected error: \n" + ex.getMessage(), ex);
>              }
>              finally {
>                  f.cancel(true);
>                  logger.debug("CURRENT STATE: " + getMessage(flag));
>
>              }
>              if (interrupted) {
>                  Thread.currentThread().interrupt();
>              }
>          }
>      } // end inner class PingTask
> }
>
>
> On 8/22/06, Tim Peierls <tim at peierls.net> wrote:
>
>
> On 8/22/06, Tim Peierls <tim at peierls.net> wrote:
> > Also, instead of catching Exception, you can catch the more specific
> > ExecutionException -- thrown by Future.get() -- and examine the underlying
> > Throwable with getCause().
> >
> >
> > --tim
> >
> >
> > >
> > >
> > > On 8/22/06, robert lazarski <robertlazarski at gmail.com> wrote:
> > > > I finally got some time to implement the latest suggestions of David
> > > > and Tim. This is what I came up with:
> > > >
> > > > class PingTask implements Runnable {
> > > >
> > > >          public void run() {
> > > >
> > > >              Future f = null;
> > > >              boolean interrupted = false;
> > > >              try {
> > > >                  ExecutorService exec = Executors.newCachedThreadPool();
> > > >                  f = exec.submit(new PingFuture());
> > > >                  f.get(5000, TimeUnit.MILLISECONDS);
> > > >
> > > >              } catch (TimeoutException ex) {
> > > >                    interrupted = true;
> > > >                    logger.error("Future timed out: \n" +
> > ex.getMessage(), ex);
> > > >              } catch (InterruptedException ex) {
> > > >                    interrupted = true;
> > > >                    logger.error ("Future interrupted: \n" +
> > ex.getMessage(), ex);
> > > >              } catch (Exception ex) {
> > > >                    logger.error(ex.getMessage(), ex);
> > > >              }
> > > >              finally {
> > > >                  f.cancel(true);
> > > >                  logger.debug("CURRENT STATE: " + getMessage(flag));
> > > >
> > > >              }
> > > >              if (interrupted) {
> > > >                  Thread.currentThread().interrupt();
> > > >              }
> > > >          }
> > > >      } // end inner class PingTask
> > > >
> > > > Thanks for the feedback!
> > > > Robert
> > > >
> > >
> >
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at altair.cs.oswego.edu
> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
>


More information about the Concurrency-interest mailing list