[concurrency-interest] Simple ScheduledFuture problem

robert lazarski robertlazarski at gmail.com
Wed Aug 16 17:34:05 EDT 2006


Hi all,

I've seen similair questions on the list but I'm still stuck. I just
want to timeout a future that is scheduled at a fixed rate. The code
below works, but does not run at a scheduled rate for about two
minutes until it recovers after timeout! Maybe I should pass a
ThreadFactory to the Executor?  I'm unclear how to apply
future.get(timeout) when using a fixed rate. Please help, code below
(just a quick hack, nothing serious) . What times out is
URLConnectiion.openConnection();

package org;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.Timer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class Ping {
	
	  private int flag = 0;
	  private static int STOPPED = new Integer(0);
	  private static int BEGIN_INIT = new Integer(1);
	  private static int RUNNING = new Integer(2);
          private static Map <Integer, String> map;
	
	  /** commons logging declaration. */
	  private static Log logger = LogFactory.getLog(
	    		Ping.class);

	  /** Timer expressed in milliseconds as six seconds */
	  private final static long ONCE_PER_SIX_SECONDS = 1000*6;

	  public static void main(String[] args) throws Exception {

              new Ping();
	  }

          Ping () {
              boolean interrupted = false;
              ScheduledExecutorService ses =
Executors.newSingleThreadScheduledExecutor();
              // Do pings, starting now, with a 6 second delay
              ScheduledFuture <?> ping = ses.scheduleAtFixedRate(new
PingTask(),
                  0L, 6000L, TimeUnit.MILLISECONDS);
	  }

          static {
              map = new HashMap<Integer, String>();
              map.put(STOPPED, "STOPPED");
              map.put(BEGIN_INIT, "BEGIN_INIT");
              map.put(RUNNING, "RUNNING");
          }

          class PingTask implements Runnable {

            private void doConnect(String host, int port, boolean
on_connect, int state) {
                try {
                    InetAddress addr = InetAddress.getByName(host);
                    // will throw an exception if could not connect
                    Socket s = new Socket(addr, port);
                    s.close();
                    if (on_connect) {
                        logger.debug("Found port: " + port);
                        setFlag(state);
                    }

                } catch (Exception ex) {
                    logger.error("Can't find port: " + port);
                    logger.error(ex.getMessage(), ex);
                    if (!on_connect) {
                        setFlag(state);
                    }
                }

            }

            private void doConnect(URL host, boolean on_connect, int state) {
                try {
                    logger.debug("connecting to url: " + host.toString());
                    URLConnection uc = host.openConnection();
                    if (uc == null) {
                        logger.error("Got a null URLConnection object!");
                        return;
                    }
                    InputStream is = uc.getInputStream();
                    if (is == null) {
                        logger.error("Got a null content object!");
                        return;
                    }
                    BufferedReader in = new BufferedReader(
                        new InputStreamReader(
                        is));
                    String inputLine;
                    // just test that its readable for now
                    while ((inputLine = in.readLine()) != null)  {
                        ;
                    }
                    in.close();
                    if (on_connect) {
                        logger.debug("Found url: " + host.toString());
                        setFlag(state);
                    }

                } catch (Exception ex) {
                    logger.error("Can't find url: " + host.toString());
                    logger.error(ex.getMessage(), ex);
                    if (!on_connect) {
                        setFlag(state);
                    }
                }

          }


          public void run() {

	      Timer timer = new Timer(true);
              try {
	          // Schedule a 5 second interupt for the timeout - ping
should complete
	          // in less than 5 seconds
	          timer.schedule(new TimeOutTask(Thread.currentThread()), 5000);
                  if (flag == STOPPED) {
                      doConnect("localhost", 1099, true, BEGIN_INIT);
                  }
                  if (flag == BEGIN_INIT) {
                      // test state of stopped to prevent endless loop
                      doConnect("localhost", 1099, false, STOPPED);
                      doConnect(new
URL("http://localhost:8080/maragato/"), true, RUNNING);
                  }
                  if (flag == RUNNING) {
                      doConnect("localhost", 1099, false, STOPPED);
                  }
                } catch (Exception ex) {
                    logger.error(ex.getMessage(), ex);
                }
                finally {
                    logger.debug("CURRENT STATE: " + getMessage(flag));
                    // task must be cancelled so Thread.interrupt() is
_not_ called
	            timer.cancel();
                }
            }
        }

        private synchronized void setFlag(int var) {
            flag = var;
        }

        /** Get message via an int.
         * @param code Integer mapped to message
         * @return String mapped message
         */
        public final String getMessage(int code) {
            return map.get(code);
        }
}

package org;

import java.util.TimerTask;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class TimeOutTask extends TimerTask {
	
	 /** commons logging declaration. */
    private static Log logger = LogFactory.getLog(
    		TimeOutTask.class);
	
	Thread t;

	TimeOutTask(Thread t) {
	    this.t = t;
	}

	/**
	An implementation of the Abstract Class TimerTask method run()
	*/
	public void run() {
	    if(t!= null && t.isAlive()) {
	        t.interrupt();
	        logger.error("thread: " + t.getName() + ", task timed out!");
	    }
	    else {
	    	logger.error("task timed out, but could not interrupt!");
	    }
	}
}


More information about the Concurrency-interest mailing list