summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-07-15 17:03:34 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-07-15 17:03:34 +0000
commitc4198fca48124f3adeb14792fed0b7246bc356f4 (patch)
treea5a9510916e794c4b62aa5cbe3f2ecca566a39d8 /java/client/src
parent119535d09f09e3bd3de852eea19c2db475673b2d (diff)
downloadqpid-python-c4198fca48124f3adeb14792fed0b7246bc356f4.tar.gz
QPID-984 : Applied fix from M2.1.x that adds requried synchronization around setup and tear down of Connections.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@676973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java179
1 files changed, 81 insertions, 98 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
index a871c754b5..dbb9503ccf 100644
--- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
+++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
@@ -40,7 +40,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.net.Socket;
-
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
* connector, which currently always uses TCP/IP sockets. It creates the "protocol handler" which deals with MINA
@@ -85,38 +84,18 @@ public class TransportConnection
throw new AMQNoTransportForProtocolException(details, null, null);
}
- /* if (transport == _currentInstance)
- {
- if (transport == VM)
- {
- if (_currentVMPort == details.getPort())
- {
- return _instance;
- }
- }
- else
- {
- return _instance;
- }
- }
-
- _currentInstance = transport;*/
-
- ITransportConnection instance;
switch (transport)
{
case SOCKET:
- instance =
- new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
- break;
+ return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
case TCP:
- instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -125,8 +104,8 @@ public class TransportConnection
if (Boolean.getBoolean("qpidnio"))
{
_logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
- ? "Qpid NIO is new default"
- : "Sysproperty 'qpidnio' is set"));
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
result = new MultiThreadSocketConnector();
}
else
@@ -141,18 +120,13 @@ public class TransportConnection
return result;
}
});
- break;
case VM:
{
- instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
- break;
+ return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
}
default:
- // FIXME: TGM
- throw new AMQNoTransportForProtocolException(details, null, null);
+ throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null);
}
-
- return instance;
}
private static int getTransport(String transport)
@@ -180,13 +154,21 @@ public class TransportConnection
{
int port = details.getPort();
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- if (AutoCreate)
+ if (!_inVmPipeAddress.containsKey(port))
{
if (AutoCreate)
{
- createVMBroker(port);
+ if (AutoCreate)
+ {
+ createVMBroker(port);
+ }
+ else
+ {
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
+ }
}
else
{
@@ -194,11 +176,6 @@ public class TransportConnection
+ " does not exist. Auto create disabled.", null);
}
}
- else
- {
- throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
- + " does not exist. Auto create disabled.", null);
- }
}
return new VmPipeTransportConnection(port);
@@ -214,70 +191,73 @@ public class TransportConnection
config.setThreadModel(ReadWriteThreadModel.getInstance());
}
-
- if (!_inVmPipeAddress.containsKey(port))
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
- IoHandlerAdapter provider = null;
- try
- {
- VmPipeAddress pipe = new VmPipeAddress(port);
-
- provider = createBrokerInstance(port);
-
- _acceptor.bind(pipe, provider);
- _inVmPipeAddress.put(port, pipe);
- _logger.info("Created InVM Qpid.AMQP listening on port " + port);
- }
- catch (IOException e)
+ if (!_inVmPipeAddress.containsKey(port))
{
- _logger.error("Got IOException.", e);
-
- // Try and unbind provider
+ _logger.info("Creating InVM Qpid.AMQP listening on port " + port);
+ IoHandlerAdapter provider = null;
try
{
VmPipeAddress pipe = new VmPipeAddress(port);
- try
- {
- _acceptor.unbind(pipe);
- }
- catch (Exception ignore)
- {
- // ignore
- }
-
- if (provider == null)
- {
- provider = createBrokerInstance(port);
- }
+ provider = createBrokerInstance(port);
_acceptor.bind(pipe, provider);
+
_inVmPipeAddress.put(port, pipe);
_logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- catch (IOException justUseFirstException)
+ catch (IOException e)
{
- String because;
- if (e.getCause() == null)
+ _logger.error("Got IOException.", e);
+
+ // Try and unbind provider
+ try
{
- because = e.toString();
+ VmPipeAddress pipe = new VmPipeAddress(port);
+
+ try
+ {
+ _acceptor.unbind(pipe);
+ }
+ catch (Exception ignore)
+ {
+ // ignore
+ }
+
+ if (provider == null)
+ {
+ provider = createBrokerInstance(port);
+ }
+
+ _acceptor.bind(pipe, provider);
+ _inVmPipeAddress.put(port, pipe);
+ _logger.info("Created InVM Qpid.AMQP listening on port " + port);
}
- else
+ catch (IOException justUseFirstException)
{
- because = e.getCause().toString();
- }
+ String because;
+ if (e.getCause() == null)
+ {
+ because = e.toString();
+ }
+ else
+ {
+ because = e.getCause().toString();
+ }
- throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ throw new AMQVMBrokerCreationException(null, port, because + " Stopped binding of InVM Qpid.AMQP", e);
+ }
}
+
+ }
+ else
+ {
+ _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
}
}
- else
- {
- _logger.info("InVM Qpid.AMQP on port " + port + " already exits.");
- }
-
}
private static IoHandlerAdapter createBrokerInstance(int port) throws AMQVMBrokerCreationException
@@ -324,7 +304,7 @@ public class TransportConnection
_logger.info("Killing all VM Brokers");
if (_acceptor != null)
{
- _acceptor.unbindAll();
+ _acceptor.unbindAll();
}
synchronized (_inVmPipeAddress)
{
@@ -337,14 +317,17 @@ public class TransportConnection
public static void killVMBroker(int port)
{
- VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
- if (pipe != null)
+ synchronized (_inVmPipeAddress)
{
- _logger.info("Killing VM Broker:" + port);
- _inVmPipeAddress.remove(port);
- // This does need to be sychronized as otherwise mina can hang
- // if a new connection is made
- _acceptor.unbind(pipe);
+ VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port);
+ if (pipe != null)
+ {
+ _logger.info("Killing VM Broker:" + port);
+ _inVmPipeAddress.remove(port);
+ // This does need to be sychronized as otherwise mina can hang
+ // if a new connection is made
+ _acceptor.unbind(pipe);
+ }
}
}