summaryrefslogtreecommitdiff
path: root/java/common/src
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-14 08:15:31 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-14 08:15:31 +0000
commitb4f74c44b73e0e3ca21a0c39487135bdab12e4af (patch)
tree075c9102f312e016022715449bb03f56c8ca9b40 /java/common/src
parent05ebb6d1625b94e240648c2d251f3c524c7ab7d7 (diff)
downloadqpid-python-b4f74c44b73e0e3ca21a0c39487135bdab12e4af.tar.gz
QPID-3345: restore/add ability to use sys props to select the NetworkTransport used to make/accept connections
Applied patch by Keith Wall <keith.wall@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1146594 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java9
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java4
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/Transport.java109
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java9
-rw-r--r--java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java49
-rw-r--r--java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java157
6 files changed, 327 insertions, 10 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index f4e3a10f92..7d17397b2d 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.transport.Connection.State.CLOSING;
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -41,12 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
@@ -242,10 +242,9 @@ public class Connection extends ConnectionInvoker
userID = settings.getUsername();
delegate = new ClientDelegate(settings);
- securityLayer = new SecurityLayer();
- securityLayer.init(this);
+ securityLayer = new SecurityLayer(this);
- OutgoingNetworkTransport transport = new IoNetworkTransport();
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
NetworkConnection network = transport.connect(settings, receiver, null);
sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
index 9371835e89..4610c2351e 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkTransport.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.transport.network;
+/**
+ * A network transport is responsible for the establishment of network connections.
+ * NetworkTransport implementations are pluggable via the {@link Transport} class.
+ */
public interface NetworkTransport
{
public void close();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
index 742d6575df..2c10a30f10 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java
@@ -20,7 +20,116 @@
*/
package org.apache.qpid.transport.network;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.transport.TransportException;
+
public class Transport
{
+ public static final String QPID_TRANSPORT_PROPNAME = "qpid.transport";
+ public static final String QPID_TRANSPORT_V0_8_PROPNAME = "qpid.transport.v0_8";
+ public static final String QPID_TRANSPORT_V0_9_PROPNAME = "qpid.transport.v0_9";
+ public static final String QPID_TRANSPORT_V0_9_1_PROPNAME = "qpid.transport.v0_9_1";
+ public static final String QPID_TRANSPORT_V0_10_PROPNAME = "qpid.transport.v0_10";
+ public static final String QPID_BROKER_TRANSPORT_PROPNAME = "qpid.broker.transport";
+
+ // Can't reference the class directly here, as this would preclude the ability to bundle transports separately.
+ private static final String MINA_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.mina.MinaNetworkTransport";
+ private static final String IO_TRANSPORT_CLASSNAME = "org.apache.qpid.transport.network.io.IoNetworkTransport";
+
public static final String TCP = "tcp";
+
+ private final static Map<ProtocolVersion,String> OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP;
+
+ static
+ {
+ final Map<ProtocolVersion,String> map = new HashMap<ProtocolVersion, String>();
+ map.put(ProtocolVersion.v8_0, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_9, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_91, MINA_TRANSPORT_CLASSNAME);
+ map.put(ProtocolVersion.v0_10, IO_TRANSPORT_CLASSNAME);
+
+ OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP = Collections.unmodifiableMap(map);
+ }
+
+ public static IncomingNetworkTransport getIncomingTransportInstance()
+ {
+ return (IncomingNetworkTransport) loadTransportClass(
+ System.getProperty(QPID_BROKER_TRANSPORT_PROPNAME, MINA_TRANSPORT_CLASSNAME));
+ }
+
+ public static OutgoingNetworkTransport getOutgoingTransportInstance(
+ final ProtocolVersion protocolVersion)
+ {
+
+ final String overrride = getOverrideClassNameFromSystemProperty(protocolVersion);
+ final String networkTransportClassName;
+ if (overrride != null)
+ {
+ networkTransportClassName = overrride;
+ }
+ else
+ {
+ networkTransportClassName = OUTGOING_PROTOCOL_TO_IMPLDEFAULTS_MAP.get(protocolVersion);
+ }
+
+ return (OutgoingNetworkTransport) loadTransportClass(networkTransportClassName);
+ }
+
+ private static NetworkTransport loadTransportClass(final String networkTransportClassName)
+ {
+ if (networkTransportClassName == null)
+ {
+ throw new IllegalArgumentException("transport class name must not be null");
+ }
+
+ try
+ {
+ final Class<?> clazz = Class.forName(networkTransportClassName);
+ return (NetworkTransport) clazz.newInstance();
+ }
+ catch (InstantiationException e)
+ {
+ throw new TransportException("Unable to instantiate transport class " + networkTransportClassName, e);
+ }
+ catch (IllegalAccessException e)
+ {
+ throw new TransportException("Access exception " + networkTransportClassName, e);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new TransportException("Unable to load transport class " + networkTransportClassName, e);
+ }
+ }
+
+ private static String getOverrideClassNameFromSystemProperty(final ProtocolVersion protocolVersion)
+ {
+ final String protocolSpecificSystemProperty;
+
+ if (ProtocolVersion.v0_10.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_10_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_91.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_1_PROPNAME;
+ }
+ else if (ProtocolVersion.v0_9.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_9_PROPNAME;
+ }
+ else if (ProtocolVersion.v8_0.equals(protocolVersion))
+ {
+ protocolSpecificSystemProperty = QPID_TRANSPORT_V0_8_PROPNAME;
+ }
+ else
+ {
+ throw new IllegalArgumentException("Unknown ProtocolVersion " + protocolVersion);
+ }
+
+ return System.getProperty(protocolSpecificSystemProperty, System.getProperty(QPID_TRANSPORT_PROPNAME));
+ }
}
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
index 3f0966903d..69e4b52edb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
@@ -43,8 +43,8 @@ public class SecurityLayer
Connection con;
SSLSecurityLayer sslLayer;
SASLSecurityLayer saslLayer;
-
- public void init(Connection con) throws TransportException
+
+ public SecurityLayer(Connection con)
{
this.con = con;
this.settings = con.getConnectionSettings();
@@ -55,10 +55,9 @@ public class SecurityLayer
if (settings.isUseSASLEncryption())
{
saslLayer = new SASLSecurityLayer();
- }
-
+ }
}
-
+
public Sender<ByteBuffer> sender(Sender<ByteBuffer> delegate)
{
Sender<ByteBuffer> sender = delegate;
diff --git a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
index 808374b06e..89542e8125 100644
--- a/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
+++ b/java/common/src/test/java/org/apache/qpid/test/utils/QpidTestCase.java
@@ -25,7 +25,9 @@ import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import junit.framework.TestCase;
import junit.framework.TestResult;
@@ -37,6 +39,8 @@ public class QpidTestCase extends TestCase
{
protected static final Logger _logger = Logger.getLogger(QpidTestCase.class);
+ private final Map<String, String> _propertiesSetForTest = new HashMap<String, String>();
+
/**
* Some tests are excluded when the property test.excludes is set to true.
* An exclusion list is either a file (prop test.excludesfile) which contains one test name
@@ -133,4 +137,49 @@ public class QpidTestCase extends TestCase
{
return AvailablePortFinder.getNextAvailable(10000);
}
+
+ /**
+ * Set a System property for duration of this test only. The tearDown will
+ * guarantee to reset the property to its previous value after the test
+ * completes.
+ *
+ * @param property The property to set
+ * @param value the value to set it to.
+ */
+ protected void setTestSystemProperty(String property, String value)
+ {
+ if (!_propertiesSetForTest.containsKey(property))
+ {
+ // Record the current value so we can revert it later.
+ _propertiesSetForTest.put(property, System.getProperty(property));
+ }
+
+ System.setProperty(property, value);
+ }
+
+ /**
+ * Restore the System property values that were set by this test run.
+ */
+ protected void revertTestSystemProperties()
+ {
+ for (String key : _propertiesSetForTest.keySet())
+ {
+ String value = _propertiesSetForTest.get(key);
+ if (value != null)
+ {
+ System.setProperty(key, value);
+ }
+ else
+ {
+ System.clearProperty(key);
+ }
+ }
+
+ _propertiesSetForTest.clear();
+ }
+
+ protected void tearDown() throws java.lang.Exception
+ {
+ revertTestSystemProperties();
+ }
}
diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
new file mode 100644
index 0000000000..4e504c69eb
--- /dev/null
+++ b/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.io.IoNetworkTransport;
+import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
+
+public class TransportTest extends QpidTestCase
+{
+
+
+
+ public void testDefaultGetOutgoingTransportForv0_8() throws Exception
+ {
+ final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof MinaNetworkTransport);
+ }
+
+ public void testGloballyOverriddenOutgoingTransportForv0_8() throws Exception
+ {
+ setTestSystemProperty(Transport.QPID_TRANSPORT_PROPNAME, TestOutgoingNetworkTransport.class.getName());
+
+ final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof TestOutgoingNetworkTransport);
+ }
+
+ public void testProtocolSpecificOverriddenOutgoingTransportForv0_8() throws Exception
+ {
+ setTestSystemProperty(Transport.QPID_TRANSPORT_V0_8_PROPNAME, TestOutgoingNetworkTransport.class.getName());
+
+ final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v8_0);
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof TestOutgoingNetworkTransport);
+ }
+
+ public void testDefaultGetOutgoingTransportForv0_10() throws Exception
+ {
+ final OutgoingNetworkTransport networkTransport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof IoNetworkTransport);
+ }
+
+ public void testDefaultGetIncomingTransport() throws Exception
+ {
+ final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof MinaNetworkTransport);
+ }
+
+ public void testOverriddenGetIncomingTransport() throws Exception
+ {
+ setTestSystemProperty(Transport.QPID_BROKER_TRANSPORT_PROPNAME, TestIncomingNetworkTransport.class.getName());
+
+ final IncomingNetworkTransport networkTransport = Transport.getIncomingTransportInstance();
+ assertNotNull(networkTransport);
+ assertTrue(networkTransport instanceof TestIncomingNetworkTransport);
+ }
+
+ public void testInvalidOutgoingTransportClassName() throws Exception
+ {
+ setTestSystemProperty(Transport.QPID_TRANSPORT_PROPNAME, "invalid");
+
+ try
+ {
+ Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+ fail("Should have failed to load the invalid class");
+ }
+ catch(TransportException te)
+ {
+ //expected, ignore
+ }
+ }
+
+ public void testInvalidOutgoingTransportProtocolVersion() throws Exception
+ {
+ try
+ {
+ Transport.getOutgoingTransportInstance(new ProtocolVersion((byte)0, (byte)0));
+ fail("Should have failed to load the transport for invalid protocol version");
+ }
+ catch(IllegalArgumentException iae)
+ {
+ //expected, ignore
+ }
+ }
+
+ public static class TestOutgoingNetworkTransport implements OutgoingNetworkTransport
+ {
+
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public NetworkConnection getConnection()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate, SSLContextFactory sslFactory)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ public static class TestIncomingNetworkTransport implements IncomingNetworkTransport
+ {
+
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public NetworkConnection getConnection()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory, SSLContextFactory sslFactory)
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+}