[concurrency-interest] Concurrency Architecture

Gregg Wonderly gregg at cytetech.com
Mon Jun 15 12:05:03 EDT 2009


Another way to do this, is with Future.  I have sometimes used something like 
the following class (and the included test class) which uses Proxy to help with 
eliminating any server instance management.

The idea here is that server exceptions have a couple of different outcomes in 
client code, most typically.

1. When an error occurs, the method is known to be idempotent so you just call 
again.
2. The result of the method is unknown and the method is not known to be 
idempotent so you need to regain a view of the services state to make the next 
use of the service.

So, I push failure handling out to the invocation but to try and manage the 
recovery in a simple way.

With all the code below, once you have a ServerAcces<T> instance (acc) and have 
made an initial call to get a server (srv), you can just do

ServerAccess<T> acc;
T srv;

try {
	srv.invokeSomething(...);
} catch( RemoteException ex ) {
	log....
	srv = acc.getServer();
}

As the simple try something and if it fails, get a new server instance for the
next call. Note that this can create a delay in returning failure to outer level 
code because we are getting a new server instance after the failure, not on the 
next call.

If srv is local to the class and not a big concurrency issue, you might do 
something more like

try {
	if( srv == null ) {
		srv = acc.getServer();
	srv.invokeSomething(...);
} catch( RemoteException ex ) {
	log...
	srv = null;
}

There are of course other variations on the code below which can include doing 
the getServer() work inside of the Proxy.  This is one of the primary reasons I 
put a Proxy into most of this, because it makes it trivial to reorganize where 
the actual delay associated with finding the server occurs, so that users of the 
code as shown above, don't have to be altered to make it work in whatever way is 
appropriate as your application expands or contracts in complexity.

Gregg Wonderly

-------------------------------------------------------------------

public abstract class ServerAccess<T> {
	private volatile FutureTask<T> srvFut;
	private Logger log = Logger.getLogger(getClass().getName());
	private final Class[] interfaces;

	public ServerAccess( Class[]intfs ) {
		interfaces = intfs;
	}

	public T getServer() throws InterruptedException, ExecutionException {
		if( srvFut == null ) {
			synchronized( this ) {
				if( srvFut == null ) {
					srvFut = getConnectionCreator();
				}
			}
		}
		return wrapServer( srvFut.get() );
	}
	
	protected abstract FutureTask<T> getConnectionCreator();
	
	protected synchronized void invalidate() {
		srvFut = null;
	}
	
	protected T wrapServer( T srvr ) {
		return (T)Proxy.newProxyInstance( srvr.getClass().getClassLoader(), 
interfaces, new ClassHandler<T>( srvr ) );
	}

	private class ClassHandler<T> implements InvocationHandler {
		private T obj;
		public ClassHandler( T srvr ) {
			this.obj = srvr;
		}
		public Object invoke( Object proxy, Method method, Object[] args ) throws 
Throwable {
			try {
				return method.invoke( obj, args );
			} catch( Throwable ex ) {
				log.log(Level.SEVERE, ex.toString(), ex);
				// if IOException, get new server next call
				if( ex instanceof InvocationTargetException ) {
					InvocationTargetException itex = (InvocationTargetException)ex;
					ex = itex.getTargetException();
				}
				if( ex instanceof IOException )
					invalidate();
				throw ex;
			}
		}
	}
}

-----------------------------------------------------------------------------

public class ServerAccessTest implements Runnable {
	Logger log = Logger.getLogger(getClass().getName());
	public static void main( String args[] ) {
		for( int i = 0; i < 10; ++i ) {
			ServerAccessTest acc = new ServerAccessTest(i);
			acc.init();
			new Thread( acc ).start();
		}
	}
	
	public void run() {
		while( true ) {
			try {
				if( srv.getValue() > 200 ) {
					log.info( this+": stopping at "+srv.getValue());
					return;
				}
				srv.setValue( srv.getValue() + 1 );
				log.info( this+": val now="+srv.getValue() );
			} catch( IOException ex ) {
				log.log(Level.SEVERE, ex.toString()+": getting new server", ex);
				try {
					srv = acc.getServer();
				} catch (InterruptedException ex1) {
					log.log(Level.SEVERE, ex.toString(), ex);
				} catch (ExecutionException ex1) {
					log.log(Level.SEVERE, ex.toString(), ex);
				}
			}
		}
	}
	
	public String toString() {
		return "ServerAccessTest#"+inst;
	}
	private final int inst;
	public ServerAccessTest(int inst) {
		this.inst = inst;
	}
	ServerAccess<MyService> acc;
	MyService srv;
	private class MyAccess extends ServerAccess<MyService>
			implements Callable<MyService> {
		
		public MyAccess() {
			super( new Class[]{ MyService.class } );
		}

		public FutureTask<MyService> getConnectionCreator() {
			FutureTask<MyService> fut = new FutureTask<MyService>( this );
			log.info("Starting thread for future connection");
			new Thread( fut ).start();
			return fut;
		}

		public MyService call() {
			try {
				log.info("getting new MyService");
				// your example logic goes here returning
				// service you find to be live
				Thread.sleep((int) (3000 * Math.random() ));
			} catch (InterruptedException ex) {
				Logger.getLogger(ServerAccessTest.class.getName()).log(Level.SEVERE, null, ex);
			}
			return svc;
		}
	}
	static MySrvImpl svc = new MySrvImpl();
	private static class MySrvImpl implements MyService {
		volatile int val;
		public int getValue() throws IOException {
			if( Math.random() > .94 )
				throw new IOException("Simuated Failure at val="+val );
			return val;
		}

		public void setValue(int val) throws IOException {
			if( Math.random() < .03 )
				throw new IOException("Simuated Failure at val="+val );
			this.val = val;
		}
	}
	public void init() {
		acc = new MyAccess();
		try {
			srv = acc.getServer();
		} catch (InterruptedException ex) {
			log.log(Level.SEVERE, ex.toString(), ex);
		} catch (ExecutionException ex) {
			log.log(Level.SEVERE, ex.toString(), ex);
		}
	}
	
	public interface MyService extends Remote {
		public int getValue() throws IOException;
		public void setValue( int val ) throws IOException;
	}
}
Norman Elton wrote:
> Thanks all for the responses. I was trying to grapple how a
> ReentrantReadWriteLock would solve my problem, completely skipping
> over the more simple approaches. Brian's idea of an atomic "generation
> identifier" seems to be working like a champ.
> 
> Norman
> 
> On Sun, Jun 14, 2009 at 5:37 PM, David Holmes<davidcholmes at aapt.net.au> wrote:
>> Norman,
>>
>> There are many many ways to set this up. It could be as simple as checking
>> if the current server has changed after a timeout - if so then someone did
>> the change; if not then search for a new server yourself. It all depends on
>> the exact semantics you want - eg whether the new server has to be known to
>> be good before anyone else starts using it; and how long a "down" server is
>> excluded from being used etc. A CountDownLatch used as a simple gate can be
>> used to make other threads wait while a search is in progress, if you don't
>> want to use the lock directly for this purposes. (Using a Semaphore seems
>> tricky because you don't know how many threads may be waiting.)
>>
>> David Holmes
>>
>>> -----Original Message-----
>>> From: concurrency-interest-bounces at cs.oswego.edu
>>> [mailto:concurrency-interest-bounces at cs.oswego.edu]On Behalf Of Norman
>>> Elton
>>> Sent: Monday, 15 June 2009 2:37 AM
>>> To: concurrency-interest at cs.oswego.edu
>>> Subject: [concurrency-interest] Concurrency Architecture
>>>
>>>
>>>
>>> I've been using the java.util.concurrent classes over the past few
>>> years, they've worked well for every circumstance I could throw at
>>> them. Today, I've stumbled upon a case that I just can't figure out
>>> how best to tackle.
>>>
>>> I've got a connection generator which builds and returns connections
>>> to a server. In case of failover, I'm maintaining a list of possible
>>> servers to which it can generate connections. My wrapper
>>> getConnection() method takes this into account by first attempting a
>>> connection to the "current" server. If that times out, it scans the
>>> list for a new "current" server.
>>>
>>> Here's some pseudocode:
>>>
>>> public Connection getConnection() {
>>>       try {
>>>               return this.generator.getConnection();
>>>       } catch (TimeoutException e) {
>>>       }
>>>
>>>       synchronized(this.generator) {
>>>               foreach (this.servers as curr_server) {
>>>                       try {
>>>                               this.generator.server = curr_server;
>>>                               return this.generator.getConnection();
>>>                       } catch (TimeoutException e) {
>>>                       }
>>>               }
>>>       }
>>>
>>>       throw new Exception("Sorry, couldn't find a server");
>>> }
>>>
>>> In this case, if three threads simultaneously call getConnection(),
>>> they will all timeout and hit the synchronized block together. The
>>> first one will identify a new server, but the other two have no way to
>>> know that a new server has been identified. They each will loop
>>> through the possible servers.
>>>
>>> I'm looking for some way for all threads to be able to call the first
>>> part of the procedure (attempt a connection to the current server). If
>>> that fails, then the first thread will attempt to find a new server,
>>> while others wait for the result.
>>>
>>> I've dug through all the possible types of locks, nothing quite meets
>>> the need. Of course, I could be wrong! Can anyone think of a good way
>>> to tackle these requirements?
>>>
>>> Thanks!
>>>
>>> Norman
>>> _______________________________________________
>>> Concurrency-interest mailing list
>>> Concurrency-interest at cs.oswego.edu
>>> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
>>
>>
> 
> _______________________________________________
> Concurrency-interest mailing list
> Concurrency-interest at cs.oswego.edu
> http://cs.oswego.edu/mailman/listinfo/concurrency-interest
> 
> 



More information about the Concurrency-interest mailing list