diff options
| author | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2011-07-07 15:10:30 +0000 |
| commit | 7c964a16114a1731a384d86d3c26087a3133d9dc (patch) | |
| tree | c3a15f8d7b32894beeaba7827740df928413b19e /qpid/java/broker | |
| parent | 093280efbbd98e0e73f2d45da22b3bca993acd0d (diff) | |
| download | qpid-python-7c964a16114a1731a384d86d3c26087a3133d9dc.tar.gz | |
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
14 files changed, 189 insertions, 182 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java index 08e6a24153..3d21afe279 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -44,6 +46,7 @@ import org.apache.log4j.xml.QpidLog4JConfigurator; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration; import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean; import org.apache.qpid.server.information.management.ServerInformationMBean; import org.apache.qpid.server.logging.SystemOutMessageLogger; @@ -59,8 +62,11 @@ import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry; import org.apache.qpid.server.transport.QpidAcceptor; import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.network.mina.MINANetworkDriver; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.mina.MinaNetworkTransport; /** * Main entry point for AMQPD. @@ -370,7 +376,7 @@ public class Main - if (bindAddr.equals("wildcard")) + if (bindAddr.equals(WILDCARD_ADDRESS)) { bindAddress = new InetSocketAddress(0).getAddress(); } @@ -386,15 +392,12 @@ public class Main String keystorePassword = serverConfig.getKeystorePassword(); String certType = serverConfig.getCertType(); SSLContextFactory sslFactory = null; - + if (!serverConfig.getSSLOnly()) { for(int port : ports) { - - NetworkDriver driver = new MINANetworkDriver(); - Set<VERSION> supported = EnumSet.allOf(VERSION.class); if(exclude_0_10.contains(port)) @@ -415,26 +418,23 @@ public class Main supported.remove(VERSION.v0_8); } + NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + IncomingNetworkTransport transport = new MinaNetworkTransport(); MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(hostName, supported); - - - driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory, - serverConfig.getNetworkConfiguration(), null); + transport.accept(settings, protocolEngineFactory, sslFactory); ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); + new QpidAcceptor(transport, Transport.TCP)); CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port)); - } } if (serverConfig.getEnableSSL()) { - sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); - NetworkDriver driver = new MINANetworkDriver(); - String sslPort = commandLine.getOptionValue("s"); int port = 0; if (null != sslPort) @@ -446,10 +446,17 @@ public class Main port = serverConfig.getSSLPort(); } - driver.bind(port, new InetAddress[]{bindAddress}, - new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory); + NetworkTransportConfiguration settings = + new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP); + + sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType); + + IncomingNetworkTransport transport = new MinaNetworkTransport(); + + transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory); + ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port), - new QpidAcceptor(driver,"TCP")); + new QpidAcceptor(transport,"TCP")); CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", port)); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java index 5908eb4bd8..23ab5e8222 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java @@ -20,6 +20,8 @@ package org.apache.qpid.server.configuration; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.util.Collections; import java.util.HashMap; @@ -42,7 +44,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriverConfiguration; +import org.apache.qpid.transport.NetworkTransportConfiguration; import sun.misc.Signal; import sun.misc.SignalHandler; @@ -61,7 +63,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public static final int DEFAULT_SSL_PORT = 8672; public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; public static final int DEFAULT_JMXPORT = 8999; - + public static final String QPID_HOME = "QPID_HOME"; public static final String QPID_WORK = "QPID_WORK"; public static final String LIB_DIR = "lib"; @@ -629,7 +631,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getLongValue("flowResumeCapacity", getCapacity()); } - public int getProcessors() + public int getConnectorProcessors() { return getIntValue("connector.processors", 4); } @@ -661,7 +663,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa public String getBind() { - return getStringValue("connector.bind", "wildcard"); + return getStringValue("connector.bind", WILDCARD_ADDRESS); } public int getReceiveBufferSize() @@ -771,58 +773,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa return getConfig().getBoolean("statistics.reporting.reset", false); } - public NetworkDriverConfiguration getNetworkConfiguration() - { - return new NetworkDriverConfiguration() - { - - public Integer getTrafficClass() - { - return null; - } - - public Boolean getTcpNoDelay() - { - return ServerConfiguration.this.getTcpNoDelay(); - } - - public Integer getSoTimeout() - { - return null; - } - - public Integer getSoLinger() - { - return null; - } - - public Integer getSendBufferSize() - { - return ServerConfiguration.this.getWriteBufferSize(); - } - - public Boolean getReuseAddress() - { - return null; - } - - public Integer getReceiveBufferSize() - { - return ServerConfiguration.this.getReceiveBufferSize(); - } - - public Boolean getOOBInline() - { - return null; - } - - public Boolean getKeepAlive() - { - return null; - } - }; - } - public int getMaxChannelCount() { return getIntValue("maximumChannelCount", 256); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java new file mode 100644 index 0000000000..81dfcb4465 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.configuration; + +import org.apache.qpid.transport.NetworkTransportConfiguration; + +public class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration +{ + private final ServerConfiguration _serverConfig; + private final int _port; + private final String _host; + private final String _transport; + + public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig, + final int port, final String host, + final String transport) + { + _serverConfig = serverConfig; + _port = port; + _host = host; + _transport = transport; + } + + public Boolean getTcpNoDelay() + { + return _serverConfig.getTcpNoDelay(); + } + + public Integer getSendBufferSize() + { + return _serverConfig.getWriteBufferSize(); + } + + public Integer getReceiveBufferSize() + { + return _serverConfig.getReceiveBufferSize(); + } + + public Integer getPort() + { + return _port; + } + + public String getHost() + { + return _host; + } + + public String getTransport() + { + return _transport; + } + + public Integer getConnectorProcessors() + { + return _serverConfig.getConnectorProcessors(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java index 449f698c48..9c02cd9d2f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java @@ -94,8 +94,8 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig { @@ -156,8 +156,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private AMQPConnectionActor _actor; private LogSubject _logSubject; - private NetworkDriver _networkDriver; - private long _lastIoTime; private long _writtenBytes; @@ -177,20 +175,23 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol private boolean _statisticsEnabled = false; private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived; + private final NetworkConnection _network; + private final Sender<ByteBuffer> _sender; + public ManagedObject getManagedObject() { return _managedObject; } - public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver) + public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network) { _stateManager = new AMQStateManager(virtualHostRegistry, this); - _networkDriver = driver; - _codecFactory = new AMQCodecFactory(true, this); _poolReference.acquireExecutorService(); _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true); _writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false); + _network = network; + _sender = _network.getSender(); _actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger()); @@ -368,14 +369,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol null, mechanisms.getBytes(), locales.getBytes()); - _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer()); + _sender.send(responseBody.generateFrame(0).toNioByteBuffer()); } catch (AMQException e) { _logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion()); - _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); + _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer()); } } @@ -496,7 +497,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { public void run() { - _networkDriver.send(buf); + _sender.send(buf); } }); } @@ -688,8 +689,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol { if (delay > 0) { - _networkDriver.setMaxWriteIdle(delay); - _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); + _network.setMaxWriteIdle(delay); + _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay)); } } @@ -793,7 +794,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public void closeProtocolSession() { - _networkDriver.close(); + _sender.close(); try { _stateManager.changeState(AMQState.CONNECTION_CLOSED); @@ -828,7 +829,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol */ public String getLocalFQDN() { - SocketAddress address = _networkDriver.getLocalAddress(); + SocketAddress address = _network.getLocalAddress(); // we use the vmpipe address in some tests hence the need for this rather ugly test. The host // information is used by SASL primary. if (address instanceof InetSocketAddress) @@ -917,7 +918,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public Object getClientIdentifier() { - return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null; + return _network.getRemoteAddress(); } public VirtualHost getVirtualHost() @@ -976,12 +977,12 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public MethodRegistry getMethodRegistry() @@ -1011,14 +1012,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol // Nothing } - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public void writerIdle() { - _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer()); + _sender.send(HeartbeatBody.FRAME.toNioByteBuffer()); } public void exception(Throwable throwable) @@ -1026,7 +1022,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol if (throwable instanceof AMQProtocolHeaderException) { writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion())); - _networkDriver.close(); + _sender.close(); _logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable); } @@ -1044,7 +1040,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol writeFrame(closeBody.generateFrame(0)); - _networkDriver.close(); + _sender.close(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java index 0e4444725e..c8bdcd377b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java @@ -25,7 +25,7 @@ import org.apache.qpid.protocol.ProtocolEngine; import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkConnection; public class AMQProtocolEngineFactory implements ProtocolEngineFactory { @@ -42,9 +42,8 @@ public class AMQProtocolEngineFactory implements ProtocolEngineFactory } - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { - return new AMQProtocolEngine(_vhosts, networkDriver); + return new AMQProtocolEngine(_vhosts, network); } - } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index eb957ee33c..064d8f19a6 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -27,7 +27,8 @@ import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.ConnectionDelegate; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -37,28 +38,24 @@ public class MultiVersionProtocolEngine implements ProtocolEngine { private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class); - - - private NetworkDriver _networkDriver; private Set<VERSION> _supported; private String _fqdn; private IApplicationRegistry _appRegistry; - + private NetworkConnection _network; + private Sender<ByteBuffer> _sender; + private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine(); public MultiVersionProtocolEngine(IApplicationRegistry appRegistry, String fqdn, - Set<VERSION> supported, NetworkDriver networkDriver) + Set<VERSION> supported, + NetworkConnection network) { _appRegistry = appRegistry; _fqdn = fqdn; _supported = supported; - _networkDriver = networkDriver; - } - - public void setNetworkDriver(NetworkDriver driver) - { - _delegate.setNetworkDriver(driver); + _network = network; + _sender = _network.getSender(); } public SocketAddress getRemoteAddress() @@ -175,7 +172,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -195,7 +192,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -215,7 +212,7 @@ private static final byte[] AMQP_0_9_1_HEADER = public ProtocolEngine getProtocolEngine() { - return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver); + return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network); } }; @@ -242,7 +239,7 @@ private static final byte[] AMQP_0_9_1_HEADER = ServerConnection conn = new ServerConnection(); conn.setConnectionDelegate(connDelegate); - return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry); + return new ProtocolEngine_0_10( conn, _network, _appRegistry); } }; @@ -252,19 +249,14 @@ private static final byte[] AMQP_0_9_1_HEADER = private class ClosedDelegateProtocolEngine implements ProtocolEngine { - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -305,22 +297,16 @@ private static final byte[] AMQP_0_9_1_HEADER = private class SelfDelegateProtocolEngine implements ProtocolEngine { - private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES); - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - } - public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getWrittenBytes() @@ -380,14 +366,12 @@ private static final byte[] AMQP_0_9_1_HEADER = // If no delegate is found then send back the most recent support protocol version id if(newDelegate == null) { - _networkDriver.send(ByteBuffer.wrap(newestSupported)); + _sender.send(ByteBuffer.wrap(newestSupported)); _delegate = new ClosedDelegateProtocolEngine(); } else { - newDelegate.setNetworkDriver(_networkDriver); - _delegate = newDelegate; _header.flip(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java index 75358c42d9..09b31f2cc8 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java @@ -20,21 +20,18 @@ */ package org.apache.qpid.server.protocol; -import org.apache.qpid.protocol.ProtocolEngineFactory; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.protocol.ProtocolEngineFactory; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; - -import java.util.Set; -import java.util.Arrays; -import java.util.HashSet; +import org.apache.qpid.transport.network.NetworkConnection; public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory { - ; - - public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 }; private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values())); @@ -68,8 +65,8 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory } - public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver) + public ProtocolEngine newProtocolEngine(NetworkConnection network) { - return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver); + return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index 30d506a89b..42a604e3a5 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,13 +21,12 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.network.InputHandler; import org.apache.qpid.transport.network.Assembler; import org.apache.qpid.transport.network.Disassembler; +import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.server.configuration.*; import org.apache.qpid.server.transport.ServerConnection; -import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -38,7 +37,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; - private NetworkDriver _networkDriver; + private NetworkConnection _network; private long _readBytes; private long _writtenBytes; private ServerConnection _connection; @@ -47,26 +46,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine private long _createTime = System.currentTimeMillis(); public ProtocolEngine_0_10(ServerConnection conn, - NetworkDriver networkDriver, + NetworkConnection network, final IApplicationRegistry appRegistry) { super(new Assembler(conn)); _connection = conn; _connection.setConnectionConfig(this); - _networkDriver = networkDriver; + _network = network; _id = appRegistry.getConfigStore().createId(); _appRegistry = appRegistry; - // FIXME Two log messages to maintain compatinbility with earlier protocol versions - _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); - _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); - } - - public void setNetworkDriver(NetworkDriver driver) - { - _networkDriver = driver; - Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE); - _connection.setSender(dis); + _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE)); _connection.onOpen(new Runnable() { public void run() @@ -75,16 +65,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine } }); + // FIXME Two log messages to maintain compatibility with earlier protocol versions + _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false)); + _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true)); } public SocketAddress getRemoteAddress() { - return _networkDriver.getRemoteAddress(); + return _network.getRemoteAddress(); } public SocketAddress getLocalAddress() { - return _networkDriver.getLocalAddress(); + return _network.getLocalAddress(); } public long getReadBytes() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java index b6df0cc0a6..924392eed4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java @@ -483,7 +483,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry try { - acceptor.getNetworkDriver().close(); + acceptor.getNetworkTransport().close(); } catch (Throwable e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java index 4a4253153c..108533ef96 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java @@ -71,7 +71,7 @@ public class BrokerConfigAdapter implements BrokerConfig public Integer getWorkerThreads() { - return _instance.getConfiguration().getProcessors(); + return _instance.getConfiguration().getConnectorProcessors(); } public Integer getMaxConnections() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java index 3ca22b60c8..abbc5a3805 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java @@ -20,21 +20,21 @@ */ package org.apache.qpid.server.transport; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkTransport; public class QpidAcceptor { - NetworkDriver _driver; + NetworkTransport _transport; String _protocol; - public QpidAcceptor(NetworkDriver driver, String protocol) + public QpidAcceptor(NetworkTransport transport, String protocol) { - _driver = driver; + _transport = transport; _protocol = protocol; } - public NetworkDriver getNetworkDriver() + public NetworkTransport getNetworkTransport() { - return _driver; + return _transport; } public String toString() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java index 484f93cb88..f8640d136f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.configuration; +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -453,13 +455,13 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(4, serverConfig.getProcessors()); + assertEquals(4, serverConfig.getConnectorProcessors()); // Check value we set _config.setProperty("connector.processors", 10); serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals(10, serverConfig.getProcessors()); + assertEquals(10, serverConfig.getConnectorProcessors()); } public void testGetPort() throws ConfigurationException @@ -486,7 +488,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase // Check default ServerConfiguration serverConfig = new ServerConfiguration(_config); serverConfig.initialise(); - assertEquals("wildcard", serverConfig.getBind()); + assertEquals(WILDCARD_ADDRESS, serverConfig.getBind()); // Check value we set _config.setProperty("connector.bind", "a"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index 3b6cd37ea9..2b724af2b1 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -21,7 +21,11 @@ package org.apache.qpid.server.protocol; import java.security.Principal; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.AMQException; @@ -30,14 +34,13 @@ import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; -import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.message.AMQMessage; +import org.apache.qpid.server.message.MessageContentSource; +import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.state.AMQState; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.transport.TestNetworkDriver; +import org.apache.qpid.transport.TestNetworkConnection; public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter { @@ -47,7 +50,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException { - super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver()); + super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection()); _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index 62ceb68208..3acd064fd7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -122,6 +122,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase } catch (Exception e) { + e.printStackTrace(); fail(e.getMessage()); } } |
