diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:03:34 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2008-07-15 17:03:34 +0000 |
| commit | c4198fca48124f3adeb14792fed0b7246bc356f4 (patch) | |
| tree | a5a9510916e794c4b62aa5cbe3f2ecca566a39d8 /java/client/src | |
| parent | 119535d09f09e3bd3de852eea19c2db475673b2d (diff) | |
| download | qpid-python-c4198fca48124f3adeb14792fed0b7246bc356f4.tar.gz | |
QPID-984 : Applied fix from M2.1.x that adds requried synchronization around setup and tear down of Connections.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java | 179 |
1 files changed, 81 insertions, 98 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index a871c754b5..dbb9503ccf 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -40,7 +40,6 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.net.Socket; - /** * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying * connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA @@ -85,38 +84,18 @@ public class TransportConnection throw new AMQNoTransportForProtocolException(details, null, null); } - /* if (transport == _currentInstance) - { - if (transport == VM) - { - if (_currentVMPort == details.getPort()) - { - return _instance; - } - } - else - { - return _instance; - } - } - - _currentInstance = transport;*/ - - ITransportConnection instance; switch (transport) { case SOCKET: - instance = - new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - return new ExistingSocketConnector(); - } - }); - break; + return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + { + public IoConnector newSocketConnector() + { + return new ExistingSocketConnector(); + } + }); case TCP: - instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() + return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() { public IoConnector newSocketConnector() { @@ -125,8 +104,8 @@ public class TransportConnection if (Boolean.getBoolean("qpidnio")) { _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio") - ? "Qpid NIO is new default" - : "Sysproperty 'qpidnio' is set")); + ? "Qpid NIO is new default" + : "Sysproperty 'qpidnio' is set")); result = new MultiThreadSocketConnector(); } else @@ -141,18 +120,13 @@ public class TransportConnection return result; } }); - break; case VM: { - instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - break; + return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); } default: - // FIXME: TGM - throw new AMQNoTransportForProtocolException(details, null, null); + throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null); } - - return instance; } private static int getTransport(String transport) @@ -180,13 +154,21 @@ public class TransportConnection { int port = details.getPort(); - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - if (AutoCreate) + if (!_inVmPipeAddress.containsKey(port)) { if (AutoCreate) { - createVMBroker(port); + if (AutoCreate) + { + createVMBroker(port); + } + else + { + throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port + + " does not exist. Auto create disabled.", null); + } } else { @@ -194,11 +176,6 @@ public class TransportConnection + " does not exist. Auto create disabled.", null); } } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); - } } return new VmPipeTransportConnection(port); @@ -214,70 +191,73 @@ public class TransportConnection config.setThreadModel(ReadWriteThreadModel.getInstance()); } - - if (!_inVmPipeAddress.containsKey(port)) + synchronized (_inVmPipeAddress) { - _logger.info("Creating InVM Qpid.AMQP listening on port " + port); - IoHandlerAdapter provider = null; - try - { - VmPipeAddress pipe = new VmPipeAddress(port); - - provider = createBrokerInstance(port); - - _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); - _logger.info("Created InVM Qpid.AMQP listening on port " + port); - } - catch (IOException e) + if (!_inVmPipeAddress.containsKey(port)) { - _logger.error("Got IOException.", e); - - // Try and unbind provider + _logger.info("Creating InVM Qpid.AMQP listening on port " + port); + IoHandlerAdapter provider = null; try { VmPipeAddress pipe = new VmPipeAddress(port); - try - { - _acceptor.unbind(pipe); - } - catch (Exception ignore) - { - // ignore - } - - if (provider == null) - { - provider = createBrokerInstance(port); - } + provider = createBrokerInstance(port); _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - catch (IOException justUseFirstException) + catch (IOException e) { - String because; - if (e.getCause() == null) + _logger.error("Got IOException.", e); + + // Try and unbind provider + try { - because = e.toString(); + VmPipeAddress pipe = new VmPipeAddress(port); + + try + { + _acceptor.unbind(pipe); + } + catch (Exception ignore) + { + // ignore + } + + if (provider == null) + { + provider = createBrokerInstance(port); + } + + _acceptor.bind(pipe, provider); + _inVmPipeAddress.put(port, pipe); + _logger.info("Created InVM Qpid.AMQP listening on port " + port); } - else + catch (IOException justUseFirstException) { - because = e.getCause().toString(); - } + String because; + if (e.getCause() == null) + { + because = e.toString(); + } + else + { + because = e.getCause().toString(); + } - throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e); + } } + + } + else + { + _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); } } - else - { - _logger.info("InVM Qpid.AMQP on port " + port + " already exits."); - } - } private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException @@ -324,7 +304,7 @@ public class TransportConnection _logger.info("Killing all VM Brokers"); if (_acceptor != null) { - _acceptor.unbindAll(); + _acceptor.unbindAll(); } synchronized (_inVmPipeAddress) { @@ -337,14 +317,17 @@ public class TransportConnection public static void killVMBroker(int port) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) + synchronized (_inVmPipeAddress) { - _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); - // This does need to be sychronized as otherwise mina can hang - // if a new connection is made - _acceptor.unbind(pipe); + VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); + if (pipe != null) + { + _logger.info("Killing VM Broker:" + port); + _inVmPipeAddress.remove(port); + // This does need to be sychronized as otherwise mina can hang + // if a new connection is made + _acceptor.unbind(pipe); + } } } |
