summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
committerRobert Gemmell <robbie@apache.org>2011-07-07 15:10:30 +0000
commit7c964a16114a1731a384d86d3c26087a3133d9dc (patch)
treec3a15f8d7b32894beeaba7827740df928413b19e /qpid/java/broker
parent093280efbbd98e0e73f2d45da22b3bca993acd0d (diff)
downloadqpid-python-7c964a16114a1731a384d86d3c26087a3133d9dc.tar.gz
QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model
Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1143867 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java45
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java62
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java75
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java44
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java52
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java19
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java12
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java8
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java15
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java1
14 files changed, 189 insertions, 182 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
index 08e6a24153..3d21afe279 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -44,6 +46,7 @@ import org.apache.log4j.xml.QpidLog4JConfigurator;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.information.management.ServerInformationMBean;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
@@ -59,8 +62,11 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.transport.QpidAcceptor;
import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
+import org.apache.qpid.transport.network.mina.MinaNetworkTransport;
/**
* Main entry point for AMQPD.
@@ -370,7 +376,7 @@ public class Main
- if (bindAddr.equals("wildcard"))
+ if (bindAddr.equals(WILDCARD_ADDRESS))
{
bindAddress = new InetSocketAddress(0).getAddress();
}
@@ -386,15 +392,12 @@ public class Main
String keystorePassword = serverConfig.getKeystorePassword();
String certType = serverConfig.getCertType();
SSLContextFactory sslFactory = null;
-
+
if (!serverConfig.getSSLOnly())
{
for(int port : ports)
{
-
- NetworkDriver driver = new MINANetworkDriver();
-
Set<VERSION> supported = EnumSet.allOf(VERSION.class);
if(exclude_0_10.contains(port))
@@ -415,26 +418,23 @@ public class Main
supported.remove(VERSION.v0_8);
}
+ NetworkTransportConfiguration settings =
+ new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
+
+ IncomingNetworkTransport transport = new MinaNetworkTransport();
MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(hostName, supported);
-
-
- driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
- serverConfig.getNetworkConfiguration(), null);
+ transport.accept(settings, protocolEngineFactory, sslFactory);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
+ new QpidAcceptor(transport, Transport.TCP));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
-
}
}
if (serverConfig.getEnableSSL())
{
- sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
- NetworkDriver driver = new MINANetworkDriver();
-
String sslPort = commandLine.getOptionValue("s");
int port = 0;
if (null != sslPort)
@@ -446,10 +446,17 @@ public class Main
port = serverConfig.getSSLPort();
}
- driver.bind(port, new InetAddress[]{bindAddress},
- new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
+ NetworkTransportConfiguration settings =
+ new ServerNetworkTransportConfiguration(serverConfig, port, bindAddress.getHostName(), Transport.TCP);
+
+ sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
+
+ IncomingNetworkTransport transport = new MinaNetworkTransport();
+
+ transport.accept(settings, new AMQProtocolEngineFactory(), sslFactory);
+
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
+ new QpidAcceptor(transport,"TCP"));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", port));
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
index 5908eb4bd8..23ab5e8222 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
@@ -20,6 +20,8 @@
package org.apache.qpid.server.configuration;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
@@ -42,7 +44,7 @@ import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
import sun.misc.Signal;
import sun.misc.SignalHandler;
@@ -61,7 +63,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public static final int DEFAULT_SSL_PORT = 8672;
public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
public static final int DEFAULT_JMXPORT = 8999;
-
+
public static final String QPID_HOME = "QPID_HOME";
public static final String QPID_WORK = "QPID_WORK";
public static final String LIB_DIR = "lib";
@@ -629,7 +631,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
return getLongValue("flowResumeCapacity", getCapacity());
}
- public int getProcessors()
+ public int getConnectorProcessors()
{
return getIntValue("connector.processors", 4);
}
@@ -661,7 +663,7 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
public String getBind()
{
- return getStringValue("connector.bind", "wildcard");
+ return getStringValue("connector.bind", WILDCARD_ADDRESS);
}
public int getReceiveBufferSize()
@@ -771,58 +773,6 @@ public class ServerConfiguration extends ConfigurationPlugin implements SignalHa
return getConfig().getBoolean("statistics.reporting.reset", false);
}
- public NetworkDriverConfiguration getNetworkConfiguration()
- {
- return new NetworkDriverConfiguration()
- {
-
- public Integer getTrafficClass()
- {
- return null;
- }
-
- public Boolean getTcpNoDelay()
- {
- return ServerConfiguration.this.getTcpNoDelay();
- }
-
- public Integer getSoTimeout()
- {
- return null;
- }
-
- public Integer getSoLinger()
- {
- return null;
- }
-
- public Integer getSendBufferSize()
- {
- return ServerConfiguration.this.getWriteBufferSize();
- }
-
- public Boolean getReuseAddress()
- {
- return null;
- }
-
- public Integer getReceiveBufferSize()
- {
- return ServerConfiguration.this.getReceiveBufferSize();
- }
-
- public Boolean getOOBInline()
- {
- return null;
- }
-
- public Boolean getKeepAlive()
- {
- return null;
- }
- };
- }
-
public int getMaxChannelCount()
{
return getIntValue("maximumChannelCount", 256);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java
new file mode 100644
index 0000000000..81dfcb4465
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.configuration;
+
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+
+public class ServerNetworkTransportConfiguration implements NetworkTransportConfiguration
+{
+ private final ServerConfiguration _serverConfig;
+ private final int _port;
+ private final String _host;
+ private final String _transport;
+
+ public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig,
+ final int port, final String host,
+ final String transport)
+ {
+ _serverConfig = serverConfig;
+ _port = port;
+ _host = host;
+ _transport = transport;
+ }
+
+ public Boolean getTcpNoDelay()
+ {
+ return _serverConfig.getTcpNoDelay();
+ }
+
+ public Integer getSendBufferSize()
+ {
+ return _serverConfig.getWriteBufferSize();
+ }
+
+ public Integer getReceiveBufferSize()
+ {
+ return _serverConfig.getReceiveBufferSize();
+ }
+
+ public Integer getPort()
+ {
+ return _port;
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public String getTransport()
+ {
+ return _transport;
+ }
+
+ public Integer getConnectorProcessors()
+ {
+ return _serverConfig.getConnectorProcessors();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 449f698c48..9c02cd9d2f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -94,8 +94,8 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
{
@@ -156,8 +156,6 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private AMQPConnectionActor _actor;
private LogSubject _logSubject;
- private NetworkDriver _networkDriver;
-
private long _lastIoTime;
private long _writtenBytes;
@@ -177,20 +175,23 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
+ private final NetworkConnection _network;
+ private final Sender<ByteBuffer> _sender;
+
public ManagedObject getManagedObject()
{
return _managedObject;
}
- public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkDriver driver)
+ public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network)
{
_stateManager = new AMQStateManager(virtualHostRegistry, this);
- _networkDriver = driver;
-
_codecFactory = new AMQCodecFactory(true, this);
_poolReference.acquireExecutorService();
_readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
_writeJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, false);
+ _network = network;
+ _sender = _network.getSender();
_actor = new AMQPConnectionActor(this, virtualHostRegistry.getApplicationRegistry().getRootMessageLogger());
@@ -368,14 +369,14 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
null,
mechanisms.getBytes(),
locales.getBytes());
- _networkDriver.send(responseBody.generateFrame(0).toNioByteBuffer());
+ _sender.send(responseBody.generateFrame(0).toNioByteBuffer());
}
catch (AMQException e)
{
_logger.info("Received unsupported protocol initiation for protocol version: " + getProtocolVersion());
- _networkDriver.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
+ _sender.send(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()).toNioByteBuffer());
}
}
@@ -496,7 +497,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
public void run()
{
- _networkDriver.send(buf);
+ _sender.send(buf);
}
});
}
@@ -688,8 +689,8 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
{
if (delay > 0)
{
- _networkDriver.setMaxWriteIdle(delay);
- _networkDriver.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
+ _network.setMaxWriteIdle(delay);
+ _network.setMaxReadIdle((int) (ApplicationRegistry.getInstance().getConfiguration().getHeartBeatTimeout() * delay));
}
}
@@ -793,7 +794,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public void closeProtocolSession()
{
- _networkDriver.close();
+ _sender.close();
try
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
@@ -828,7 +829,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
*/
public String getLocalFQDN()
{
- SocketAddress address = _networkDriver.getLocalAddress();
+ SocketAddress address = _network.getLocalAddress();
// we use the vmpipe address in some tests hence the need for this rather ugly test. The host
// information is used by SASL primary.
if (address instanceof InetSocketAddress)
@@ -917,7 +918,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public Object getClientIdentifier()
{
- return (_networkDriver != null) ? _networkDriver.getRemoteAddress() : null;
+ return _network.getRemoteAddress();
}
public VirtualHost getVirtualHost()
@@ -976,12 +977,12 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public MethodRegistry getMethodRegistry()
@@ -1011,14 +1012,9 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
// Nothing
}
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
public void writerIdle()
{
- _networkDriver.send(HeartbeatBody.FRAME.toNioByteBuffer());
+ _sender.send(HeartbeatBody.FRAME.toNioByteBuffer());
}
public void exception(Throwable throwable)
@@ -1026,7 +1022,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
if (throwable instanceof AMQProtocolHeaderException)
{
writeFrame(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
- _networkDriver.close();
+ _sender.close();
_logger.error("Error in protocol initiation " + this + ":" + getRemoteAddress() + " :" + throwable.getMessage(), throwable);
}
@@ -1044,7 +1040,7 @@ public class AMQProtocolEngine implements ProtocolEngine, Managable, AMQProtocol
writeFrame(closeBody.generateFrame(0));
- _networkDriver.close();
+ _sender.close();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
index 0e4444725e..c8bdcd377b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngineFactory.java
@@ -25,7 +25,7 @@ import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkConnection;
public class AMQProtocolEngineFactory implements ProtocolEngineFactory
{
@@ -42,9 +42,8 @@ public class AMQProtocolEngineFactory implements ProtocolEngineFactory
}
- public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ public ProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new AMQProtocolEngine(_vhosts, networkDriver);
+ return new AMQProtocolEngine(_vhosts, network);
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index eb957ee33c..064d8f19a6 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -27,7 +27,8 @@ import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -37,28 +38,24 @@ public class MultiVersionProtocolEngine implements ProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
-
-
- private NetworkDriver _networkDriver;
private Set<VERSION> _supported;
private String _fqdn;
private IApplicationRegistry _appRegistry;
-
+ private NetworkConnection _network;
+ private Sender<ByteBuffer> _sender;
+
private volatile ProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(IApplicationRegistry appRegistry,
String fqdn,
- Set<VERSION> supported, NetworkDriver networkDriver)
+ Set<VERSION> supported,
+ NetworkConnection network)
{
_appRegistry = appRegistry;
_fqdn = fqdn;
_supported = supported;
- _networkDriver = networkDriver;
- }
-
- public void setNetworkDriver(NetworkDriver driver)
- {
- _delegate.setNetworkDriver(driver);
+ _network = network;
+ _sender = _network.getSender();
}
public SocketAddress getRemoteAddress()
@@ -175,7 +172,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
}
};
@@ -195,7 +192,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
}
};
@@ -215,7 +212,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
public ProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _networkDriver);
+ return new AMQProtocolEngine(_appRegistry.getVirtualHostRegistry(), _network);
}
};
@@ -242,7 +239,7 @@ private static final byte[] AMQP_0_9_1_HEADER =
ServerConnection conn = new ServerConnection();
conn.setConnectionDelegate(connDelegate);
- return new ProtocolEngine_0_10( conn, _networkDriver, _appRegistry);
+ return new ProtocolEngine_0_10( conn, _network, _appRegistry);
}
};
@@ -252,19 +249,14 @@ private static final byte[] AMQP_0_9_1_HEADER =
private class ClosedDelegateProtocolEngine implements ProtocolEngine
{
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getWrittenBytes()
@@ -305,22 +297,16 @@ private static final byte[] AMQP_0_9_1_HEADER =
private class SelfDelegateProtocolEngine implements ProtocolEngine
{
-
private final ByteBuffer _header = ByteBuffer.allocate(MINIMUM_REQUIRED_HEADER_BYTES);
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- }
-
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getWrittenBytes()
@@ -380,14 +366,12 @@ private static final byte[] AMQP_0_9_1_HEADER =
// If no delegate is found then send back the most recent support protocol version id
if(newDelegate == null)
{
- _networkDriver.send(ByteBuffer.wrap(newestSupported));
+ _sender.send(ByteBuffer.wrap(newestSupported));
_delegate = new ClosedDelegateProtocolEngine();
}
else
{
- newDelegate.setNetworkDriver(_networkDriver);
-
_delegate = newDelegate;
_header.flip();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 75358c42d9..09b31f2cc8 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -20,21 +20,18 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
-
-import java.util.Set;
-import java.util.Arrays;
-import java.util.HashSet;
+import org.apache.qpid.transport.network.NetworkConnection;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
- ;
-
-
public enum VERSION { v0_8, v0_9, v0_9_1, v0_10 };
private static final Set<VERSION> ALL_VERSIONS = new HashSet<VERSION>(Arrays.asList(VERSION.values()));
@@ -68,8 +65,8 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
- public ProtocolEngine newProtocolEngine(NetworkDriver networkDriver)
+ public ProtocolEngine newProtocolEngine(NetworkConnection network)
{
- return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, networkDriver);
+ return new MultiVersionProtocolEngine(_appRegistry, _fqdn, _supported, network);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index 30d506a89b..42a604e3a5 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -21,13 +21,12 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.transport.NetworkDriver;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
+import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.server.configuration.*;
import org.apache.qpid.server.transport.ServerConnection;
-import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -38,7 +37,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
- private NetworkDriver _networkDriver;
+ private NetworkConnection _network;
private long _readBytes;
private long _writtenBytes;
private ServerConnection _connection;
@@ -47,26 +46,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
private long _createTime = System.currentTimeMillis();
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkDriver networkDriver,
+ NetworkConnection network,
final IApplicationRegistry appRegistry)
{
super(new Assembler(conn));
_connection = conn;
_connection.setConnectionConfig(this);
- _networkDriver = networkDriver;
+ _network = network;
_id = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
- // FIXME Two log messages to maintain compatinbility with earlier protocol versions
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
- _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
- }
-
- public void setNetworkDriver(NetworkDriver driver)
- {
- _networkDriver = driver;
- Disassembler dis = new Disassembler(driver, MAX_FRAME_SIZE);
- _connection.setSender(dis);
+ _connection.setSender(new Disassembler(_network.getSender(), MAX_FRAME_SIZE));
_connection.onOpen(new Runnable()
{
public void run()
@@ -75,16 +65,19 @@ public class ProtocolEngine_0_10 extends InputHandler implements ProtocolEngine
}
});
+ // FIXME Two log messages to maintain compatibility with earlier protocol versions
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, false, false));
+ _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", false, true));
}
public SocketAddress getRemoteAddress()
{
- return _networkDriver.getRemoteAddress();
+ return _network.getRemoteAddress();
}
public SocketAddress getLocalAddress()
{
- return _networkDriver.getLocalAddress();
+ return _network.getLocalAddress();
}
public long getReadBytes()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
index b6df0cc0a6..924392eed4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
@@ -483,7 +483,7 @@ public abstract class ApplicationRegistry implements IApplicationRegistry
try
{
- acceptor.getNetworkDriver().close();
+ acceptor.getNetworkTransport().close();
}
catch (Throwable e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
index 4a4253153c..108533ef96 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
@@ -71,7 +71,7 @@ public class BrokerConfigAdapter implements BrokerConfig
public Integer getWorkerThreads()
{
- return _instance.getConfiguration().getProcessors();
+ return _instance.getConfiguration().getConnectorProcessors();
}
public Integer getMaxConnections()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
index 3ca22b60c8..abbc5a3805 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
@@ -20,21 +20,21 @@
*/
package org.apache.qpid.server.transport;
-import org.apache.qpid.transport.NetworkDriver;
+import org.apache.qpid.transport.network.NetworkTransport;
public class QpidAcceptor
{
- NetworkDriver _driver;
+ NetworkTransport _transport;
String _protocol;
- public QpidAcceptor(NetworkDriver driver, String protocol)
+ public QpidAcceptor(NetworkTransport transport, String protocol)
{
- _driver = driver;
+ _transport = transport;
_protocol = protocol;
}
- public NetworkDriver getNetworkDriver()
+ public NetworkTransport getNetworkTransport()
{
- return _driver;
+ return _transport;
}
public String toString()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
index 484f93cb88..f8640d136f 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.configuration;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -453,13 +455,13 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(4, serverConfig.getProcessors());
+ assertEquals(4, serverConfig.getConnectorProcessors());
// Check value we set
_config.setProperty("connector.processors", 10);
serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals(10, serverConfig.getProcessors());
+ assertEquals(10, serverConfig.getConnectorProcessors());
}
public void testGetPort() throws ConfigurationException
@@ -486,7 +488,7 @@ public class ServerConfigurationTest extends InternalBrokerBaseCase
// Check default
ServerConfiguration serverConfig = new ServerConfiguration(_config);
serverConfig.initialise();
- assertEquals("wildcard", serverConfig.getBind());
+ assertEquals(WILDCARD_ADDRESS, serverConfig.getBind());
// Check value we set
_config.setProperty("connector.bind", "a");
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 3b6cd37ea9..2b724af2b1 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -21,7 +21,11 @@
package org.apache.qpid.server.protocol;
import java.security.Principal;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.AMQException;
@@ -30,14 +34,13 @@ import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.transport.TestNetworkDriver;
+import org.apache.qpid.transport.TestNetworkConnection;
public class InternalTestProtocolSession extends AMQProtocolEngine implements ProtocolOutputConverter
{
@@ -47,7 +50,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
public InternalTestProtocolSession(VirtualHost virtualHost) throws AMQException
{
- super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkDriver());
+ super(ApplicationRegistry.getInstance().getVirtualHostRegistry(), new TestNetworkConnection());
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index 62ceb68208..3acd064fd7 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -122,6 +122,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
}
catch (Exception e)
{
+ e.printStackTrace();
fail(e.getMessage());
}
}