[concurrency-interest] Simple ScheduledFuture problem

robert lazarski robertlazarski at gmail.com
Tue Aug 22 19:50:18 EDT 2006


It was due to advice from Joe BowBeer:

" 1. setFlag is synchronized but there is no synchronized getFlag method.

Either added synchronized getFlag, or declare flag to be "volatile". "

Robert

On 8/22/06, Dhanji R. Prasanna <dhanji at gmail.com> wrote:
> Im curious, what is the purpose of declaring flag volatile AND
> synchronizing setter access to it?
>
> If it is volatile it will (could) be overwritten concurrently in a
> visible manner anyway. Afaik the purpose of synchronizing access to a
> field is to prevent diverging concurrent states (i.e. multiple states
> of the same variable cached in separate threads and merged back
> non-deterministically), afaik this is a moot point since the field is
> volatile--a read should yield the same value of "flag" for all threads
> should it not?
>
> Since there is no happens-before edge established in setFlag(), I fail
> to see how synchronizing it does anything useful?
>
> I may very well be missing something here...
>
> On 8/23/06, robert lazarski <robertlazarski at gmail.com> wrote:
> > Thanks Tim. If its any consolation, I did buy the concurrency in
> > practice book and I'm reading it every day, though I just started ;-)
> > . PingTask is an inner class and can't have static vars , so this is
> > the entire new version. Thanks!
> >
> > package org;
> >
> > import java.io.BufferedReader;
> > import java.io.IOException;
> > import java.io.InputStream;
> > import java.io.InputStreamReader;
> > import java.net.InetAddress;
> > import java.net.Socket;
> > import java.net.URL;
> > import java.net.URLConnection;
> > import java.util.HashMap;
> > import java.util.Map;
> > import java.util.concurrent.Executors;
> > import java.util.concurrent.FutureTask;
> > import java.util.concurrent.ScheduledExecutorService;
> > import java.util.concurrent.ScheduledFuture;
> > import java.util.concurrent.TimeUnit;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> > import java.util.concurrent.Future;
> > import java.util.concurrent.TimeoutException;
> > import java.util.concurrent.ExecutionException;
> > import java.util.concurrent.CancellationException;
> >
> > import org.apache.commons.logging.Log;
> > import org.apache.commons.logging.LogFactory;
> >
> > public class Ping {
> >
> >      // for the inner class PingTask
> >      static ExecutorService exec = Executors.newCachedThreadPool();
> >      private volatile int  flag = 0;
> >      private static int STOPPED = new Integer(0);
> >      private static int BEGIN_INIT = new Integer(1);
> >      private static int RUNNING = new Integer(2);
> >          private static Map <Integer, String> map;
> >
> >      /** commons logging declaration. */
> >      private static Log logger = LogFactory.getLog(
> >                Ping.class);
> >
> >      public static void main(String[] args) throws Exception {
> >
> >              new Ping();
> >      }
> >
> >      Ping () {
> >          ScheduledExecutorService ses =
> > Executors.newSingleThreadScheduledExecutor();
> >          // Do pings, starting now, with a 2 second delay in between
> >          ScheduledFuture <?> ping = ses.scheduleWithFixedDelay(new PingTask(),
> >              0L, 2000L, TimeUnit.MILLISECONDS);
> >      }
> >
> >      static {
> >          map = new HashMap<Integer, String>();
> >          map.put(STOPPED, "STOPPED");
> >          map.put(BEGIN_INIT, "BEGIN_INIT");
> >          map.put(RUNNING, "RUNNING");
> >      }
> >
> >      private synchronized void setFlag(int var) {
> >          flag = var;
> >      }
> >
> >      /** Get message via an int.
> >       * @param code Integer mapped to message
> >       * @return String mapped message
> >       */
> >      public final String getMessage(int code) {
> >          return map.get(code);
> >      }
> >
> >      class PingFuture implements Runnable {
> >          private void doConnect(String host, int port, boolean
> > on_connect, int state) {
> >              try {
> >                  logger.debug("connecting to host: " +host+ ", port: " + port);
> >                  InetAddress addr = InetAddress.getByName(host);
> >                  // will throw an exception if could not connect
> >                  Socket s = new Socket(addr, port);
> >                  s.close();
> >                  if (on_connect) {
> >                      logger.debug("Found port: " + port);
> >                      setFlag(state);
> >                  }
> >
> >              } catch (Exception ex) {
> >                  logger.error("Can't find port: " + port);
> >                  logger.error(ex.getMessage(), ex);
> >                  if (!on_connect) {
> >                      setFlag(state);
> >                  }
> >              }
> >
> >          }
> >
> >          private void doConnect(URL host, boolean on_connect, int
> > state) throws IOException {
> >              InputStream is = null;
> >              BufferedReader in = null;
> >              try {
> >                  logger.debug("connecting to url: " + host.toString());
> >                  URLConnection uc = host.openConnection();
> >                  if (uc == null) {
> >                      logger.error("Got a null URLConnection object!");
> >                      return;
> >                  }
> >                  is = uc.getInputStream();
> >                  if (is == null) {
> >                      logger.error("Got a null content object!");
> >                      return;
> >                  }
> >                  in = new BufferedReader(new InputStreamReader(
> >                        is));
> >                  String inputLine;
> >                  // just test that its readable for now
> >                  while ((inputLine = in.readLine()) != null)  {
> >                        ;
> >                  }
> >                  if (on_connect) {
> >                      logger.debug("Found url: " + host.toString());
> >                      setFlag(state);
> >                  }
> >
> >              } catch (Exception ex) {
> >                  logger.error("Can't find url: " + host.toString());
> >                  logger.error(ex.getMessage(), ex);
> >                  if (!on_connect) {
> >                      setFlag(state);
> >                  }
> >              } finally {
> >                  if (is != null) {
> >                      is.close();
> >                  }
> >                  if (in != null) {
> >                      in.close();
> >                  }
> >              }
> >
> >          }
> >
> >          public void run() {
> >
> >              try {
> >                  if (flag == STOPPED) {
> >                      doConnect("localhost", 1099, true, BEGIN_INIT);
> >                  }
> >                  if (flag == BEGIN_INIT) {
> >                      // test state of stopped to prevent endless loop
> >                      doConnect("localhost", 1099, false, STOPPED);
> >                      doConnect(new
> > URL("http://localhost:8080/maragato/"), true, RUNNING);
> >
> >                  }
> >                  if (flag == RUNNING) {
> >                      doConnect(new
> > URL("http://localhost:8080/maragato/"), false, STOPPED);
> >                  }
> >
> >              } catch (Exception ex) {
> >                    logger.error(ex.getMessage(), ex);
> >              }
> >          }
> >      } // end inner class PingFuture
> >
> >      class PingTask implements Runnable {
> >
> >
> >          public void run() {
> >
> >              Future f = null;
> >              boolean interrupted = false;
> >              try {
> >                  f = exec.submit(new PingFuture());
> >                  f.get(5000, TimeUnit.MILLISECONDS);
> >
> >              } catch (ExecutionException ex) {
> >                    interrupted = true;
> >                    logger.error("Future threw an error: \n" +
> > ex.getCause(), ex);
> >              } catch (CancellationException ex) {
> >                    interrupted = true;
> >                    logger.error("Future cancelled: \n" + ex.getCause(), ex);
> >              } catch (TimeoutException ex) {
> >                    interrupted = true;
> >                    logger.error("Future timed out: \n" + ex.getCause(), ex);
> >              } catch (InterruptedException ex) {
> >                    interrupted = true;
> >                    logger.error("Future interrupted: \n" + ex.getCause(), ex);
> >              } catch (Exception ex) {
> >                    logger.error("Unexpected error: \n" + ex.getMessage(), ex);
> >              }
> >              finally {
> >                  f.cancel(true);
> >                  logger.debug("CURRENT STATE: " + getMessage(flag));
> >
> >              }
> >              if (interrupted) {
> >                  Thread.currentThread().interrupt();
> >              }
> >          }
> >      } // end inner class PingTask
> > }
> >
> >
> > On 8/22/06, Tim Peierls <tim at peierls.net> wrote:
> >
> >
> > On 8/22/06, Tim Peierls <tim at peierls.net> wrote:
> > > Also, instead of catching Exception, you can catch the more specific
> > > ExecutionException -- thrown by Future.get() -- and examine the underlying
> > > Throwable with getCause().
> > >
> > >
> > > --tim
> > >
> > >
> > > >
> > > >
> > > > On 8/22/06, robert lazarski <robertlazarski at gmail.com> wrote:
> > > > > I finally got some time to implement the latest suggestions of David
> > > > > and Tim. This is what I came up with:
> > > > >
> > > > > class PingTask implements Runnable {
> > > > >
> > > > >          public void run() {
> > > > >
> > > > >              Future f = null;
> > > > >              boolean interrupted = false;
> > > > >              try {
> > > > >                  ExecutorService exec = Executors.newCachedThreadPool();
> > > > >                  f = exec.submit(new PingFuture());
> > > > >                  f.get(5000, TimeUnit.MILLISECONDS);
> > > > >
> > > > >              } catch (TimeoutException ex) {
> > > > >                    interrupted = true;
> > > > >                    logger.error("Future timed out: \n" +
> > > ex.getMessage(), ex);
> > > > >              } catch (InterruptedException ex) {
> > > > >                    interrupted = true;
> > > > >                    logger.error ("Future interrupted: \n" +
> > > ex.getMessage(), ex);
> > > > >              } catch (Exception ex) {
> > > > >                    logger.error(ex.getMessage(), ex);
> > > > >              }
> > > > >              finally {
> > > > >                  f.cancel(true);
> > > > >                  logger.debug("CURRENT STATE: " + getMessage(flag));
> > > > >
> > > > >              }
> > > > >              if (interrupted) {
> > > > >                  Thread.currentThread().interrupt();
> > > > >              }
> > > > >          }
> > > > >      } // end inner class PingTask
> > > > >
> > > > > Thanks for the feedback!
> > > > > Robert
> > > > >
> > > >
> > >
> > _______________________________________________
> > Concurrency-interest mailing list
> > Concurrency-interest at altair.cs.oswego.edu
> > http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
> >
>


More information about the Concurrency-interest mailing list