From 245f2793e0a4efd4876ad72b2cf32edc93750d84 Mon Sep 17 00:00:00 2001 From: Robert Gemmell Date: Thu, 7 Jul 2011 15:10:30 +0000 Subject: QPID-3342: transition TCP based Mina transport for 0-8/0-9/0-9-1 protocols over to new IO interface model Applied patch by Keith Wall and myself git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1143867 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/protocol/ProtocolEngine.java | 5 - .../qpid/protocol/ProtocolEngineFactory.java | 4 +- .../apache/qpid/transport/ConnectionSettings.java | 2 + .../org/apache/qpid/transport/NetworkDriver.java | 63 --- .../qpid/transport/NetworkDriverConfiguration.java | 44 -- .../transport/NetworkTransportConfiguration.java | 46 ++ .../qpid/transport/SocketConnectorFactory.java | 8 + .../network/IncomingNetworkTransport.java | 30 ++ .../qpid/transport/network/NetworkConnection.java | 4 + .../apache/qpid/transport/network/Transport.java | 27 ++ .../apache/qpid/transport/network/VMBrokerMap.java | 51 ++ .../transport/network/io/IoNetworkConnection.java | 13 +- .../transport/network/mina/MINANetworkDriver.java | 368 -------------- .../network/mina/MinaNetworkConnection.java | 81 ++++ .../transport/network/mina/MinaNetworkHandler.java | 149 ++++++ .../network/mina/MinaNetworkTransport.java | 250 ++++++++++ .../qpid/transport/network/mina/MinaSender.java | 79 +++ .../java/org/apache/qpid/transport/MockSender.java | 47 ++ .../qpid/transport/TestNetworkConnection.java | 138 ++++++ .../apache/qpid/transport/TestNetworkDriver.java | 133 ----- .../network/mina/MINANetworkDriverTest.java | 494 ------------------- .../network/mina/MinaNetworkHandlerTest.java | 535 +++++++++++++++++++++ 22 files changed, 1461 insertions(+), 1110 deletions(-) delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/Transport.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java delete mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java create mode 100644 java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java create mode 100644 java/common/src/test/java/org/apache/qpid/transport/MockSender.java create mode 100644 java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java delete mode 100644 java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java delete mode 100644 java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java create mode 100644 java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java (limited to 'java/common') diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java index 31953ea6ab..48a3df734a 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java @@ -22,8 +22,6 @@ package org.apache.qpid.protocol; import java.net.SocketAddress; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.transport.NetworkDriver; import org.apache.qpid.transport.Receiver; /** @@ -32,9 +30,6 @@ import org.apache.qpid.transport.Receiver; */ public interface ProtocolEngine extends Receiver { - // Sets the network driver providing data for this ProtocolEngine - void setNetworkDriver (NetworkDriver driver); - // Returns the remote address of the NetworkDriver SocketAddress getRemoteAddress(); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java index 9df84eef90..4e40b78440 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java @@ -20,12 +20,12 @@ */ package org.apache.qpid.protocol; -import org.apache.qpid.transport.NetworkDriver; +import org.apache.qpid.transport.network.NetworkConnection; public interface ProtocolEngineFactory { // Returns a new instance of a ProtocolEngine - ProtocolEngine newProtocolEngine(NetworkDriver networkDriver); + ProtocolEngine newProtocolEngine(NetworkConnection network); } \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java index 08678b213b..2074c77a5b 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java +++ b/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java @@ -30,6 +30,8 @@ import java.util.Map; */ public class ConnectionSettings { + public static final String WILDCARD_ADDRESS = "*"; + String protocol = "tcp"; String host = "localhost"; String vhost; diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java deleted file mode 100644 index 86af97bf7e..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriver.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.SocketAddress; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; - -public interface NetworkDriver extends Sender -{ - // Creates a NetworkDriver which attempts to connect to destination on port and attaches the ProtocolEngine to - // it using the SSLContextFactory if provided - void open(int port, InetAddress destination, ProtocolEngine engine, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) - throws OpenException; - - // listens for incoming connections on the specified ports and address and creates a new NetworkDriver which - // processes incoming connections with ProtocolEngines and SSLEngines created from the factories - // (in the case of an SSLContextFactory, if provided) - void bind (int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException; - - // Returns the remote address of the underlying socket - SocketAddress getRemoteAddress(); - - // Returns the local address of the underlying socket - SocketAddress getLocalAddress(); - - /** - * The length of time after which the ProtocolEngines readIdle() method should be called if no data has been - * read in seconds - */ - void setMaxReadIdle(int idleTime); - - /** - * The length of time after which the ProtocolEngines writeIdle() method should be called if no data has been - * written in seconds - */ - void setMaxWriteIdle(int idleTime); - -} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java deleted file mode 100644 index c38afe5dd5..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/NetworkDriverConfiguration.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -/** - * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing - * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned - * from here if the underlying implementation supports them. - */ -public interface NetworkDriverConfiguration -{ - // Taken from Socket - Boolean getKeepAlive(); - Boolean getOOBInline(); - Boolean getReuseAddress(); - Integer getSoLinger(); // null means off - Integer getSoTimeout(); - Boolean getTcpNoDelay(); - Integer getTrafficClass(); - - // The amount of memory in bytes to allocate to the incoming buffer - Integer getReceiveBufferSize(); - - // The amount of memory in bytes to allocate to the outgoing buffer - Integer getSendBufferSize(); -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java new file mode 100644 index 0000000000..8d3f7a779a --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/NetworkTransportConfiguration.java @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +/** + * This interface provides a means for NetworkDrivers to configure TCP options such as incoming and outgoing + * buffer sizes and set particular options on the socket. NetworkDrivers should honour the values returned + * from here if the underlying implementation supports them. + */ +public interface NetworkTransportConfiguration +{ + // Taken from Socket + Boolean getTcpNoDelay(); + + // The amount of memory in bytes to allocate to the incoming buffer + Integer getReceiveBufferSize(); + + // The amount of memory in bytes to allocate to the outgoing buffer + Integer getSendBufferSize(); + + Integer getPort(); + + String getHost(); + + String getTransport(); + + Integer getConnectorProcessors(); +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java new file mode 100644 index 0000000000..2c7652abeb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/SocketConnectorFactory.java @@ -0,0 +1,8 @@ +package org.apache.qpid.transport; + +import org.apache.mina.common.IoConnector; + +public interface SocketConnectorFactory +{ + IoConnector newConnector(); +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java new file mode 100644 index 0000000000..7099916c33 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.NetworkTransportConfiguration; + +public interface IncomingNetworkTransport extends NetworkTransport +{ + public void accept(NetworkTransportConfiguration config, ProtocolEngineFactory factory, SSLContextFactory sslFactory); +} \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java index 80b32ea909..1f69973b96 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java @@ -40,4 +40,8 @@ public interface NetworkConnection * Returns the local address of the underlying socket. */ SocketAddress getLocalAddress(); + + void setMaxWriteIdle(int sec); + + void setMaxReadIdle(int sec); } \ No newline at end of file diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java new file mode 100644 index 0000000000..4b8a0baf75 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/Transport.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +public class Transport +{ + public static final String TCP = "tcp"; + public static final String VM = "vm"; +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java new file mode 100644 index 0000000000..acc55c2e2d --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/VMBrokerMap.java @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.mina.transport.vmpipe.VmPipeAddress; + +public class VMBrokerMap +{ + private static final Map _map = new HashMap(); + + public static void add(int port, VmPipeAddress pipe) + { + _map.put(port, pipe); + } + + public static VmPipeAddress remove(int port) + { + return _map.remove(port); + } + + public static void clear() + { + _map.clear(); + } + + public static boolean contains(int port) + { + return _map.containsKey(port); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java index 3252544fee..cca1fc46c9 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java @@ -78,5 +78,16 @@ public class IoNetworkConnection implements NetworkConnection { return _socket.getLocalSocketAddress(); } - + + public void setMaxWriteIdle(int sec) + { + // TODO implement support for setting heartbeating config in this way + // Currently a socket timeout is used in IoSender + } + + public void setMaxReadIdle(int sec) + { + // TODO implement support for setting heartbeating config in this way + // Currently a socket timeout is used in IoSender + } } diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java deleted file mode 100644 index 2206e0999e..0000000000 --- a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import org.apache.mina.common.ConnectFuture; -import org.apache.mina.common.ExecutorThreadModel; -import org.apache.mina.common.IdleStatus; -import org.apache.mina.common.IoAcceptor; -import org.apache.mina.common.IoConnector; -import org.apache.mina.common.IoHandlerAdapter; -import org.apache.mina.common.IoSession; -import org.apache.mina.common.SimpleByteBufferAllocator; -import org.apache.mina.common.WriteFuture; -import org.apache.mina.filter.SSLFilter; -import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; -import org.apache.mina.transport.socket.nio.SocketConnector; -import org.apache.mina.transport.socket.nio.SocketConnectorConfig; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; -import org.apache.mina.util.NewThreadExecutor; -import org.apache.mina.util.SessionUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; -import org.apache.qpid.thread.QpidThreadExecutor; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.NetworkDriverConfiguration; -import org.apache.qpid.transport.OpenException; - -import java.io.IOException; -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; - -public class MINANetworkDriver extends IoHandlerAdapter implements NetworkDriver -{ - - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; - - ProtocolEngine _protocolEngine; - private int _processors = 4; - private SSLContextFactory _sslFactory = null; - private IoConnector _socketConnector; - private IoAcceptor _acceptor; - private IoSession _ioSession; - private ProtocolEngineFactory _factory; - private Throwable _lastException; - private boolean _acceptingConnections = false; - - private WriteFuture _lastWriteFuture; - - private static final Logger _logger = LoggerFactory.getLogger(MINANetworkDriver.class); - - static - { - org.apache.mina.common.ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers")); - - //override the MINA defaults to prevent use of the PooledByteBufferAllocator - org.apache.mina.common.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); - } - - public MINANetworkDriver(int processors, ProtocolEngine protocolEngine, IoSession session) - { - _processors = processors; - _protocolEngine = protocolEngine; - _ioSession = session; - _ioSession.setAttachment(_protocolEngine); - } - - public MINANetworkDriver() - { - - } - - public MINANetworkDriver(IoConnector ioConnector) - { - _socketConnector = ioConnector; - } - - public MINANetworkDriver(IoConnector ioConnector, ProtocolEngine engine) - { - _socketConnector = ioConnector; - _protocolEngine = engine; - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory factory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - _factory = factory; - - _acceptor = new org.apache.mina.transport.socket.nio.SocketAcceptor(_processors, new NewThreadExecutor()); - - SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); - sconfig.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Acceptor)")); - SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); - - if (config != null) - { - sc.setReceiveBufferSize(config.getReceiveBufferSize()); - sc.setSendBufferSize(config.getSendBufferSize()); - sc.setTcpNoDelay(config.getTcpNoDelay()); - } - - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - if (addresses != null && addresses.length > 0) - { - for (InetAddress addr : addresses) - { - try - { - _acceptor.bind(new InetSocketAddress(addr, port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to %1s:%2s", addr, port)); - } - } - } - else - { - try - { - _acceptor.bind(new InetSocketAddress(port), this, sconfig); - } - catch (IOException e) - { - throw new BindException(String.format("Could not bind to *:%1s", port)); - } - } - _acceptingConnections = true; - } - - public SocketAddress getRemoteAddress() - { - return _ioSession.getRemoteAddress(); - } - - public SocketAddress getLocalAddress() - { - return _ioSession.getLocalAddress(); - } - - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLContextFactory sslFactory) throws OpenException - { - if (sslFactory != null) - { - _sslFactory = sslFactory; - } - - _socketConnector = new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector - - SocketConnectorConfig cfg = (SocketConnectorConfig) _socketConnector.getDefaultConfig(); - String s = ""; - StackTraceElement[] trace = Thread.currentThread().getStackTrace(); - for(StackTraceElement elt : trace) - { - if(elt.getClassName().contains("Test")) - { - s = elt.getClassName(); - break; - } - } - cfg.setThreadModel(ExecutorThreadModel.getInstance("MINANetworkDriver(Client)-"+s)); - - SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); - scfg.setTcpNoDelay((config != null) ? config.getTcpNoDelay() : true); - scfg.setSendBufferSize((config != null) ? config.getSendBufferSize() : DEFAULT_BUFFER_SIZE); - scfg.setReceiveBufferSize((config != null) ? config.getReceiveBufferSize() : DEFAULT_BUFFER_SIZE); - - // Don't have the connector's worker thread wait around for other - // connections (we only use - // one SocketConnector per connection at the moment anyway). This allows - // short-running - // clients (like unit tests) to complete quickly. - if (_socketConnector instanceof SocketConnector) - { - ((SocketConnector) _socketConnector).setWorkerTimeout(0); - } - - ConnectFuture future = _socketConnector.connect(new InetSocketAddress(destination, port), this, cfg); - future.join(); - if (!future.isConnected()) - { - throw new OpenException("Could not open connection", _lastException); - } - _ioSession = future.getSession(); - _ioSession.setAttachment(engine); - engine.setNetworkDriver(this); - _protocolEngine = engine; - } - - public void setMaxReadIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.READER_IDLE, idleTime); - } - - public void setMaxWriteIdle(int idleTime) - { - _ioSession.setIdleTime(IdleStatus.WRITER_IDLE, idleTime); - } - - public void close() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - if (_acceptor != null) - { - _acceptor.unbindAll(); - } - if (_ioSession != null) - { - _ioSession.close(); - } - } - - public void flush() - { - if (_lastWriteFuture != null) - { - _lastWriteFuture.join(); - } - } - - public void send(ByteBuffer msg) - { - org.apache.mina.common.ByteBuffer minaBuf = org.apache.mina.common.ByteBuffer.allocate(msg.capacity()); - minaBuf.put(msg); - minaBuf.flip(); - _lastWriteFuture = _ioSession.write(minaBuf); - } - - public void setIdleTimeout(int i) - { - // MINA doesn't support setting SO_TIMEOUT - } - - public void exceptionCaught(IoSession protocolSession, Throwable throwable) throws Exception - { - if (_protocolEngine != null) - { - _protocolEngine.exception(throwable); - } - else - { - _logger.error("Exception thrown and no ProtocolEngine to handle it", throwable); - } - _lastException = throwable; - } - - /** - * Invoked when a message is received on a particular protocol session. Note - * that a protocol session is directly tied to a particular physical - * connection. - * - * @param protocolSession - * the protocol session that received the message - * @param message - * the message itself (i.e. a decoded frame) - * - * @throws Exception - * if the message cannot be processed - */ - public void messageReceived(IoSession protocolSession, Object message) throws Exception - { - if (message instanceof org.apache.mina.common.ByteBuffer) - { - ((ProtocolEngine) protocolSession.getAttachment()).received(((org.apache.mina.common.ByteBuffer) message).buf()); - } - else - { - throw new IllegalStateException("Handed unhandled message. message.class = " + message.getClass() + " message = " + message); - } - } - - public void sessionClosed(IoSession protocolSession) throws Exception - { - ((ProtocolEngine) protocolSession.getAttachment()).closed(); - } - - public void sessionCreated(IoSession protocolSession) throws Exception - { - // Configure the session with SSL if necessary - SessionUtil.initialize(protocolSession); - if (_sslFactory != null) - { - protocolSession.getFilterChain().addBefore("protocolFilter", "sslFilter", - new SSLFilter(_sslFactory.buildServerContext())); - } - - if (_ioSession == null) - { - _ioSession = protocolSession; - } - - if (_acceptingConnections) - { - // Set up the protocol engine - ProtocolEngine protocolEngine = _factory.newProtocolEngine(this); - MINANetworkDriver newDriver = new MINANetworkDriver(_processors, protocolEngine, protocolSession); - protocolEngine.setNetworkDriver(newDriver); - } - } - - public void sessionIdle(IoSession session, IdleStatus status) throws Exception - { - if (IdleStatus.WRITER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).writerIdle(); - } - else if (IdleStatus.READER_IDLE.equals(status)) - { - ((ProtocolEngine) session.getAttachment()).readerIdle(); - } - } - - private ProtocolEngine getProtocolEngine() - { - return _protocolEngine; - } - - public void setProtocolEngineFactory(ProtocolEngineFactory engineFactory, boolean acceptingConnections) - { - _factory = engineFactory; - _acceptingConnections = acceptingConnections; - } - - public void setProtocolEngine(ProtocolEngine protocolEngine) - { - _protocolEngine = protocolEngine; - if (_ioSession != null) - { - _ioSession.setAttachment(protocolEngine); - } - } - -} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java new file mode 100644 index 0000000000..0f433f6eeb --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkConnection.java @@ -0,0 +1,81 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoSession; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; + +public class MinaNetworkConnection implements NetworkConnection +{ + private IoSession _session; + private Sender _sender; + + public MinaNetworkConnection(IoSession session) + { + _session = session; + _sender = new MinaSender(_session); + } + + public Sender getSender() + { + return _sender; + } + + public void close() + { + _session.close(); + } + + public SocketAddress getRemoteAddress() + { + return _session.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _session.getLocalAddress(); + } + + public long getReadBytes() + { + return _session.getReadBytes(); + } + + public long getWrittenBytes() + { + return _session.getWrittenBytes(); + } + + public void setMaxWriteIdle(int sec) + { + _session.setIdleTime(IdleStatus.WRITER_IDLE, sec); + } + + public void setMaxReadIdle(int sec) + { + _session.setIdleTime(IdleStatus.READER_IDLE, sec); + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java new file mode 100644 index 0000000000..c00187480c --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkHandler.java @@ -0,0 +1,149 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.IdleStatus; +import org.apache.mina.common.IoHandlerAdapter; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.filter.SSLFilter; +import org.apache.mina.util.SessionUtil; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinaNetworkHandler extends IoHandlerAdapter +{ + private static final Logger LOGGER = LoggerFactory.getLogger(MinaNetworkHandler.class); + + private ProtocolEngineFactory _factory; + private SSLContextFactory _sslFactory = null; + + static + { + boolean directBuffers = Boolean.getBoolean("amqj.enableDirectBuffers"); + LOGGER.debug("Using " + (directBuffers ? "direct" : "heap") + " buffers"); + ByteBuffer.setUseDirectBuffers(directBuffers); + + //override the MINA defaults to prevent use of the PooledByteBufferAllocator + ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + } + + public MinaNetworkHandler(SSLContextFactory sslFactory, ProtocolEngineFactory factory) + { + _sslFactory = sslFactory; + _factory = factory; + } + + public MinaNetworkHandler(SSLContextFactory sslFactory) + { + this(sslFactory, null); + } + + public void messageReceived(IoSession session, Object message) + { + ProtocolEngine engine = (ProtocolEngine) session.getAttachment(); + ByteBuffer buf = (ByteBuffer) message; + try + { + engine.received(buf.buf()); + } + catch (RuntimeException re) + { + engine.exception(re); + } + } + + public void exceptionCaught(IoSession ioSession, Throwable throwable) throws Exception + { + ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); + if(engine != null) + { + LOGGER.error("Exception caught by Mina", throwable); + engine.exception(throwable); + } + else + { + LOGGER.error("Exception caught by Mina but without protocol engine to handle it", throwable); + } + } + + public void sessionCreated(IoSession ioSession) throws Exception + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Created session: " + ioSession.getRemoteAddress()); + } + + SessionUtil.initialize(ioSession); + + if (_sslFactory != null) + { + ioSession.getFilterChain().addBefore("protocolFilter", "sslFilter", + new SSLFilter(_sslFactory.buildServerContext())); + } + + if (_factory != null) + { + NetworkConnection netConn = new MinaNetworkConnection(ioSession); + + ProtocolEngine engine = _factory.newProtocolEngine(netConn); + ioSession.setAttachment(engine); + } + } + + public void sessionClosed(IoSession ioSession) throws Exception + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("closed: " + ioSession.getRemoteAddress()); + } + + ProtocolEngine engine = (ProtocolEngine) ioSession.getAttachment(); + if(engine != null) + { + engine.closed(); + } + else + { + LOGGER.error("Unable to close ProtocolEngine as none was present"); + } + } + + + public void sessionIdle(IoSession session, IdleStatus status) throws Exception + { + if (IdleStatus.WRITER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).writerIdle(); + } + else if (IdleStatus.READER_IDLE.equals(status)) + { + ((ProtocolEngine) session.getAttachment()).readerIdle(); + } + } + +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java new file mode 100644 index 0000000000..62f9429f30 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaNetworkTransport.java @@ -0,0 +1,250 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; + +import org.apache.mina.common.ConnectFuture; +import org.apache.mina.common.ExecutorThreadModel; +import org.apache.mina.common.IoConnector; +import org.apache.mina.common.IoSession; +import org.apache.mina.transport.socket.nio.SocketAcceptor; +import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; +import org.apache.mina.transport.socket.nio.SocketConnector; +import org.apache.mina.transport.socket.nio.SocketConnectorConfig; +import org.apache.mina.transport.socket.nio.SocketSessionConfig; +import org.apache.mina.util.NewThreadExecutor; +import org.apache.mina.transport.vmpipe.QpidVmPipeConnector; +import org.apache.mina.transport.vmpipe.VmPipeAddress; + +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.thread.QpidThreadExecutor; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.SocketConnectorFactory; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; +import org.apache.qpid.transport.network.VMBrokerMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MinaNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +{ + private static final int UNKNOWN = -1; + private static final int TCP = 0; + private static final int VM = 1; + + public NetworkConnection _connection; + private SocketAcceptor _acceptor; + private InetSocketAddress _address; + + public NetworkConnection connect(ConnectionSettings settings, + Receiver delegate, SSLContextFactory sslFactory) + { + int transport = getTransport(settings.getProtocol()); + + IoConnectorCreator stc; + switch(transport) + { + case TCP: + stc = new IoConnectorCreator(new SocketConnectorFactory() + { + public IoConnector newConnector() + { + return new SocketConnector(1, new QpidThreadExecutor()); // non-blocking connector + } + }); + _connection = stc.connect(delegate, settings, sslFactory); + break; + case VM: + stc = new IoConnectorCreator(new SocketConnectorFactory() + { + public IoConnector newConnector() + { + return new QpidVmPipeConnector(); + } + }); + _connection = stc.connect(delegate, settings, sslFactory); + break; + case UNKNOWN: + default: + throw new TransportException("Unknown protocol: " + settings.getProtocol()); + } + + return _connection; + } + + private static int getTransport(String transport) + { + if (transport.equals(Transport.TCP)) + { + return TCP; + } + + if (transport.equals(Transport.VM)) + { + return VM; + } + + return -1; + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if (_acceptor != null) + { + _acceptor.unbindAll(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(final NetworkTransportConfiguration config, final ProtocolEngineFactory factory, + final SSLContextFactory sslFactory) + { + int processors = config.getConnectorProcessors(); + + if (Transport.TCP.equalsIgnoreCase(config.getTransport())) + { + _acceptor = new SocketAcceptor(processors, new NewThreadExecutor()); + + SocketAcceptorConfig sconfig = (SocketAcceptorConfig) _acceptor.getDefaultConfig(); + sconfig.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Acceptor)")); + SocketSessionConfig sc = (SocketSessionConfig) sconfig.getSessionConfig(); + sc.setTcpNoDelay(config.getTcpNoDelay()); + sc.setSendBufferSize(config.getSendBufferSize()); + sc.setReceiveBufferSize(config.getReceiveBufferSize()); + + if (config.getHost().equals(WILDCARD_ADDRESS)) + { + _address = new InetSocketAddress(config.getPort()); + } + else + { + _address = new InetSocketAddress(config.getHost(), config.getPort()); + } + } + else + { + throw new TransportException("Unknown transport: " + config.getTransport()); + } + + try + { + _acceptor.bind(_address, new MinaNetworkHandler(sslFactory, factory)); + } + catch (IOException e) + { + throw new TransportException("Could not bind to " + _address, e); + } + } + + + private static class IoConnectorCreator + { + private static final Logger LOGGER = LoggerFactory.getLogger(IoConnectorCreator.class); + + private static final int CLIENT_DEFAULT_BUFFER_SIZE = 32 * 1024; + + private SocketConnectorFactory _ioConnectorFactory; + + public IoConnectorCreator(SocketConnectorFactory socketConnectorFactory) + { + _ioConnectorFactory = socketConnectorFactory; + } + + public NetworkConnection connect(Receiver receiver, ConnectionSettings settings, SSLContextFactory sslFactory) + { + final IoConnector ioConnector = _ioConnectorFactory.newConnector(); + final SocketAddress address; + final String protocol = settings.getProtocol(); + final int port = settings.getPort(); + + if (Transport.TCP.equalsIgnoreCase(protocol)) + { + address = new InetSocketAddress(settings.getHost(), port); + } + else if(Transport.VM.equalsIgnoreCase(protocol)) + { + synchronized (VMBrokerMap.class) + { + if(!VMBrokerMap.contains(port)) + { + throw new TransportException("VM broker on port " + port + " does not exist."); + } + } + + address = new VmPipeAddress(port); + } + else + { + throw new TransportException("Unknown transport: " + protocol); + } + + LOGGER.info("Attempting connection to " + address); + + if (ioConnector instanceof SocketConnector) + { + SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); + cfg.setThreadModel(ExecutorThreadModel.getInstance("MinaNetworkTransport(Client)")); + + SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); + scfg.setTcpNoDelay(true); + scfg.setSendBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); + scfg.setReceiveBufferSize(CLIENT_DEFAULT_BUFFER_SIZE); + + // Don't have the connector's worker thread wait around for other + // connections (we only use one SocketConnector per connection + // at the moment anyway). This allows short-running + // clients (like unit tests) to complete quickly. + ((SocketConnector) ioConnector).setWorkerTimeout(0); + } + + ConnectFuture future = ioConnector.connect(address, new MinaNetworkHandler(sslFactory), ioConnector.getDefaultConfig()); + future.join(); + if (!future.isConnected()) + { + throw new TransportException("Could not open connection"); + } + + IoSession session = future.getSession(); + session.setAttachment(receiver); + + return new MinaNetworkConnection(session); + } + } +} diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java new file mode 100644 index 0000000000..be114e2fa1 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaSender.java @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.mina; + +import org.apache.mina.common.ByteBuffer; +import org.apache.mina.common.CloseFuture; +import org.apache.mina.common.IoSession; +import org.apache.mina.common.WriteFuture; +import org.apache.qpid.transport.Sender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MinaSender + */ +public class MinaSender implements Sender +{ + private static final Logger _log = LoggerFactory.getLogger(MinaSender.class); + + private final IoSession _session; + private WriteFuture _lastWrite; + + public MinaSender(IoSession session) + { + _session = session; + } + + public synchronized void send(java.nio.ByteBuffer msg) + { + _log.debug("sending data:"); + ByteBuffer mina = ByteBuffer.allocate(msg.limit()); + mina.put(msg); + mina.flip(); + _lastWrite = _session.write(mina); + _log.debug("sent data:"); + } + + public synchronized void flush() + { + if (_lastWrite != null) + { + _lastWrite.join(); + } + } + + public void close() + { + // MINA will sometimes throw away in-progress writes when you ask it to close + flush(); + CloseFuture closed = _session.close(); + closed.join(); + } + + public void setIdleTimeout(int i) + { + //TODO: + //We are instead using the setMax[Read|Write]IdleTime methods in + //MinaNetworkConnection for this. Should remove this method from + //sender interface, but currently being used by IoSender for 0-10. + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/MockSender.java b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java new file mode 100644 index 0000000000..4b38b7318a --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/MockSender.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.nio.ByteBuffer; + +public class MockSender implements Sender +{ + + public void setIdleTimeout(int i) + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void flush() + { + + } + + public void close() + { + + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java new file mode 100644 index 0000000000..8686c17414 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkConnection.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport; + +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.ssl.SSLContextFactory; +import org.apache.qpid.transport.network.NetworkConnection; + +/** + * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, + * so if this class is being used and some methods are to be used, then please update those. + */ +public class TestNetworkConnection implements NetworkConnection +{ + private String _remoteHost = "127.0.0.1"; + private String _localHost = "127.0.0.1"; + private int _port = 1; + private SocketAddress _localAddress = null; + private SocketAddress _remoteAddress = null; + private final MockSender _sender; + + public TestNetworkConnection() + { + _sender = new MockSender(); + } + + public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, + NetworkTransportConfiguration config, SSLContextFactory sslFactory) throws BindException + { + + } + + public SocketAddress getLocalAddress() + { + return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port); + } + + public SocketAddress getRemoteAddress() + { + return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); + } + + public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkTransportConfiguration config, + SSLContextFactory sslFactory) throws OpenException + { + + } + + public void setMaxReadIdle(int idleTime) + { + + } + + public void setMaxWriteIdle(int idleTime) + { + + } + + public void close() + { + + } + + public void flush() + { + + } + + public void send(ByteBuffer msg) + { + + } + + public void setIdleTimeout(int i) + { + + } + + public void setPort(int port) + { + _port = port; + } + + public int getPort() + { + return _port; + } + + public void setLocalHost(String host) + { + _localHost = host; + } + + public void setRemoteHost(String host) + { + _remoteHost = host; + } + + public void setLocalAddress(SocketAddress address) + { + _localAddress = address; + } + + public void setRemoteAddress(SocketAddress address) + { + _remoteAddress = address; + } + + public Sender getSender() + { + return _sender; + } +} diff --git a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java b/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java deleted file mode 100644 index 957a7190ee..0000000000 --- a/java/common/src/test/java/org/apache/qpid/transport/TestNetworkDriver.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.ssl.SSLContextFactory; - -/** - * Test implementation of IoSession, which is required for some tests. Methods not being used are not implemented, - * so if this class is being used and some methods are to be used, then please update those. - */ -public class TestNetworkDriver implements NetworkDriver -{ - private final ConcurrentMap attributes = new ConcurrentHashMap(); - private String _remoteHost = "127.0.0.1"; - private String _localHost = "127.0.0.1"; - private int _port = 1; - private SocketAddress _localAddress = null; - private SocketAddress _remoteAddress = null; - - public TestNetworkDriver() - { - } - - public void bind(int port, InetAddress[] addresses, ProtocolEngineFactory protocolFactory, - NetworkDriverConfiguration config, SSLContextFactory sslFactory) throws BindException - { - - } - - public SocketAddress getLocalAddress() - { - return (_localAddress != null) ? _localAddress : new InetSocketAddress(_localHost, _port); - } - - public SocketAddress getRemoteAddress() - { - return (_remoteAddress != null) ? _remoteAddress : new InetSocketAddress(_remoteHost, _port); - } - - public void open(int port, InetAddress destination, ProtocolEngine engine, NetworkDriverConfiguration config, - SSLContextFactory sslFactory) throws OpenException - { - - } - - public void setMaxReadIdle(int idleTime) - { - - } - - public void setMaxWriteIdle(int idleTime) - { - - } - - public void close() - { - - } - - public void flush() - { - - } - - public void send(ByteBuffer msg) - { - - } - - public void setIdleTimeout(int i) - { - - } - - public void setPort(int port) - { - _port = port; - } - - public int getPort() - { - return _port; - } - - public void setLocalHost(String host) - { - _localHost = host; - } - - public void setRemoteHost(String host) - { - _remoteHost = host; - } - - public void setLocalAddress(SocketAddress address) - { - _localAddress = address; - } - - public void setRemoteAddress(SocketAddress address) - { - _remoteAddress = address; - } -} diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java deleted file mode 100644 index fc8e689ca4..0000000000 --- a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MINANetworkDriverTest.java +++ /dev/null @@ -1,494 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.transport.network.mina; - -import java.net.BindException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import junit.framework.TestCase; - -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.NetworkDriver; -import org.apache.qpid.transport.OpenException; - -public class MINANetworkDriverTest extends TestCase -{ - - private static final String TEST_DATA = "YHALOTHAR"; - private static int TEST_PORT = 2323; - private NetworkDriver _server; - private NetworkDriver _client; - private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read - private Exception _thrownEx; - - @Override - public void setUp() - { - _server = new MINANetworkDriver(); - _client = new MINANetworkDriver(); - _thrownEx = null; - _countingEngine = new CountingProtocolEngine(); - // increment the port to prevent tests clashing with each other when - // the port is in TIMED_WAIT state. - TEST_PORT++; - } - - @Override - public void tearDown() - { - if (_server != null) - { - _server.close(); - } - - if (_client != null) - { - _client.close(); - } - } - - /** - * Tests that a socket can't be opened if a driver hasn't been bound - * to the port and can be opened if a driver has been bound. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testBindOpen() throws BindException, UnknownHostException, OpenException - { - try - { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - catch (OpenException e) - { - _thrownEx = e; - } - - assertNotNull("Open should have failed since no engine bound", _thrownEx); - - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - - /** - * Tests that a socket can't be opened after a bound NetworkDriver has been closed - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testBindOpenCloseOpen() throws BindException, UnknownHostException, OpenException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - _client.close(); - _server.close(); - - try - { - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - } - catch (OpenException e) - { - _thrownEx = e; - } - assertNotNull("Open should have failed", _thrownEx); - } - - /** - * Checks that the right exception is thrown when binding a NetworkDriver to an already - * existing socket. - */ - public void testBindPortInUse() - { - try - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - } - catch (BindException e) - { - fail("First bind should not fail"); - } - - try - { - _client.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - } - catch (BindException e) - { - _thrownEx = e; - } - assertNotNull("Second bind should throw BindException", _thrownEx); - } - - /** - * tests that bytes sent on a network driver are received at the other end - * - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - * @throws BindException - */ - public void testSend() throws UnknownHostException, OpenException, InterruptedException, BindException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - - // Tell the counting engine how much data we're sending - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - - // Send the data and wait for up to 2 seconds to get it back - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - - // Check what we got - assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes()); - } - - /** - * Opens a connection with a low read idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException - * - */ - public void testSetReadIdle() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); - _client.setMaxReadIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); - } - - /** - * Opens a connection with a low write idle and check that it gets triggered - * @throws BindException - * @throws OpenException - * @throws UnknownHostException - * - */ - public void testSetWriteIdle() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); - _client.setMaxWriteIdle(1); - sleepForAtLeast(1500); - assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); - } - - - /** - * Creates and then closes a connection from client to server and checks that the server - * has its closed() method called. Then creates a new client and closes the server to check - * that the client has its closed() method called. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testClosed() throws BindException, UnknownHostException, OpenException - { - // Open a connection from a counting engine to an echo engine - EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); - _server.bind(TEST_PORT, null, factory, null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - EchoProtocolEngine serverEngine = null; - while (serverEngine == null) - { - serverEngine = factory.getEngine(); - if (serverEngine == null) - { - try - { - Thread.sleep(10); - } - catch (InterruptedException e) - { - } - } - } - assertFalse("Server should not have been closed", serverEngine.getClosed()); - serverEngine.setNewLatch(1); - _client.close(); - try - { - serverEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Server should have been closed", serverEngine.getClosed()); - - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - _countingEngine.setClosed(false); - assertFalse("Client should not have been closed", _countingEngine.getClosed()); - _countingEngine.setNewLatch(1); - _server.close(); - try - { - _countingEngine.getLatch().await(2, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - } - assertTrue("Client should have been closed", _countingEngine.getClosed()); - } - - /** - * Create a connection and instruct the client to throw an exception when it gets some data - * and that the latch gets counted down. - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - * @throws InterruptedException - */ - public void testExceptionCaught() throws BindException, UnknownHostException, OpenException, InterruptedException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - - - assertEquals("Exception should not have been thrown", 1, - _countingEngine.getExceptionLatch().getCount()); - _countingEngine.setErrorOnNextRead(true); - _countingEngine.setNewLatch(TEST_DATA.getBytes().length); - _client.send(ByteBuffer.wrap(TEST_DATA.getBytes())); - _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); - assertEquals("Exception should have been thrown", 0, - _countingEngine.getExceptionLatch().getCount()); - } - - /** - * Opens a connection and checks that the remote address is the one that was asked for - * @throws BindException - * @throws UnknownHostException - * @throws OpenException - */ - public void testGetRemoteAddress() throws BindException, UnknownHostException, OpenException - { - _server.bind(TEST_PORT, null, new EchoProtocolEngineSingletonFactory(), null, null); - _client.open(TEST_PORT, InetAddress.getLocalHost(), _countingEngine, null, null); - assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), TEST_PORT), - _client.getRemoteAddress()); - } - - private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory - { - EchoProtocolEngine _engine = null; - - public ProtocolEngine newProtocolEngine(NetworkDriver driver) - { - if (_engine == null) - { - _engine = new EchoProtocolEngine(); - _engine.setNetworkDriver(driver); - } - return getEngine(); - } - - public EchoProtocolEngine getEngine() - { - return _engine; - } - } - - public class CountingProtocolEngine implements ProtocolEngine - { - - protected NetworkDriver _driver; - public ArrayList _receivedBytes = new ArrayList(); - private int _readBytes; - private CountDownLatch _latch = new CountDownLatch(0); - private boolean _readerHasBeenIdle; - private boolean _writerHasBeenIdle; - private boolean _closed = false; - private boolean _nextReadErrors = false; - private CountDownLatch _exceptionLatch = new CountDownLatch(1); - - public void closed() - { - setClosed(true); - _latch.countDown(); - } - - public void setErrorOnNextRead(boolean b) - { - _nextReadErrors = b; - } - - public void setNewLatch(int length) - { - _latch = new CountDownLatch(length); - } - - public long getReadBytes() - { - return _readBytes; - } - - public SocketAddress getRemoteAddress() - { - if (_driver != null) - { - return _driver.getRemoteAddress(); - } - else - { - return null; - } - } - - public SocketAddress getLocalAddress() - { - if (_driver != null) - { - return _driver.getLocalAddress(); - } - else - { - return null; - } - } - - public long getWrittenBytes() - { - return 0; - } - - public void readerIdle() - { - _readerHasBeenIdle = true; - } - - public void setNetworkDriver(NetworkDriver driver) - { - _driver = driver; - } - - public void writeFrame(AMQDataBlock frame) - { - - } - - public void writerIdle() - { - _writerHasBeenIdle = true; - } - - public void exception(Throwable t) - { - _exceptionLatch.countDown(); - } - - public CountDownLatch getExceptionLatch() - { - return _exceptionLatch; - } - - public void received(ByteBuffer msg) - { - // increment read bytes and count down the latch for that many - int bytes = msg.remaining(); - _readBytes += bytes; - for (int i = 0; i < bytes; i++) - { - _latch.countDown(); - } - - // Throw an error if we've been asked too, but we can still count - if (_nextReadErrors) - { - throw new RuntimeException("Was asked to error"); - } - } - - public CountDownLatch getLatch() - { - return _latch; - } - - public boolean getWriterHasBeenIdle() - { - return _writerHasBeenIdle; - } - - public boolean getReaderHasBeenIdle() - { - return _readerHasBeenIdle; - } - - public void setClosed(boolean _closed) - { - this._closed = _closed; - } - - public boolean getClosed() - { - return _closed; - } - - } - - private class EchoProtocolEngine extends CountingProtocolEngine - { - - public void received(ByteBuffer msg) - { - super.received(msg); - msg.rewind(); - _driver.send(msg); - } - } - - public static void sleepForAtLeast(long period) - { - long start = System.currentTimeMillis(); - long timeLeft = period; - while (timeLeft > 0) - { - try - { - Thread.sleep(timeLeft); - } - catch (InterruptedException e) - { - // Ignore it - } - timeLeft = period - (System.currentTimeMillis() - start); - } - } -} \ No newline at end of file diff --git a/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java new file mode 100644 index 0000000000..a4292d9009 --- /dev/null +++ b/java/common/src/test/java/org/apache/qpid/transport/network/mina/MinaNetworkHandlerTest.java @@ -0,0 +1,535 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.transport.network.mina; + +import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.mina.util.AvailablePortFinder; +import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.Transport; + +public class MinaNetworkHandlerTest extends QpidTestCase +{ + + private static final String TEST_DATA = "YHALOTHAR"; + private int _testPort; + private IncomingNetworkTransport _server; + private OutgoingNetworkTransport _client; + private CountingProtocolEngine _countingEngine; // Keeps a count of how many bytes it's read + private Exception _thrownEx; + private ConnectionSettings _clientSettings; + private NetworkConnection _network; + private TestNetworkTransportConfiguration _brokerSettings; + + @Override + public void setUp() throws Exception + { + String host = InetAddress.getLocalHost().getHostName(); + _testPort = AvailablePortFinder.getNextAvailable(10000); + + _clientSettings = new ConnectionSettings(); + _clientSettings.setHost(host); + _clientSettings.setPort(_testPort); + + _brokerSettings = new TestNetworkTransportConfiguration(_testPort, host); + + _server = new MinaNetworkTransport(); + _client = new MinaNetworkTransport(); + _thrownEx = null; + _countingEngine = new CountingProtocolEngine(); + } + + @Override + public void tearDown() + { + if (_server != null) + { + _server.close(); + } + + if (_client != null) + { + _client.close(); + } + } + + /** + * Tests that a socket can't be opened if a driver hasn't been bound + * to the port and can be opened if a driver has been bound. + */ + public void testBindOpen() throws Exception + { + try + { + _client.connect(_clientSettings, _countingEngine, null); + } + catch (TransportException e) + { + _thrownEx = e; + } + + assertNotNull("Open should have failed since no engine bound", _thrownEx); + + _server.accept(_brokerSettings, null, null); + + _client.connect(_clientSettings, _countingEngine, null); + } + + /** + * Tests that a socket can't be opened after a bound NetworkDriver has been closed + */ + public void testBindOpenCloseOpen() throws Exception + { + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _client.connect(_clientSettings, _countingEngine, null); + _client.close(); + _server.close(); + + try + { + _client.connect(_clientSettings, _countingEngine, null); + } + catch (TransportException e) + { + _thrownEx = e; + } + assertNotNull("Open should have failed", _thrownEx); + } + + /** + * Checks that the right exception is thrown when binding a NetworkDriver to an already + * existing socket. + */ + public void testBindPortInUse() + { + try + { + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + } + catch (TransportException e) + { + fail("First bind should not fail"); + } + + try + { + IncomingNetworkTransport second = new MinaNetworkTransport(); + second.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + } + catch (TransportException e) + { + _thrownEx = e; + } + assertNotNull("Second bind should throw BindException", _thrownEx); + } + + /** + * Tests that binding to the wildcard address succeeds and a client can + * connect via localhost. + */ + public void testWildcardBind() throws Exception + { + TestNetworkTransportConfiguration serverSettings = + new TestNetworkTransportConfiguration(_testPort, WILDCARD_ADDRESS); + + _server.accept(serverSettings, null, null); + + try + { + _client.connect(_clientSettings, _countingEngine, null); + } + catch (TransportException e) + { + fail("Open should have succeeded since we used a wildcard bind"); + } + } + + /** + * tests that bytes sent on a network driver are received at the other end + */ + public void testSend() throws Exception + { + // Open a connection from a counting engine to an echo engine + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + + // Tell the counting engine how much data we're sending + _countingEngine.setNewLatch(TEST_DATA.getBytes().length); + + // Send the data and wait for up to 2 seconds to get it back + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _countingEngine.getLatch().await(2, TimeUnit.SECONDS); + + // Check what we got + assertEquals("Wrong amount of data recieved", TEST_DATA.getBytes().length, _countingEngine.getReadBytes()); + } + + /** + * Opens a connection with a low read idle and check that it gets triggered + * + */ + public void testSetReadIdle() throws Exception + { + // Open a connection from a counting engine to an echo engine + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + assertFalse("Reader should not have been idle", _countingEngine.getReaderHasBeenIdle()); + _network.setMaxReadIdle(1); + sleepForAtLeast(1500); + assertTrue("Reader should have been idle", _countingEngine.getReaderHasBeenIdle()); + } + + /** + * Opens a connection with a low write idle and check that it gets triggered + * + */ + public void testSetWriteIdle() throws Exception + { + // Open a connection from a counting engine to an echo engine + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + assertFalse("Reader should not have been idle", _countingEngine.getWriterHasBeenIdle()); + _network.setMaxWriteIdle(1); + sleepForAtLeast(1500); + assertTrue("Reader should have been idle", _countingEngine.getWriterHasBeenIdle()); + } + + + /** + * Creates and then closes a connection from client to server and checks that the server + * has its closed() method called. Then creates a new client and closes the server to check + * that the client has its closed() method called. + */ + public void testClosed() throws Exception + { + // Open a connection from a counting engine to an echo engine + EchoProtocolEngineSingletonFactory factory = new EchoProtocolEngineSingletonFactory(); + _server.accept(_brokerSettings, factory, null); + _network = _client.connect(_clientSettings, _countingEngine, null); + EchoProtocolEngine serverEngine = null; + while (serverEngine == null) + { + serverEngine = factory.getEngine(); + if (serverEngine == null) + { + try + { + Thread.sleep(10); + } + catch (InterruptedException e) + { + } + } + } + assertFalse("Server should not have been closed", serverEngine.getClosed()); + serverEngine.setNewLatch(1); + _client.close(); + try + { + serverEngine.getLatch().await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + } + assertTrue("Server should have been closed", serverEngine.getClosed()); + + _client.connect(_clientSettings, _countingEngine, null); + _countingEngine.setClosed(false); + assertFalse("Client should not have been closed", _countingEngine.getClosed()); + _countingEngine.setNewLatch(1); + _server.close(); + try + { + _countingEngine.getLatch().await(2, TimeUnit.SECONDS); + } + catch (InterruptedException e) + { + } + assertTrue("Client should have been closed", _countingEngine.getClosed()); + } + + /** + * Create a connection and instruct the client to throw an exception when it gets some data + * and that the latch gets counted down. + */ + public void testExceptionCaught() throws Exception + { + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + + + assertEquals("Exception should not have been thrown", 1, + _countingEngine.getExceptionLatch().getCount()); + _countingEngine.setErrorOnNextRead(true); + _countingEngine.setNewLatch(TEST_DATA.getBytes().length); + _network.getSender().send(ByteBuffer.wrap(TEST_DATA.getBytes())); + _countingEngine.getExceptionLatch().await(2, TimeUnit.SECONDS); + assertEquals("Exception should have been thrown", 0, + _countingEngine.getExceptionLatch().getCount()); + } + + /** + * Opens a connection and checks that the remote address is the one that was asked for + */ + public void testGetRemoteAddress() throws Exception + { + _server.accept(_brokerSettings, new EchoProtocolEngineSingletonFactory(), null); + _network = _client.connect(_clientSettings, _countingEngine, null); + assertEquals(new InetSocketAddress(InetAddress.getLocalHost(), _testPort), + _network.getRemoteAddress()); + } + + private class EchoProtocolEngineSingletonFactory implements ProtocolEngineFactory + { + private EchoProtocolEngine _engine = null; + + public ProtocolEngine newProtocolEngine(NetworkConnection network) + { + if (_engine == null) + { + _engine = new EchoProtocolEngine(network); + } + return getEngine(); + } + + public EchoProtocolEngine getEngine() + { + return _engine; + } + } + + public class CountingProtocolEngine implements ProtocolEngine + { + public ArrayList _receivedBytes = new ArrayList(); + private int _readBytes; + private CountDownLatch _latch = new CountDownLatch(0); + private boolean _readerHasBeenIdle; + private boolean _writerHasBeenIdle; + private boolean _closed = false; + private boolean _nextReadErrors = false; + private CountDownLatch _exceptionLatch = new CountDownLatch(1); + + public void closed() + { + setClosed(true); + _latch.countDown(); + } + + public void setErrorOnNextRead(boolean b) + { + _nextReadErrors = b; + } + + public void setNewLatch(int length) + { + _latch = new CountDownLatch(length); + } + + public long getReadBytes() + { + return _readBytes; + } + + public SocketAddress getRemoteAddress() + { + return _network.getRemoteAddress(); + } + + public SocketAddress getLocalAddress() + { + return _network.getLocalAddress(); + } + + public long getWrittenBytes() + { + return 0; + } + + public void readerIdle() + { + _readerHasBeenIdle = true; + } + + public void writeFrame(AMQDataBlock frame) + { + + } + + public void writerIdle() + { + _writerHasBeenIdle = true; + } + + public void exception(Throwable t) + { + _exceptionLatch.countDown(); + } + + public CountDownLatch getExceptionLatch() + { + return _exceptionLatch; + } + + public void received(ByteBuffer msg) + { + // increment read bytes and count down the latch for that many + int bytes = msg.remaining(); + _readBytes += bytes; + for (int i = 0; i < bytes; i++) + { + _latch.countDown(); + } + + // Throw an error if we've been asked too, but we can still count + if (_nextReadErrors) + { + throw new RuntimeException("Was asked to error"); + } + } + + public CountDownLatch getLatch() + { + return _latch; + } + + public boolean getWriterHasBeenIdle() + { + return _writerHasBeenIdle; + } + + public boolean getReaderHasBeenIdle() + { + return _readerHasBeenIdle; + } + + public void setClosed(boolean _closed) + { + this._closed = _closed; + } + + public boolean getClosed() + { + return _closed; + } + + } + + private class EchoProtocolEngine extends CountingProtocolEngine + { + private NetworkConnection _echoNetwork; + + public EchoProtocolEngine(NetworkConnection network) + { + _echoNetwork = network; + } + + public void received(ByteBuffer msg) + { + super.received(msg); + msg.rewind(); + _echoNetwork.getSender().send(msg); + } + } + + public static void sleepForAtLeast(long period) + { + long start = System.currentTimeMillis(); + long timeLeft = period; + while (timeLeft > 0) + { + try + { + Thread.sleep(timeLeft); + } + catch (InterruptedException e) + { + // Ignore it + } + timeLeft = period - (System.currentTimeMillis() - start); + } + } + + private static class TestNetworkTransportConfiguration implements NetworkTransportConfiguration + { + private int _port; + private String _host; + + public TestNetworkTransportConfiguration(final int port, final String host) + { + _port = port; + _host = host; + } + + public Boolean getTcpNoDelay() + { + return true; + } + + public Integer getReceiveBufferSize() + { + return 32768; + } + + public Integer getSendBufferSize() + { + return 32768; + } + + public Integer getPort() + { + return _port; + } + + public String getHost() + { + return _host; + } + + public String getTransport() + { + return Transport.TCP; + } + + public Integer getConnectorProcessors() + { + return 4; + } + + } +} \ No newline at end of file -- cgit v1.2.1