diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
| commit | 245f2793e0a4efd4876ad72b2cf32edc93750d84 (patch) | |
| tree | b5fd72fdea830222b314029b13062cbd690e8d2e /java | |
| parent | b4f9004439f56f492931f4b35f7fa0ae58f3ff85 (diff) | |
| download | qpid-python-245f2793e0a4efd4876ad72b2cf32edc93750d84.tar.gz | |
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
47 files changed, 1189 insertions, 1766 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index 08e6a24153..3d21afe279 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -44,6 +46,7 @@ import org.apache.log4j.xml.QpidLog4JConfigurator; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.information.management.ServerInformationMBean; import org.apache.qpid.server.logging.SystemOutMessageLogger; @@ -59,8 +62,11 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.mina.MinaNetworkTransport; /** * Main entry point for AMQPD. @@ -370,7 +376,7 @@ public class Main - if (bindAddr.equals("wildcard")) + if (bindAddr.equals(WILDCARD_ADDRESS)) { bindAddress = new InetSocketAddress(0).getAddress(); } @@ -386,15 +392,12 @@ public class Main String keystorePassword = serverConfig.getKeystorePassword(); String certType = serverConfig.getCertType(); SSLContextFactory sslFactory = null; - + if (!serverConfig.getSSLOnly()) { for(int port : ports) { - - NetworkDriver driver = new MINANetworkDriver(); - Set<VERSION> supported = EnumSet.allOf(VERSION.class); if(exclude_0_10.contains(port)) @@ -415,26 +418,23 @@ public class Main supported.remove(VERSION.v0_8); } + NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + IncomingNetworkTransport transport = new MinaNetworkTransport(); MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(hostName, supported); - - - driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, - serverConfig.getNetworkConfiguration(), null); + transport.accept(settings, protocolEngineFactory, sslFactory); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); + new QpidAcceptor(transport, Transport.TCP)); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - } } if (serverConfig.getEnableSSL()) { - sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - NetworkDriver driver = new MINANetworkDriver(); - String sslPort = commandLine.getOptionValue("s"); int port = 0; if (null != sslPort) @@ -446,10 +446,17 @@ public class Main port = serverConfig.getSSLPort(); } - driver.bind(port, new InetAddress[]{bindAddress}, - new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + + IncomingNetworkTransport transport = new MinaNetworkTransport(); + + transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); + new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", port)); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 5908eb4bd8..23ab5e8222 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.configuration; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.util.Collections; import java.util.HashMap; @@ -42,7 +44,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.NetworkTransportConfiguration; import sun.misc.Signal; import sun.misc.SignalHandler; @@ -61,7 +63,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public static final int DEFAULT_SSL_PORT = 8672; public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; public static final int DEFAULT_JMXPORT = 8999; - + public static final String QPID_HOME = "QPID_HOME"; public static final String QPID_WORK = "QPID_WORK"; public static final String LIB_DIR = "lib"; @@ -629,7 +631,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getLongValue("flowResumeCapacity", getCapacity()); } - public int getProcessors() + public int getConnectorProcessors() { return getIntValue("connector.processors", 4); } @@ -661,7 +663,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public String getBind() { - return getStringValue("connector.bind", "wildcard"); + return getStringValue("connector.bind", WILDCARD_ADDRESS); } public int getReceiveBufferSize() @@ -771,58 +773,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getConfig().getBoolean("statistics.reporting.reset", false); } - public NetworkDriverConfiguration getNetworkConfiguration() - { - return new NetworkDriverConfiguration() - { - - public Integer getTrafficClass() - { - return null; - } - - public Boolean getTcpNoDelay() - { - return ServerConfiguration.this.getTcpNoDelay(); - } - - public Integer getSoTimeout() - { - return null; - } - - public Integer getSoLinger() - { - return null; - } - - public Integer getSendBufferSize() - { - return ServerConfiguration.this.getWriteBufferSize(); - } - - public Boolean getReuseAddress() - { - return null; - } - - public Integer getReceiveBufferSize() - { - return ServerConfiguration.this.getReceiveBufferSize(); - } - - public Boolean getOOBInline() - { - return null; - } - - public Boolean getKeepAlive() - { - return null; - } - }; - } - public int getMaxChannelCount() { return getIntValue("maximumChannelCount", 256); diff --git a/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java new file mode 100644 index 0000000000..81dfcb4465 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.qpid.transport.NetworkTransportConfiguration; + +public class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration +{ + private final ServerConfiguration _serverConfig; + private final int _port; + private final String _host; + private final String _transport; + + public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig, + final int port, final String host, + final String transport) + { + _serverConfig = serverConfig; + _port = port; + _host = host; + _transport = transport; + } + + public Boolean getTcpNoDelay() + { + return _serverConfig.getTcpNoDelay(); + } + + public Integer getSendBufferSize() + { + return _serverConfig.getWriteBufferSize(); + } + + public Integer getReceiveBufferSize() + { + return _serverConfig.getReceiveBufferSize(); + } + + public Integer getPort() + { + return _port; + } + + public String getHost() + { + return _host; + } + + public String getTransport() + { + return _transport; + } + + public Integer getConnectorProcessors() + { + return _serverConfig.getConnectorProcessors(); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 449f698c48..9c02cd9d2f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -94,8 +94,8 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { @@ -156,8 +156,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private AMQPConnectionActor _actor; private LogSubject _logSubject; - private NetworkDriver _networkDriver; - private long _lastIoTime; private long _writtenBytes; @@ -177,20 +175,23 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private final NetworkConnection _network; + private final Sender<ByteBuffer> _sender; + public ManagedObject getManagedObject() { return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _networkDriver = driver; - _codecFactory = new AMQCodecFactory(true, this); _poolReference.acquireExecutorService(); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + _network = network; + _sender = _network.getSender(); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -368,14 +369,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol null, mechanisms.getBytes(), locales.getBytes()); - _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } } @@ -496,7 +497,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); } @@ -688,8 +689,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { if (delay > 0) { - _networkDriver.setMaxWriteIdle(delay); - _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _network.setMaxWriteIdle(delay); + _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -793,7 +794,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeProtocolSession() { - _networkDriver.close(); + _sender.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -828,7 +829,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol */ public String getLocalFQDN() { - SocketAddress address = _networkDriver.getLocalAddress(); + SocketAddress address = _network.getLocalAddress(); // we use the vmpipe address in some tests hence the need for this rather ugly test. The host // information is used by SASL primary. if (address instanceof InetSocketAddress) @@ -917,7 +918,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public Object getClientIdentifier() { - return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + return _network.getRemoteAddress(); } public VirtualHost getVirtualHost() @@ -976,12 +977,12 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -1011,14 +1012,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol // Nothing } - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public void writerIdle() { - _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); } public void exception(Throwable throwable) @@ -1026,7 +1022,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _networkDriver.close(); + _sender.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1044,7 +1040,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol writeFrame(closeBody.generateFrame(0)); - _networkDriver.close(); + _sender.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java index 0e4444725e..c8bdcd377b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -25,7 +25,7 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngineFactory implements ProtocolEngineFactory { @@ -42,9 +42,8 @@ public class AMQProtocolEngineFactory implements ProtocolEngineFactory } - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { - return new AMQProtocolEngine(_vhosts, networkDriver); + return new AMQProtocolEngine(_vhosts, network); } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index eb957ee33c..064d8f19a6 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -27,7 +27,8 @@ import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -37,28 +38,24 @@ public class MultiVersionProtocolEngine implements ProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); - - - private NetworkDriver _networkDriver; private Set<VERSION> _supported; private String _fqdn; private IApplicationRegistry _appRegistry; - + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; + private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, String fqdn, - Set<VERSION> supported, NetworkDriver networkDriver) + Set<VERSION> supported, + NetworkConnection network) { _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _networkDriver = networkDriver; - } - - public void setNetworkDriver(NetworkDriver driver) - { - _delegate.setNetworkDriver(driver); + _network = network; + _sender = _network.getSender(); } public SocketAddress getRemoteAddress() @@ -175,7 +172,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -195,7 +192,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -215,7 +212,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -242,7 +239,7 @@ private static final byte[] AMQP_0_9_1_HEADER = ServerConnection conn = new ServerConnection(); conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry); + return new ProtocolEngine_0_10( conn, _network, _appRegistry); } }; @@ -252,19 +249,14 @@ private static final byte[] AMQP_0_9_1_HEADER = private class ClosedDelegateProtocolEngine implements ProtocolEngine { - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -305,22 +297,16 @@ private static final byte[] AMQP_0_9_1_HEADER = private class SelfDelegateProtocolEngine implements ProtocolEngine { - private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -380,14 +366,12 @@ private static final byte[] AMQP_0_9_1_HEADER = // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - _networkDriver.send(ByteBuffer.wrap(newestSupported)); + _sender.send(ByteBuffer.wrap(newestSupported)); _delegate = new ClosedDelegateProtocolEngine(); } else { - newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; _header.flip(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 75358c42d9..09b31f2cc8 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,21 +20,18 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.ProtocolEngineFactory; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; - -import java.util.Set; -import java.util.Arrays; -import java.util.HashSet; +import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - ; - - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); @@ -68,8 +65,8 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 30d506a89b..42a604e3a5 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,13 +21,12 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -38,7 +37,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - private NetworkDriver _networkDriver; + private NetworkConnection _network; private long _readBytes; private long _writtenBytes; private ServerConnection _connection; @@ -47,26 +46,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine private long _createTime = System.currentTimeMillis(); public ProtocolEngine_0_10(ServerConnection conn, - NetworkDriver networkDriver, + NetworkConnection network, final IApplicationRegistry appRegistry) { super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _networkDriver = networkDriver; + _network = network; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - // FIXME Two log messages to maintain compatinbility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); - } - - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE); - _connection.setSender(dis); + _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); _connection.onOpen(new Runnable() { public void run() @@ -75,16 +65,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine } }); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); + _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getReadBytes() diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index b6df0cc0a6..924392eed4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -483,7 +483,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - acceptor.getNetworkDriver().close(); + acceptor.getNetworkTransport().close(); } catch (Throwable e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 4a4253153c..108533ef96 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -71,7 +71,7 @@ public class BrokerConfigAdapter implements BrokerConfig public Integer getWorkerThreads() { - return _instance.getConfiguration().getProcessors(); + return _instance.getConfiguration().getConnectorProcessors(); } public Integer getMaxConnections() diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index 3ca22b60c8..abbc5a3805 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkTransport; public class QpidAcceptor { - NetworkDriver _driver; + NetworkTransport _transport; String _protocol; - public QpidAcceptor(NetworkDriver driver, String protocol) + public QpidAcceptor(NetworkTransport transport, String protocol) { - _driver = driver; + _transport = transport; _protocol = protocol; } - public NetworkDriver getNetworkDriver() + public NetworkTransport getNetworkTransport() { - return _driver; + return _transport; } public String toString() diff --git a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 484f93cb88..f8640d136f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.configuration; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -453,13 +455,13 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(4, serverConfig.getProcessors()); + assertEquals(4, serverConfig.getConnectorProcessors()); // Check value we set _config.setProperty("connector.processors", 10); serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(10, serverConfig.getProcessors()); + assertEquals(10, serverConfig.getConnectorProcessors()); } public void testGetPort() throws ConfigurationException @@ -486,7 +488,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals("wildcard", serverConfig.getBind()); + assertEquals(WILDCARD_ADDRESS, serverConfig.getBind()); // Check value we set _config.setProperty("connector.bind", "a"); diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 3b6cd37ea9..2b724af2b1 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -21,7 +21,11 @@ package org.apache.qpid.server.protocol; import java.security.Principal; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.AMQException; @@ -30,14 +34,13 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { @@ -47,7 +50,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { - super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver()); + super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection()); _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 62ceb68208..3acd064fd7 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -122,6 +122,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } diff --git a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java b/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java deleted file mode 100644 index d7eb138523..0000000000 --- a/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.example.transport; - -import org.apache.qpid.AMQException; -import org.apache.qpid.client.AMQConnection; -import org.apache.qpid.client.transport.TransportConnection; -import org.apache.qpid.jms.ConnectionListener; -import org.apache.qpid.url.URLSyntaxException; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.SocketChannel; -import java.util.UUID; - -/** - * This is a simple application that demonstrates how you can use the Qpid AMQP interfaces to use existing sockets as - * the transport for the Client API. - * - * The Demo here runs twice: - * 1. Just to show a simple publish and receive. - * 2. To demonstrate how to use existing sockets and utilise the underlying client failover mechnaism. - */ -public class ExistingSocketConnectorDemo implements ConnectionListener -{ - private static boolean DEMO_FAILOVER = false; - - public static void main(String[] args) throws IOException, URLSyntaxException, AMQException, JMSException - { - System.out.println("Testing socket connection to localhost:5672."); - - new ExistingSocketConnectorDemo(); - - System.out.println("Testing socket connection failover between localhost:5672 and localhost:5673."); - - DEMO_FAILOVER = true; - - new ExistingSocketConnectorDemo(); - } - - Connection _connection; - MessageProducer _producer; - Session _session; - - String Socket1_ID = UUID.randomUUID().toString(); - String Socket2_ID = UUID.randomUUID().toString(); - - - - /** Here we can see the broker we are connecting to is set to be 'socket:///' signifying we will provide the socket. */ - public final String CONNECTION = "amqp://guest:guest@id/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID + "'"; - - - public ExistingSocketConnectorDemo() throws IOException, URLSyntaxException, AMQException, JMSException - { - - Socket socket = SocketChannel.open().socket(); - socket.connect(new InetSocketAddress("localhost", 5672)); - - TransportConnection.registerOpenSocket(Socket1_ID, socket); - - - _connection = new AMQConnection(CONNECTION); - - _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer consumer = _session.createConsumer(_session.createQueue("Queue")); - - _producer = _session.createProducer(_session.createQueue("Queue")); - - _connection.start(); - - if (!DEMO_FAILOVER) - { - _producer.send(_session.createTextMessage("Simple Test")); - } - else - { - // Using the Qpid interfaces we can set a listener that allows us to demonstrate failover - ((AMQConnection) _connection).setConnectionListener(this); - - System.out.println("Testing failover: Please ensure second broker running on localhost:5673 and shutdown broker on 5672."); - } - - //We do a blocking receive here so that we can demonstrate failover. - Message message = consumer.receive(); - - System.out.println("Recevied :" + message); - - _connection.close(); - } - - // ConnectionListener Interface - - public void bytesSent(long count) - { - //not used in this example - } - public void bytesReceived(long count) - { - //not used in this example - } - - public boolean preFailover(boolean redirect) - { - /** - * This method is called before the underlying client library starts to reconnect. This gives us the opportunity - * to set a new socket for the failover to occur on. - */ - try - { - Socket socket = SocketChannel.open().socket(); - - socket.connect(new InetSocketAddress("localhost", 5673)); - - // This is the new method to pass in an open socket for the connection to use. - TransportConnection.registerOpenSocket(Socket2_ID, socket); - } - catch (IOException e) - { - e.printStackTrace(); - return false; - } - return true; - } - - public boolean preResubscribe() - { - //not used in this example - but must return true to allow the resubscription of existing clients. - return true; - } - - public void failoverComplete() - { - // Now that failover has completed we can send a message that the receiving thread will pick up - try - { - _producer.send(_session.createTextMessage("Simple Failover Test")); - } - catch (JMSException e) - { - e.printStackTrace(); - } - } -} diff --git a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java b/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java deleted file mode 100644 index 98716c0c3c..0000000000 --- a/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java +++ /dev/null @@ -1,478 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.mina.transport.socket.nio; - -import edu.emory.mathcs.backport.java.util.concurrent.Executor; -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExceptionMonitor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoConnectorConfig; -import org.apache.mina.common.IoHandler; -import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.common.support.BaseIoConnector; -import org.apache.mina.common.support.DefaultConnectFuture; -import org.apache.mina.util.NamePreservingRunnable; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.Queue; - -import java.io.IOException; -import java.net.ConnectException; -import java.net.Socket; -import java.net.SocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Set; - -/** - * {@link IoConnector} for socket transport (TCP/IP). - * - * @author The Apache Directory Project (mina-dev@directory.apache.org) - * @version $Rev: 627427 $, $Date: 2008-02-13 14:39:10 +0000 (Wed, 13 Feb 2008) $ - */ -public class ExistingSocketConnector extends BaseIoConnector -{ - /** @noinspection StaticNonFinalField */ - private static volatile int nextId = 0; - - private final Object lock = new Object(); - private final int id = nextId++; - private final String threadName = "SocketConnector-" + id; - private SocketConnectorConfig defaultConfig = new SocketConnectorConfig(); - private final Queue connectQueue = new Queue(); - private final SocketIoProcessor[] ioProcessors; - private final int processorCount; - private final Executor executor; - - /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */ - private Selector selector; - private Worker worker; - private int processorDistributor = 0; - private int workerTimeout = 60; // 1 min. - private Socket _openSocket = null; - - /** Create a connector with a single processing thread using a NewThreadExecutor */ - public ExistingSocketConnector() - { - this(1, new NewThreadExecutor()); - } - - /** - * Create a connector with the desired number of processing threads - * - * @param processorCount Number of processing threads - * @param executor Executor to use for launching threads - */ - public ExistingSocketConnector(int processorCount, Executor executor) - { - if (processorCount < 1) - { - throw new IllegalArgumentException("Must have at least one processor"); - } - - this.executor = executor; - this.processorCount = processorCount; - ioProcessors = new SocketIoProcessor[processorCount]; - - for (int i = 0; i < processorCount; i++) - { - ioProcessors[i] = new SocketIoProcessor("SocketConnectorIoProcessor-" + id + "." + i, executor); - } - } - - /** - * How many seconds to keep the connection thread alive between connection requests - * - * @return Number of seconds to keep connection thread alive - */ - public int getWorkerTimeout() - { - return workerTimeout; - } - - /** - * Set how many seconds the connection worker thread should remain alive once idle before terminating itself. - * - * @param workerTimeout Number of seconds to keep thread alive. Must be >=0 - */ - public void setWorkerTimeout(int workerTimeout) - { - if (workerTimeout < 0) - { - throw new IllegalArgumentException("Must be >= 0"); - } - this.workerTimeout = workerTimeout; - } - - public ConnectFuture connect(SocketAddress address, IoHandler handler, IoServiceConfig config) - { - return connect(address, null, handler, config); - } - - public ConnectFuture connect(SocketAddress address, SocketAddress localAddress, - IoHandler handler, IoServiceConfig config) - { - /** Changes here from the Mina OpenSocketConnector. - * Ignoreing all address as they are not needed */ - - if (handler == null) - { - throw new NullPointerException("handler"); - } - - - if (config == null) - { - config = getDefaultConfig(); - } - - if (_openSocket == null) - { - throw new IllegalArgumentException("Specifed Socket not active"); - } - - boolean success = false; - - try - { - DefaultConnectFuture future = new DefaultConnectFuture(); - newSession(_openSocket, handler, config, future); - success = true; - return future; - } - catch (IOException e) - { - return DefaultConnectFuture.newFailedFuture(e); - } - finally - { - if (!success && _openSocket != null) - { - try - { - _openSocket.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - public IoServiceConfig getDefaultConfig() - { - return defaultConfig; - } - - /** - * Sets the config this connector will use by default. - * - * @param defaultConfig the default config. - * - * @throws NullPointerException if the specified value is <code>null</code>. - */ - public void setDefaultConfig(SocketConnectorConfig defaultConfig) - { - if (defaultConfig == null) - { - throw new NullPointerException("defaultConfig"); - } - this.defaultConfig = defaultConfig; - } - - private synchronized void startupWorker() throws IOException - { - if (worker == null) - { - selector = Selector.open(); - worker = new Worker(); - executor.execute(new NamePreservingRunnable(worker)); - } - } - - private void registerNew() - { - if (connectQueue.isEmpty()) - { - return; - } - - for (; ;) - { - ConnectionRequest req; - synchronized (connectQueue) - { - req = (ConnectionRequest) connectQueue.pop(); - } - - if (req == null) - { - break; - } - - SocketChannel ch = req.channel; - try - { - ch.register(selector, SelectionKey.OP_CONNECT, req); - } - catch (IOException e) - { - req.setException(e); - } - } - } - - private void processSessions(Set keys) - { - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isConnectable()) - { - continue; - } - - SocketChannel ch = (SocketChannel) key.channel(); - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - boolean success = false; - try - { - ch.finishConnect(); - newSession(ch, entry.handler, entry.config, entry); - success = true; - } - catch (Throwable e) - { - entry.setException(e); - } - finally - { - key.cancel(); - if (!success) - { - try - { - ch.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - } - } - } - - keys.clear(); - } - - private void processTimedOutSessions(Set keys) - { - long currentTime = System.currentTimeMillis(); - Iterator it = keys.iterator(); - - while (it.hasNext()) - { - SelectionKey key = (SelectionKey) it.next(); - - if (!key.isValid()) - { - continue; - } - - ConnectionRequest entry = (ConnectionRequest) key.attachment(); - - if (currentTime >= entry.deadline) - { - entry.setException(new ConnectException()); - try - { - key.channel().close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - key.cancel(); - } - } - } - } - - private void newSession(Socket socket, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - SocketSessionImpl session = new SocketSessionImpl(this, - nextProcessor(), - getListeners(), - config, - socket.getChannel(), - handler, - socket.getRemoteSocketAddress()); - - newSession(session, config, connectFuture); - } - - private void newSession(SocketChannel ch, IoHandler handler, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - - { - SocketSessionImpl session = new SocketSessionImpl(this, - nextProcessor(), - getListeners(), - config, - ch, - handler, - ch.socket().getRemoteSocketAddress()); - - newSession(session, config, connectFuture); - } - - private void newSession(SocketSessionImpl session, IoServiceConfig config, ConnectFuture connectFuture) - throws IOException - { - try - { - getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getFilterChainBuilder().buildFilterChain(session.getFilterChain()); - config.getThreadModel().buildFilterChain(session.getFilterChain()); - } - catch (Throwable e) - { - throw (IOException) new IOException("Failed to create a session.").initCause(e); - } - session.getIoProcessor().addNew(session); - connectFuture.setSession(session); - } - - private SocketIoProcessor nextProcessor() - { - return ioProcessors[processorDistributor++ % processorCount]; - } - - public void setOpenSocket(Socket openSocket) - { - _openSocket = openSocket; - } - - private class Worker implements Runnable - { - private long lastActive = System.currentTimeMillis(); - - public void run() - { - Thread.currentThread().setName(ExistingSocketConnector.this.threadName); - - for (; ;) - { - try - { - int nKeys = selector.select(1000); - - registerNew(); - - if (nKeys > 0) - { - processSessions(selector.selectedKeys()); - } - - processTimedOutSessions(selector.keys()); - - if (selector.keys().isEmpty()) - { - if (System.currentTimeMillis() - lastActive > workerTimeout * 1000L) - { - synchronized (lock) - { - if (selector.keys().isEmpty() && - connectQueue.isEmpty()) - { - worker = null; - try - { - selector.close(); - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - } - finally - { - selector = null; - } - break; - } - } - } - } - else - { - lastActive = System.currentTimeMillis(); - } - } - catch (IOException e) - { - ExceptionMonitor.getInstance().exceptionCaught(e); - - try - { - Thread.sleep(1000); - } - catch (InterruptedException e1) - { - ExceptionMonitor.getInstance().exceptionCaught(e1); - } - } - } - } - } - - private class ConnectionRequest extends DefaultConnectFuture - { - private final SocketChannel channel; - private final long deadline; - private final IoHandler handler; - private final IoServiceConfig config; - - private ConnectionRequest(SocketChannel channel, IoHandler handler, IoServiceConfig config) - { - this.channel = channel; - long timeout; - if (config instanceof IoConnectorConfig) - { - timeout = ((IoConnectorConfig) config).getConnectTimeoutMillis(); - } - else - { - timeout = ((IoConnectorConfig) getDefaultConfig()).getConnectTimeoutMillis(); - } - this.deadline = System.currentTimeMillis() + timeout; - this.handler = handler; - this.config = config; - } - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java index b31dd2bc91..ed37a69b82 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java @@ -56,9 +56,8 @@ public class AMQBrokerDetails implements BrokerDetails if (transport != null) { //todo this list of valid transports should be enumerated somewhere - if ((!(transport.equalsIgnoreCase(BrokerDetails.VM) || - transport.equalsIgnoreCase(BrokerDetails.TCP) || - transport.equalsIgnoreCase(BrokerDetails.SOCKET)))) + if (!(transport.equalsIgnoreCase(BrokerDetails.VM) || + transport.equalsIgnoreCase(BrokerDetails.TCP))) { if (transport.equalsIgnoreCase("localhost")) { @@ -182,10 +181,7 @@ public class AMQBrokerDetails implements BrokerDetails } else { - if (!_transport.equalsIgnoreCase(SOCKET)) - { - setPort(port); - } + setPort(port); } String queryString = connection.getQuery(); @@ -307,11 +303,8 @@ public class AMQBrokerDetails implements BrokerDetails sb.append(_host); } - if (!(_transport.equalsIgnoreCase(SOCKET))) - { - sb.append(':'); - sb.append(_port); - } + sb.append(':'); + sb.append(_port); sb.append(printOptionsURL()); diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 98573c9cc3..b0242210d8 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -49,6 +49,11 @@ import org.apache.qpid.framing.TxSelectBody; import org.apache.qpid.framing.TxSelectOkBody; import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.jms.ChannelLimitReachedException; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.mina.MinaNetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,8 +94,21 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate StateWaiter waiter = _conn._protocolHandler.createWaiter(openOrClosedStates); - TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail); + ConnectionSettings settings = new ConnectionSettings(); + settings.setHost(brokerDetail.getHost()); + settings.setPort(brokerDetail.getPort()); + settings.setProtocol(brokerDetail.getTransport()); + SSLConfiguration sslConfig = _conn.getSSLConfiguration(); + SSLContextFactory sslFactory = null; + if (sslConfig != null) + { + sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); + } + + OutgoingNetworkTransport transport = new MinaNetworkTransport(); + NetworkConnection network = transport.connect(settings, _conn._protocolHandler, sslFactory); + _conn._protocolHandler.setNetworkConnection(network); _conn._protocolHandler.getProtocolSession().init(); // this blocks until the connection has been set up or when an error // has prevented the connection being set up diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 424e09693f..34c6468629 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -57,7 +57,6 @@ import org.apache.qpid.framing.HeartbeatBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersion; -import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.pool.Job; import org.apache.qpid.pool.ReferenceCountingExecutorService; import org.apache.qpid.protocol.AMQConstant; @@ -65,8 +64,9 @@ import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.thread.Threading; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.io.IoTransport; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.NetworkTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,11 +172,13 @@ public class AMQProtocolHandler implements ProtocolEngine private Job _readJob; private Job _writeJob; private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance(); - private NetworkDriver _networkDriver; private ProtocolVersion _suggestedProtocolVersion; private long _writtenBytes; private long _readBytes; + private NetworkTransport _transport; + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -300,7 +302,7 @@ public class AMQProtocolHandler implements ProtocolEngine // failover: HeartbeatDiagnostics.timeout(); _logger.warn("Timed out while waiting for heartbeat from peer."); - _networkDriver.close(); + _network.close(); } public void writerIdle() @@ -322,7 +324,7 @@ public class AMQProtocolHandler implements ProtocolEngine { _logger.info("Exception caught therefore going to attempt failover: " + cause, cause); // this will attempt failover - _networkDriver.close(); + _network.close(); closed(); } else @@ -574,7 +576,7 @@ public class AMQProtocolHandler implements ProtocolEngine { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); if (PROTOCOL_DEBUG) @@ -595,7 +597,7 @@ public class AMQProtocolHandler implements ProtocolEngine if (wait) { - _networkDriver.flush(); + _sender.flush(); } } @@ -709,7 +711,7 @@ public class AMQProtocolHandler implements ProtocolEngine try { syncWrite(frame, ConnectionCloseOkBody.class, timeout); - _networkDriver.close(); + _network.close(); closed(); } catch (AMQTimeoutException e) @@ -829,17 +831,18 @@ public class AMQProtocolHandler implements ProtocolEngine public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } - public void setNetworkDriver(NetworkDriver driver) + public void setNetworkConnection(NetworkConnection network) { - _networkDriver = driver; + _network = network; + _sender = network.getSender(); } /** @param delay delay in seconds (not ms) */ @@ -847,15 +850,15 @@ public class AMQProtocolHandler implements ProtocolEngine { if (delay > 0) { - getNetworkDriver().setMaxWriteIdle(delay); - getNetworkDriver().setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); + _network.setMaxWriteIdle(delay); + _network.setMaxReadIdle(HeartbeatConfig.CONFIG.getTimeout(delay)); HeartbeatDiagnostics.init(delay, HeartbeatConfig.CONFIG.getTimeout(delay)); } } - public NetworkDriver getNetworkDriver() + public NetworkConnection getNetworkConnection() { - return _networkDriver; + return _network; } public ProtocolVersion getSuggestedProtocolVersion() diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java deleted file mode 100644 index 1ac8f62e32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.mina.common.ByteBuffer; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.qpid.client.SSLConfiguration; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(SocketTransportConnection.class); - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - private SocketConnectorFactory _socketConnectorFactory; - - static interface SocketConnectorFactory - { - IoConnector newSocketConnector(); - } - - public SocketTransportConnection(SocketConnectorFactory socketConnectorFactory) - { - _socketConnectorFactory = socketConnectorFactory; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - // the MINA default is currently to use the pooled allocator although this may change in future - // once more testing of the performance of the simple allocator has been done - if (!Boolean.getBoolean("amqj.enablePooledAllocator")) - { - _logger.info("Using SimpleByteBufferAllocator"); - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - final IoConnector ioConnector = _socketConnectorFactory.newSocketConnector(); - final InetSocketAddress address; - - if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET)) - { - address = null; - } - else - { - address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort()); - _logger.info("Attempting connection to " + address); - } - - SSLConfiguration sslConfig = protocolHandler.getConnection().getSSLConfiguration(); - SSLContextFactory sslFactory = null; - if (sslConfig != null) - { - sslFactory = new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType()); - } - - MINANetworkDriver driver = new MINANetworkDriver(ioConnector); - driver.open(brokerDetail.getPort(), address.getAddress(), protocolHandler, null, sslFactory); - protocolHandler.setNetworkDriver(driver); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java index 97657a09f4..3e03f88341 100644 --- a/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java +++ b/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java @@ -22,22 +22,17 @@ package org.apache.qpid.client.transport; import java.io.IOException; import java.net.Socket; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoServiceConfig; -import org.apache.mina.transport.socket.nio.ExistingSocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnector; import org.apache.mina.transport.vmpipe.VmPipeAcceptor; import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException; -import org.apache.qpid.jms.BrokerDetails; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.apache.qpid.transport.network.VMBrokerMap; +import org.apache.qpid.transport.network.mina.MinaNetworkHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,132 +46,15 @@ public class TransportConnection { private static ITransportConnection _instance; - private static final Map _inVmPipeAddress = new HashMap(); private static VmPipeAcceptor _acceptor; private static int _currentInstance = -1; private static int _currentVMPort = -1; - private static final int TCP = 0; - private static final int VM = 1; - private static final int SOCKET = 2; - private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class); private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQProtocolEngineFactory"; - private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>(); - - public static void registerOpenSocket(String socketID, Socket openSocket) - { - _openSocketRegister.put(socketID, openSocket); - } - - public static Socket removeOpenSocket(String socketID) - { - return _openSocketRegister.remove(socketID); - } - - public static synchronized ITransportConnection getInstance(final BrokerDetails details) throws AMQTransportConnectionException - { - int transport = getTransport(details.getTransport()); - - if (transport == -1) - { - throw new AMQNoTransportForProtocolException(details, null, null); - } - - switch (transport) - { - case SOCKET: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - ExistingSocketConnector connector = new ExistingSocketConnector(1,new QpidThreadExecutor()); - - Socket socket = TransportConnection.removeOpenSocket(details.getHost()); - - if (socket != null) - { - _logger.info("Using existing Socket:" + socket); - - ((ExistingSocketConnector) connector).setOpenSocket(socket); - } - else - { - throw new IllegalArgumentException("Active Socket must be provided for broker " + - "with 'socket://<SocketID>' transport:" + details); - } - return connector; - } - }); - case TCP: - return new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory() - { - public IoConnector newSocketConnector() - { - SocketConnector result = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - - // Don't have the connector's worker thread wait around for other connections (we only use - // one SocketConnector per connection at the moment anyway). This allows short-running - // clients (like unit tests) to complete quickly. - result.setWorkerTimeout(0); - return result; - } - }); - case VM: - { - return getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker")); - } - default: - throw new AMQNoTransportForProtocolException(details, "Transport not recognised:" + transport, null); - } - } - - private static int getTransport(String transport) - { - if (transport.equals(BrokerDetails.SOCKET)) - { - return SOCKET; - } - - if (transport.equals(BrokerDetails.TCP)) - { - return TCP; - } - - if (transport.equals(BrokerDetails.VM)) - { - return VM; - } - - return -1; - } - - private static ITransportConnection getVMTransport(BrokerDetails details, boolean AutoCreate) - throws AMQVMBrokerCreationException - { - int port = details.getPort(); - - synchronized (_inVmPipeAddress) - { - if (!_inVmPipeAddress.containsKey(port)) - { - if (AutoCreate) - { - _logger.warn("Auto Creating InVM Broker on port:" + port); - createVMBroker(port); - } - else - { - throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port - + " does not exist. Auto create disabled.", null); - } - } - } - - return new VmPipeTransportConnection(port); - } + public static final int DEFAULT_VM_PORT = 1; public static void createVMBroker(int port) throws AMQVMBrokerCreationException { @@ -189,10 +67,10 @@ public class TransportConnection IoServiceConfig config = _acceptor.getDefaultConfig(); } } - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - if (!_inVmPipeAddress.containsKey(port)) + if (!VMBrokerMap.contains(port)) { _logger.info("Creating InVM Qpid.AMQP listening on port " + port); IoHandlerAdapter provider = null; @@ -204,7 +82,7 @@ public class TransportConnection _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); + VMBrokerMap.add(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException e) @@ -231,7 +109,7 @@ public class TransportConnection } _acceptor.bind(pipe, provider); - _inVmPipeAddress.put(port, pipe); + VMBrokerMap.add(port, pipe); _logger.info("Created InVM Qpid.AMQP listening on port " + port); } catch (IOException justUseFirstException) @@ -272,11 +150,10 @@ public class TransportConnection { Class[] cnstr = {Integer.class}; Object[] params = {port}; - - provider = new MINANetworkDriver(); + ProtocolEngineFactory engineFactory = (ProtocolEngineFactory) Class.forName(protocolProviderClass).getConstructor(cnstr).newInstance(params); - ((MINANetworkDriver) provider).setProtocolEngineFactory(engineFactory, true); - // Give the broker a second to create + provider = new MinaNetworkHandler(null, engineFactory); + _logger.info("Created VMBroker Instance:" + port); } catch (Exception e) @@ -309,9 +186,9 @@ public class TransportConnection { _acceptor.unbindAll(); } - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - _inVmPipeAddress.clear(); + VMBrokerMap.clear(); } _acceptor = null; } @@ -321,16 +198,15 @@ public class TransportConnection public static void killVMBroker(int port) { - synchronized (_inVmPipeAddress) + synchronized (VMBrokerMap.class) { - VmPipeAddress pipe = (VmPipeAddress) _inVmPipeAddress.get(port); - if (pipe != null) + if (VMBrokerMap.contains(port)) { _logger.info("Killing VM Broker:" + port); - _inVmPipeAddress.remove(port); + VmPipeAddress address = VMBrokerMap.remove(port); // This does need to be sychronized as otherwise mina can hang // if a new connection is made - _acceptor.unbind(pipe); + _acceptor.unbind(address); } } } diff --git a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java b/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java deleted file mode 100644 index 87cc2e7a5a..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.transport; - -import java.io.IOException; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; -import org.apache.mina.transport.vmpipe.VmPipeAddress; -import org.apache.mina.transport.vmpipe.VmPipeConnector; -import org.apache.qpid.client.protocol.AMQProtocolHandler; -import org.apache.qpid.jms.BrokerDetails; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class VmPipeTransportConnection implements ITransportConnection -{ - private static final Logger _logger = LoggerFactory.getLogger(VmPipeTransportConnection.class); - - private int _port; - - private MINANetworkDriver _networkDriver; - - public VmPipeTransportConnection(int port) - { - _port = port; - } - - public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException - { - final VmPipeConnector ioConnector = new QpidVmPipeConnector(); - - final VmPipeAddress address = new VmPipeAddress(_port); - _logger.info("Attempting connection to " + address); - _networkDriver = new MINANetworkDriver(ioConnector, protocolHandler); - protocolHandler.setNetworkDriver(_networkDriver); - ConnectFuture future = ioConnector.connect(address, _networkDriver); - // wait for connection to complete - future.join(); - // we call getSession which throws an IOException if there has been an error connecting - future.getSession(); - _networkDriver.setProtocolEngine(protocolHandler); - } -} diff --git a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java index 6d81f728c9..1aca28aa3a 100644 --- a/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java +++ b/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java @@ -52,7 +52,6 @@ public interface BrokerDetails public static final int DEFAULT_PORT = 5672; - public static final String SOCKET = "socket"; public static final String TCP = "tcp"; public static final String VM = "vm"; diff --git a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java index f520a21ba0..e54b8ef369 100644 --- a/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java +++ b/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java @@ -20,23 +20,24 @@ */ package org.apache.qpid.client.protocol; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import junit.framework.TestCase; -import org.apache.qpid.framing.AMQFrame; + +import org.apache.qpid.AMQException; +import org.apache.qpid.client.AMQAuthenticationException; +import org.apache.qpid.client.MockAMQConnection; +import org.apache.qpid.client.state.AMQState; import org.apache.qpid.framing.AMQBody; +import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl; -import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.transport.TestNetworkDriver; -import org.apache.qpid.client.MockAMQConnection; -import org.apache.qpid.client.AMQAuthenticationException; -import org.apache.qpid.client.state.AMQState; +import org.apache.qpid.transport.TestNetworkConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - /** * This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception. * @@ -73,7 +74,7 @@ public class AMQProtocolHandlerTest extends TestCase { //Create a new ProtocolHandler with a fake connection. _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'")); - _handler.setNetworkDriver(new TestNetworkDriver()); + _handler.setNetworkConnection(new TestNetworkConnection()); AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1); _blockFrame = new AMQFrame(0, body); diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java index 2c5fa0112e..a3dfff45f9 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java @@ -487,27 +487,6 @@ public class ConnectionURLTest extends TestCase } - public void testSocketProtocol() throws URLSyntaxException - { - String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'"; - - try - { - AMQConnectionURL curl = new AMQConnectionURL(url); - assertNotNull(curl); - assertEquals(1, curl.getBrokerCount()); - assertNotNull(curl.getBrokerDetails(0)); - assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport()); - assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost()); - assertEquals("URL does not toString as expected", - url.replace(":guest", ":********"), curl.toString()); - } - catch (URLSyntaxException e) - { - fail(e.getMessage()); - } - } - public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException { String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'"; diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 31953ea6ab..48a3df734a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -22,8 +22,6 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Receiver; /** @@ -32,9 +30,6 @@ import org.apache.qpid.transport.Receiver; */ public interface ProtocolEngine extends Receiver<java.nio.ByteBuffer> { - // Sets the network driver providing data for this ProtocolEngine - void setNetworkDriver (NetworkDriver driver); - // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 9df84eef90..4e40b78440 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkConnection; public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); + ProtocolEngine newProtocolEngine(NetworkConnection network); }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 08678b213b..2074c77a5b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -30,6 +30,8 @@ import java.util.Map; */ public class ConnectionSettings { + public static final String WILDCARD_ADDRESS = "*"; + String protocol = "tcp"; String host = "localhost"; String vhost; diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java deleted file mode 100644 index 86af97bf7e..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.SocketAddress; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; - -public interface NetworkDriver extends Sender<java.nio.ByteBuffer> -{ - // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to - // it using the SSLContextFactory if provided - void open(int port, InetAddress destination, ProtocolEngine engine, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) - throws OpenException; - - // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which - // processes incoming connections with ProtocolEngines and SSLEngines created from the factories - // (in the case of an SSLContextFactory, if provided) - void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; - - // Returns the remote address of the underlying socket - SocketAddress getRemoteAddress(); - - // Returns the local address of the underlying socket - SocketAddress getLocalAddress(); - - /** - * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been - * read in seconds - */ - void setMaxReadIdle(int idleTime); - - /** - * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been - * written in seconds - */ - void setMaxWriteIdle(int idleTime); - -}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java index c38afe5dd5..8d3f7a779a 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java @@ -25,20 +25,22 @@ package org.apache.qpid.transport; * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned * from here if the underlying implementation supports them. */ -public interface NetworkDriverConfiguration +public interface NetworkTransportConfiguration { // Taken from Socket - Boolean getKeepAlive(); - Boolean getOOBInline(); - Boolean getReuseAddress(); - Integer getSoLinger(); // null means off - Integer getSoTimeout(); Boolean getTcpNoDelay(); - Integer getTrafficClass(); // The amount of memory in bytes to allocate to the incoming buffer Integer getReceiveBufferSize(); // The amount of memory in bytes to allocate to the outgoing buffer - Integer getSendBufferSize(); -} + Integer getSendBufferSize(); + + Integer getPort(); + + String getHost(); + + String getTransport(); + + Integer getConnectorProcessors(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java new file mode 100644 index 0000000000..2c7652abeb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java @@ -0,0 +1,8 @@ +package org.apache.qpid.transport; + +import org.apache.mina.common.IoConnector; + +public interface SocketConnectorFactory +{ + IoConnector newConnector(); +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java new file mode 100644 index 0000000000..7099916c33 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkTransportConfiguration; + +public interface IncomingNetworkTransport extends NetworkTransport +{ + public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory); +}
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 80b32ea909..1f69973b96 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -40,4 +40,8 @@ public interface NetworkConnection * Returns the local address of the underlying socket. */ SocketAddress getLocalAddress(); + + void setMaxWriteIdle(int sec); + + void setMaxReadIdle(int sec); }
\ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java new file mode 100644 index 0000000000..4b8a0baf75 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +public class Transport +{ + public static final String TCP = "tcp"; + public static final String VM = "vm"; +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java new file mode 100644 index 0000000000..acc55c2e2d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.mina.transport.vmpipe.VmPipeAddress; + +public class VMBrokerMap +{ + private static final Map<Integer, VmPipeAddress> _map = new HashMap<Integer, VmPipeAddress>(); + + public static void add(int port, VmPipeAddress pipe) + { + _map.put(port, pipe); + } + + public static VmPipeAddress remove(int port) + { + return _map.remove(port); + } + + public static void clear() + { + _map.clear(); + } + + public static boolean contains(int port) + { + return _map.containsKey(port); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 3252544fee..cca1fc46c9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -78,5 +78,16 @@ public class IoNetworkConnection implements NetworkConnection { return _socket.getLocalSocketAddress(); } - + + public void setMaxWriteIdle(int sec) + { + // TODO implement support for setting heartbeating config in this way + // Currently a socket timeout is used in IoSender + } + + public void setMaxReadIdle(int sec) + { + // TODO implement support for setting heartbeating config in this way + // Currently a socket timeout is used in IoSender + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java deleted file mode 100644 index 2206e0999e..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExecutorThreadModel; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.SessionUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.NetworkDriverConfiguration; -import org.apache.qpid.transport.OpenException; - -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver -{ - - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - ProtocolEngine _protocolEngine; - private int _processors = 4; - private SSLContextFactory _sslFactory = null; - private IoConnector _socketConnector; - private IoAcceptor _acceptor; - private IoSession _ioSession; - private ProtocolEngineFactory _factory; - private Throwable _lastException; - private boolean _acceptingConnections = false; - - private WriteFuture _lastWriteFuture; - - private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); - - static - { - org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - //override the MINA defaults to prevent use of the PooledByteBufferAllocator - org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session) - { - _processors = processors; - _protocolEngine = protocolEngine; - _ioSession = session; - _ioSession.setAttachment(_protocolEngine); - } - - public MINANetworkDriver() - { - - } - - public MINANetworkDriver(IoConnector ioConnector) - { - _socketConnector = ioConnector; - } - - public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) - { - _socketConnector = ioConnector; - _protocolEngine = engine; - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - _factory = factory; - - _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - if (config != null) - { - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getSendBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - } - - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - if (addresses != null && addresses.length > 0) - { - for (InetAddress addr : addresses) - { - try - { - _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); - } - } - } - else - { - try - { - _acceptor.bind(new InetSocketAddress(port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to *:%1s", port)); - } - } - _acceptingConnections = true; - } - - public SocketAddress getRemoteAddress() - { - return _ioSession.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _ioSession.getLocalAddress(); - } - - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLContextFactory sslFactory) throws OpenException - { - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - - SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - String s = ""; - StackTraceElement[] trace = Thread.currentThread().getStackTrace(); - for(StackTraceElement elt : trace) - { - if(elt.getClassName().contains("Test")) - { - s = elt.getClassName(); - break; - } - } - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); - scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); - scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); - - // Don't have the connector's worker thread wait around for other - // connections (we only use - // one SocketConnector per connection at the moment anyway). This allows - // short-running - // clients (like unit tests) to complete quickly. - if (_socketConnector instanceof SocketConnector) - { - ((SocketConnector) _socketConnector).setWorkerTimeout(0); - } - - ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); - future.join(); - if (!future.isConnected()) - { - throw new OpenException("Could not open connection", _lastException); - } - _ioSession = future.getSession(); - _ioSession.setAttachment(engine); - engine.setNetworkDriver(this); - _protocolEngine = engine; - } - - public void setMaxReadIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); - } - - public void setMaxWriteIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); - } - - public void close() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - if (_ioSession != null) - { - _ioSession.close(); - } - } - - public void flush() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - } - - public void send(ByteBuffer msg) - { - org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); - minaBuf.put(msg); - minaBuf.flip(); - _lastWriteFuture = _ioSession.write(minaBuf); - } - - public void setIdleTimeout(int i) - { - // MINA doesn't support setting SO_TIMEOUT - } - - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception - { - if (_protocolEngine != null) - { - _protocolEngine.exception(throwable); - } - else - { - _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); - } - _lastException = throwable; - } - - /** - * Invoked when a message is received on a particular protocol session. Note - * that a protocol session is directly tied to a particular physical - * connection. - * - * @param protocolSession - * the protocol session that received the message - * @param message - * the message itself (i.e. a decoded frame) - * - * @throws Exception - * if the message cannot be processed - */ - public void messageReceived(IoSession protocolSession, Object message) throws Exception - { - if (message instanceof org.apache.mina.common.ByteBuffer) - { - ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); - } - else - { - throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); - } - } - - public void sessionClosed(IoSession protocolSession) throws Exception - { - ((ProtocolEngine) protocolSession.getAttachment()).closed(); - } - - public void sessionCreated(IoSession protocolSession) throws Exception - { - // Configure the session with SSL if necessary - SessionUtil.initialize(protocolSession); - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - - if (_ioSession == null) - { - _ioSession = protocolSession; - } - - if (_acceptingConnections) - { - // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); - MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession); - protocolEngine.setNetworkDriver(newDriver); - } - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - if (IdleStatus.WRITER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).writerIdle(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).readerIdle(); - } - } - - private ProtocolEngine getProtocolEngine() - { - return _protocolEngine; - } - - public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) - { - _factory = engineFactory; - _acceptingConnections = acceptingConnections; - } - - public void setProtocolEngine(ProtocolEngine protocolEngine) - { - _protocolEngine = protocolEngine; - if (_ioSession != null) - { - _ioSession.setAttachment(protocolEngine); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java new file mode 100644 index 0000000000..0f433f6eeb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java @@ -0,0 +1,81 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoSession; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; + +public class MinaNetworkConnection implements NetworkConnection +{ + private IoSession _session; + private Sender<ByteBuffer> _sender; + + public MinaNetworkConnection(IoSession session) + { + _session = session; + _sender = new MinaSender(_session); + } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } + + public void close() + { + _session.close(); + } + + public SocketAddress getRemoteAddress() + { + return _session.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _session.getLocalAddress(); + } + + public long getReadBytes() + { + return _session.getReadBytes(); + } + + public long getWrittenBytes() + { + return _session.getWrittenBytes(); + } + + public void setMaxWriteIdle(int sec) + { + _session.setIdleTime(IdleStatus.WRITER_IDLE, sec); + } + + public void setMaxReadIdle(int sec) + { + _session.setIdleTime(IdleStatus.READER_IDLE, sec); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java new file mode 100644 index 0000000000..c00187480c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.util.SessionUtil; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinaNetworkHandler extends IoHandlerAdapter +{ + private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class); + + private ProtocolEngineFactory _factory; + private SSLContextFactory _sslFactory = null; + + static + { + boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers"); + LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers"); + ByteBuffer.setUseDirectBuffers(directBuffers); + + //override the MINA defaults to prevent use of the PooledByteBufferAllocator + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory) + { + _sslFactory = sslFactory; + _factory = factory; + } + + public MinaNetworkHandler(SSLContextFactory sslFactory) + { + this(sslFactory, null); + } + + public void messageReceived(IoSession session, Object message) + { + ProtocolEngine engine = (ProtocolEngine) session.getAttachment(); + ByteBuffer buf = (ByteBuffer) message; + try + { + engine.received(buf.buf()); + } + catch (RuntimeException re) + { + engine.exception(re); + } + } + + public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception + { + ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); + if(engine != null) + { + LOGGER.error("Exception caught by Mina", throwable); + engine.exception(throwable); + } + else + { + LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable); + } + } + + public void sessionCreated(IoSession ioSession) throws Exception + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Created session: " + ioSession.getRemoteAddress()); + } + + SessionUtil.initialize(ioSession); + + if (_sslFactory != null) + { + ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + + if (_factory != null) + { + NetworkConnection netConn = new MinaNetworkConnection(ioSession); + + ProtocolEngine engine = _factory.newProtocolEngine(netConn); + ioSession.setAttachment(engine); + } + } + + public void sessionClosed(IoSession ioSession) throws Exception + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("closed: " + ioSession.getRemoteAddress()); + } + + ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); + if(engine != null) + { + engine.closed(); + } + else + { + LOGGER.error("Unable to close ProtocolEngine as none was present"); + } + } + + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java new file mode 100644 index 0000000000..62f9429f30 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -0,0 +1,250 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoSession; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; +import org.apache.mina.transport.vmpipe.VmPipeAddress; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SocketConnectorFactory; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.VMBrokerMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +{ + private static final int UNKNOWN = -1; + private static final int TCP = 0; + private static final int VM = 1; + + public NetworkConnection _connection; + private SocketAcceptor _acceptor; + private InetSocketAddress _address; + + public NetworkConnection connect(ConnectionSettings settings, + Receiver<java.nio.ByteBuffer> delegate, SSLContextFactory sslFactory) + { + int transport = getTransport(settings.getProtocol()); + + IoConnectorCreator stc; + switch(transport) + { + case TCP: + stc = new IoConnectorCreator(new SocketConnectorFactory() + { + public IoConnector newConnector() + { + return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector + } + }); + _connection = stc.connect(delegate, settings, sslFactory); + break; + case VM: + stc = new IoConnectorCreator(new SocketConnectorFactory() + { + public IoConnector newConnector() + { + return new QpidVmPipeConnector(); + } + }); + _connection = stc.connect(delegate, settings, sslFactory); + break; + case UNKNOWN: + default: + throw new TransportException("Unknown protocol: " + settings.getProtocol()); + } + + return _connection; + } + + private static int getTransport(String transport) + { + if (transport.equals(Transport.TCP)) + { + return TCP; + } + + if (transport.equals(Transport.VM)) + { + return VM; + } + + return -1; + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory, + final SSLContextFactory sslFactory) + { + int processors = config.getConnectorProcessors(); + + if (Transport.TCP.equalsIgnoreCase(config.getTransport())) + { + _acceptor = new SocketAcceptor(processors, new NewThreadExecutor()); + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)")); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + sc.setTcpNoDelay(config.getTcpNoDelay()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + + if (config.getHost().equals(WILDCARD_ADDRESS)) + { + _address = new InetSocketAddress(config.getPort()); + } + else + { + _address = new InetSocketAddress(config.getHost(), config.getPort()); + } + } + else + { + throw new TransportException("Unknown transport: " + config.getTransport()); + } + + try + { + _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory)); + } + catch (IOException e) + { + throw new TransportException("Could not bind to " + _address, e); + } + } + + + private static class IoConnectorCreator + { + private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class); + + private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024; + + private SocketConnectorFactory _ioConnectorFactory; + + public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory) + { + _ioConnectorFactory = socketConnectorFactory; + } + + public NetworkConnection connect(Receiver<java.nio.ByteBuffer> receiver, ConnectionSettings settings, SSLContextFactory sslFactory) + { + final IoConnector ioConnector = _ioConnectorFactory.newConnector(); + final SocketAddress address; + final String protocol = settings.getProtocol(); + final int port = settings.getPort(); + + if (Transport.TCP.equalsIgnoreCase(protocol)) + { + address = new InetSocketAddress(settings.getHost(), port); + } + else if(Transport.VM.equalsIgnoreCase(protocol)) + { + synchronized (VMBrokerMap.class) + { + if(!VMBrokerMap.contains(port)) + { + throw new TransportException("VM broker on port " + port + " does not exist."); + } + } + + address = new VmPipeAddress(port); + } + else + { + throw new TransportException("Unknown transport: " + protocol); + } + + LOGGER.info("Attempting connection to " + address); + + if (ioConnector instanceof SocketConnector) + { + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)")); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use one SocketConnector per connection + // at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + ((SocketConnector) ioConnector).setWorkerTimeout(0); + } + + ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig()); + future.join(); + if (!future.isConnected()) + { + throw new TransportException("Could not open connection"); + } + + IoSession session = future.getSession(); + session.setAttachment(receiver); + + return new MinaNetworkConnection(session); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java new file mode 100644 index 0000000000..be114e2fa1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; +import org.apache.qpid.transport.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MinaSender + */ +public class MinaSender implements Sender<java.nio.ByteBuffer> +{ + private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); + + private final IoSession _session; + private WriteFuture _lastWrite; + + public MinaSender(IoSession session) + { + _session = session; + } + + public synchronized void send(java.nio.ByteBuffer msg) + { + _log.debug("sending data:"); + ByteBuffer mina = ByteBuffer.allocate(msg.limit()); + mina.put(msg); + mina.flip(); + _lastWrite = _session.write(mina); + _log.debug("sent data:"); + } + + public synchronized void flush() + { + if (_lastWrite != null) + { + _lastWrite.join(); + } + } + + public void close() + { + // MINA will sometimes throw away in-progress writes when you ask it to close + flush(); + CloseFuture closed = _session.close(); + closed.join(); + } + + public void setIdleTimeout(int i) + { + //TODO: + //We are instead using the setMax[Read|Write]IdleTime methods in + //MinaNetworkConnection for this. Should remove this method from + //sender interface, but currently being used by IoSender for 0-10. + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/MockSender.java b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java new file mode 100644 index 0000000000..4b38b7318a --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public class MockSender implements Sender<ByteBuffer> +{ + + public void setIdleTimeout(int i) + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void flush() + { + + } + + public void close() + { + + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java index 957a7190ee..8686c17414 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java +++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -25,32 +25,32 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; /** * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, * so if this class is being used and some methods are to be used, then please update those. */ -public class TestNetworkDriver implements NetworkDriver +public class TestNetworkConnection implements NetworkConnection { - private final ConcurrentMap attributes = new ConcurrentHashMap(); private String _remoteHost = "127.0.0.1"; private String _localHost = "127.0.0.1"; private int _port = 1; private SocketAddress _localAddress = null; private SocketAddress _remoteAddress = null; + private final MockSender _sender; - public TestNetworkDriver() + public TestNetworkConnection() { + _sender = new MockSender(); } public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException + NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException { } @@ -65,7 +65,7 @@ public class TestNetworkDriver implements NetworkDriver return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); } - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws OpenException { @@ -130,4 +130,9 @@ public class TestNetworkDriver implements NetworkDriver { _remoteAddress = address; } + + public Sender<ByteBuffer> getSender() + { + return _sender; + } } diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java index fc8e689ca4..a4292d9009 100644 --- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java @@ -21,44 +21,58 @@ package org.apache.qpid.transport.network.mina; -import java.net.BindException; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import junit.framework.TestCase; - +import org.apache.mina.util.AvailablePortFinder; import org.apache.qpid.framing.AMQDataBlock; import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.OpenException; - -public class MINANetworkDriverTest extends TestCase +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; + +public class MinaNetworkHandlerTest extends QpidTestCase { private static final String TEST_DATA = "YHALOTHAR"; - private static int TEST_PORT = 2323; - private NetworkDriver _server; - private NetworkDriver _client; + private int _testPort; + private IncomingNetworkTransport _server; + private OutgoingNetworkTransport _client; private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read private Exception _thrownEx; + private ConnectionSettings _clientSettings; + private NetworkConnection _network; + private TestNetworkTransportConfiguration _brokerSettings; @Override - public void setUp() + public void setUp() throws Exception { - _server = new MINANetworkDriver(); - _client = new MINANetworkDriver(); + String host = InetAddress.getLocalHost().getHostName(); + _testPort = AvailablePortFinder.getNextAvailable(10000); + + _clientSettings = new ConnectionSettings(); + _clientSettings.setHost(host); + _clientSettings.setPort(_testPort); + + _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host); + + _server = new MinaNetworkTransport(); + _client = new MinaNetworkTransport(); _thrownEx = null; _countingEngine = new CountingProtocolEngine(); - // increment the port to prevent tests clashing with each other when - // the port is in TIMED_WAIT state. - TEST_PORT++; } @Override @@ -78,46 +92,40 @@ public class MINANetworkDriverTest extends TestCase /** * Tests that a socket can't be opened if a driver hasn't been bound * to the port and can be opened if a driver has been bound. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testBindOpen() throws BindException, UnknownHostException, OpenException + public void testBindOpen() throws Exception { try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } - catch (OpenException e) + catch (TransportException e) { _thrownEx = e; } assertNotNull("Open should have failed since no engine bound", _thrownEx); - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_brokerSettings, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } /** * Tests that a socket can't be opened after a bound NetworkDriver has been closed - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException + public void testBindOpenCloseOpen() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _client.connect(_clientSettings, _countingEngine, null); _client.close(); _server.close(); try { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); } - catch (OpenException e) + catch (TransportException e) { _thrownEx = e; } @@ -132,43 +140,60 @@ public class MINANetworkDriverTest extends TestCase { try { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); } - catch (BindException e) + catch (TransportException e) { fail("First bind should not fail"); } try { - _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); + IncomingNetworkTransport second = new MinaNetworkTransport(); + second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); } - catch (BindException e) + catch (TransportException e) { _thrownEx = e; } assertNotNull("Second bind should throw BindException", _thrownEx); - } - + } + + /** + * Tests that binding to the wildcard address succeeds and a client can + * connect via localhost. + */ + public void testWildcardBind() throws Exception + { + TestNetworkTransportConfiguration serverSettings = + new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS); + + _server.accept(serverSettings, null, null); + + try + { + _client.connect(_clientSettings, _countingEngine, null); + } + catch (TransportException e) + { + fail("Open should have succeeded since we used a wildcard bind"); + } + } + /** * tests that bytes sent on a network driver are received at the other end - * - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - * @throws BindException */ - public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException + public void testSend() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); // Tell the counting engine how much data we're sending _countingEngine.setNewLatch(TEST_DATA.getBytes().length); // Send the data and wait for up to 2 seconds to get it back - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getLatch().await(2, TimeUnit.SECONDS); // Check what we got @@ -177,36 +202,30 @@ public class MINANetworkDriverTest extends TestCase /** * Opens a connection with a low read idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException * */ - public void testSetReadIdle() throws BindException, UnknownHostException, OpenException + public void testSetReadIdle() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _client.setMaxReadIdle(1); + _network.setMaxReadIdle(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); } /** * Opens a connection with a low write idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException * */ - public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException + public void testSetWriteIdle() throws Exception { // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _client.setMaxWriteIdle(1); + _network.setMaxWriteIdle(1); sleepForAtLeast(1500); assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); } @@ -216,16 +235,13 @@ public class MINANetworkDriverTest extends TestCase * Creates and then closes a connection from client to server and checks that the server * has its closed() method called. Then creates a new client and closes the server to check * that the client has its closed() method called. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testClosed() throws BindException, UnknownHostException, OpenException + public void testClosed() throws Exception { // Open a connection from a counting engine to an echo engine EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.bind(TEST_PORT, null, factory, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, factory, null); + _network = _client.connect(_clientSettings, _countingEngine, null); EchoProtocolEngine serverEngine = null; while (serverEngine == null) { @@ -253,7 +269,7 @@ public class MINANetworkDriverTest extends TestCase } assertTrue("Server should have been closed", serverEngine.getClosed()); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _client.connect(_clientSettings, _countingEngine, null); _countingEngine.setClosed(false); assertFalse("Client should not have been closed", _countingEngine.getClosed()); _countingEngine.setNewLatch(1); @@ -271,22 +287,18 @@ public class MINANetworkDriverTest extends TestCase /** * Create a connection and instruct the client to throw an exception when it gets some data * and that the latch gets counted down. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException */ - public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException + public void testExceptionCaught() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); assertEquals("Exception should not have been thrown", 1, _countingEngine.getExceptionLatch().getCount()); _countingEngine.setErrorOnNextRead(true); _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); assertEquals("Exception should have been thrown", 0, _countingEngine.getExceptionLatch().getCount()); @@ -294,28 +306,24 @@ public class MINANetworkDriverTest extends TestCase /** * Opens a connection and checks that the remote address is the one that was asked for - * @throws BindException - * @throws UnknownHostException - * @throws OpenException */ - public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException + public void testGetRemoteAddress() throws Exception { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), - _client.getRemoteAddress()); + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort), + _network.getRemoteAddress()); } private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory { - EchoProtocolEngine _engine = null; + private EchoProtocolEngine _engine = null; - public ProtocolEngine newProtocolEngine(NetworkDriver driver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { if (_engine == null) { - _engine = new EchoProtocolEngine(); - _engine.setNetworkDriver(driver); + _engine = new EchoProtocolEngine(network); } return getEngine(); } @@ -328,8 +336,6 @@ public class MINANetworkDriverTest extends TestCase public class CountingProtocolEngine implements ProtocolEngine { - - protected NetworkDriver _driver; public ArrayList<ByteBuffer> _receivedBytes = new ArrayList<ByteBuffer>(); private int _readBytes; private CountDownLatch _latch = new CountDownLatch(0); @@ -362,26 +368,12 @@ public class MINANetworkDriverTest extends TestCase public SocketAddress getRemoteAddress() { - if (_driver != null) - { - return _driver.getRemoteAddress(); - } - else - { - return null; - } + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - if (_driver != null) - { - return _driver.getLocalAddress(); - } - else - { - return null; - } + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -394,11 +386,6 @@ public class MINANetworkDriverTest extends TestCase _readerHasBeenIdle = true; } - public void setNetworkDriver(NetworkDriver driver) - { - _driver = driver; - } - public void writeFrame(AMQDataBlock frame) { @@ -465,12 +452,18 @@ public class MINANetworkDriverTest extends TestCase private class EchoProtocolEngine extends CountingProtocolEngine { + private NetworkConnection _echoNetwork; + + public EchoProtocolEngine(NetworkConnection network) + { + _echoNetwork = network; + } public void received(ByteBuffer msg) { super.received(msg); msg.rewind(); - _driver.send(msg); + _echoNetwork.getSender().send(msg); } } @@ -491,4 +484,52 @@ public class MINANetworkDriverTest extends TestCase timeLeft = period - (System.currentTimeMillis() - start); } } + + private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration + { + private int _port; + private String _host; + + public TestNetworkTransportConfiguration(final int port, final String host) + { + _port = port; + _host = host; + } + + public Boolean getTcpNoDelay() + { + return true; + } + + public Integer getReceiveBufferSize() + { + return 32768; + } + + public Integer getSendBufferSize() + { + return 32768; + } + + public Integer getPort() + { + return _port; + } + + public String getHost() + { + return _host; + } + + public String getTransport() + { + return Transport.TCP; + } + + public Integer getConnectorProcessors() + { + return 4; + } + + } }
\ No newline at end of file diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java index 278b9e9c04..9c1807199c 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java @@ -31,21 +31,21 @@ import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.test.utils.QpidBrokerTestCase; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; public class AMQProtocolSessionTest extends QpidBrokerTestCase { - private static class AMQProtSession extends AMQProtocolSession + private static class TestProtocolSession extends AMQProtocolSession { - public AMQProtSession(AMQProtocolHandler protocolHandler, AMQConnection connection) + public TestProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection) { super(protocolHandler,connection); } - public TestNetworkDriver getNetworkDriver() + public TestNetworkConnection getNetworkConnection() { - return (TestNetworkDriver) _protocolHandler.getNetworkDriver(); + return (TestNetworkConnection) _protocolHandler.getNetworkConnection(); } public AMQShortString genQueueName() @@ -54,7 +54,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase } } - private AMQProtSession _testSession; + private TestProtocolSession _testSession; protected void setUp() throws Exception { @@ -62,10 +62,10 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase AMQConnection con = (AMQConnection) getConnection("guest", "guest"); AMQProtocolHandler protocolHandler = new AMQProtocolHandler(con); - protocolHandler.setNetworkDriver(new TestNetworkDriver()); - + protocolHandler.setNetworkConnection(new TestNetworkConnection()); + //don't care about the values set here apart from the dummy IoSession - _testSession = new AMQProtSession(protocolHandler , con); + _testSession = new TestProtocolSession(protocolHandler , con); } public void testTemporaryQueueWildcard() throws UnknownHostException @@ -100,7 +100,7 @@ public class AMQProtocolSessionTest extends QpidBrokerTestCase private void checkTempQueueName(SocketAddress address, String queueName) { - _testSession.getNetworkDriver().setLocalAddress(address); + _testSession.getNetworkConnection().setLocalAddress(address); assertEquals("Wrong queue name", queueName, _testSession.genQueueName().asString()); } } diff --git a/java/test-profiles/Excludes b/java/test-profiles/Excludes index ff6993fa0b..8359ad8743 100644 --- a/java/test-profiles/Excludes +++ b/java/test-profiles/Excludes @@ -50,4 +50,3 @@ org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#* // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart - diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index c05aad0cb1..0361d83d73 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -77,6 +77,5 @@ org.apache.qpid.test.unit.client.channelclose.ChannelCloseTest#* //Temporarily adding the following until the issues are sorted out. //Should probably raise JIRAs for them. -org.apache.qpid.transport.network.mina.MINANetworkDriverTest#* org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testCreateExchange org.apache.qpid.test.client.destination.AddressBasedDestinationTest#testBrowseMode |
