summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-12-05 14:48:00 +0000
committerKeith Wall <kwall@apache.org>2014-12-05 14:48:00 +0000
commita0cdd525f55c8386e2b3e86cdd683c69d181c209 (patch)
treeee0f6d906cbcfe1b0d8a60dc3ee23f963252377f /qpid/java
parenta8c5c3888feed40fd6c47a44f677668250e7635d (diff)
downloadqpid-python-a0cdd525f55c8386e2b3e86cdd683c69d181c209.tar.gz
QPID-6262: Rob's prototype NIO work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1643302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java281
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java6
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java5
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java340
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java307
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java147
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java319
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java266
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java272
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java274
11 files changed, 1642 insertions, 576 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index dd5e01ebc5..aa9f649995 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -44,7 +44,6 @@ import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
new file mode 100644
index 0000000000..09e8a68fb0
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import javax.net.ssl.SSLEngineResult.Status;
+import javax.net.ssl.SSLException;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+import org.apache.qpid.transport.util.Logger;
+
+public class SSLBufferingSender implements Sender<ByteBuffer>
+{
+ private static final Logger LOGGER = Logger.get(SSLBufferingSender.class);
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Sender<ByteBuffer> _delegate;
+ private final SSLEngine _engine;
+ private final int _sslBufSize;
+ private final SSLStatus _sslStatus;
+
+ private String _hostname;
+
+ private final AtomicBoolean _closed = new AtomicBoolean(false);
+ private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
+
+
+ public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
+ {
+ _engine = engine;
+ _delegate = delegate;
+ _sslBufSize = engine.getSession().getPacketBufferSize();
+ _sslStatus = sslStatus;
+ }
+
+ public void setHostname(String hostname)
+ {
+ _hostname = hostname;
+ }
+
+ public void close()
+ {
+ if (!_closed.getAndSet(true))
+ {
+ if (_engine.isOutboundDone())
+ {
+ return;
+ }
+ LOGGER.debug("Closing SSL connection");
+ doSend();
+ _engine.closeOutbound();
+ try
+ {
+ tearDownSSLConnection();
+ }
+ catch(Exception e)
+ {
+ throw new SenderException("Error closing SSL connection",e);
+ }
+
+
+ synchronized(_sslStatus.getSslLock())
+ {
+ while (!_engine.isOutboundDone())
+ {
+ try
+ {
+ _sslStatus.getSslLock().wait();
+ }
+ catch(InterruptedException e)
+ {
+ // pass
+ }
+
+ }
+ }
+ _delegate.close();
+ }
+ }
+
+ private void tearDownSSLConnection() throws Exception
+ {
+ ByteBuffer netData = getNetDataBuffer();
+ SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData);
+ Status status = result.getStatus();
+ int read = result.bytesProduced();
+ while (status != Status.CLOSED)
+ {
+ if (status == Status.BUFFER_OVERFLOW)
+ {
+ netData.clear();
+ }
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ _delegate.send(data);
+ flush();
+ }
+ result = _engine.wrap(ByteBuffer.allocate(0), netData);
+ status = result.getStatus();
+ read = result.bytesProduced();
+ }
+ }
+
+ private ByteBuffer getNetDataBuffer()
+ {
+ return ByteBuffer.allocate(_sslBufSize);
+ }
+
+ public void flush()
+ {
+ _delegate.flush();
+ }
+
+ public void send()
+ {
+ if(!_closed.get())
+ {
+ doSend();
+ }
+ }
+
+ public synchronized void send(ByteBuffer appData)
+ {
+ boolean buffered;
+ if(buffered = _appData.hasRemaining())
+ {
+ ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
+ newBuf.put(_appData);
+ newBuf.put(appData);
+ newBuf.flip();
+ _appData = newBuf;
+ }
+ if (_closed.get())
+ {
+ throw new SenderException("SSL Sender is closed");
+ }
+ doSend();
+ if(!appData.hasRemaining())
+ {
+ _appData = EMPTY_BYTE_BUFFER;
+ }
+ else if(!buffered)
+ {
+ _appData = ByteBuffer.allocate(appData.remaining());
+ _appData.put(appData);
+ _appData.flip();
+ }
+ }
+
+ private synchronized void doSend()
+ {
+
+ HandshakeStatus handshakeStatus;
+ Status status;
+
+ while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
+ && !_sslStatus.getSslErrorFlag())
+ {
+ ByteBuffer netData = getNetDataBuffer();
+
+ int read = 0;
+ try
+ {
+ SSLEngineResult result = _engine.wrap(_appData, netData);
+ read = result.bytesProduced();
+ status = result.getStatus();
+ handshakeStatus = result.getHandshakeStatus();
+ }
+ catch(SSLException e)
+ {
+ // Should this set _sslError??
+ throw new SenderException("SSL, Error occurred while encrypting data",e);
+ }
+
+ if(read > 0)
+ {
+ int limit = netData.limit();
+ netData.limit(netData.position());
+ netData.position(netData.position() - read);
+
+ ByteBuffer data = netData.slice();
+
+ netData.limit(limit);
+ netData.position(netData.position() + read);
+
+ _delegate.send(data);
+ }
+
+ switch(status)
+ {
+ case CLOSED:
+ throw new SenderException("SSLEngine is closed");
+
+ case BUFFER_OVERFLOW:
+ netData.clear();
+ continue;
+
+ case OK:
+ break; // do nothing
+
+ default:
+ throw new IllegalStateException("SSLReceiver: Invalid State " + status);
+ }
+
+ switch (handshakeStatus)
+ {
+ case NEED_WRAP:
+ if (netData.hasRemaining())
+ {
+ continue;
+ }
+
+ case NEED_TASK:
+ doTasks();
+ break;
+
+ case NEED_UNWRAP:
+ flush();
+ return;
+
+ case FINISHED:
+ if (_hostname != null)
+ {
+ SSLUtil.verifyHostname(_engine, _hostname);
+ }
+
+ case NOT_HANDSHAKING:
+ break; //do nothing
+
+ default:
+ throw new IllegalStateException("SSLSender: Invalid State " + status);
+ }
+
+ }
+ }
+
+ private void doTasks()
+ {
+ Runnable runnable;
+ while ((runnable = _engine.getDelegatedTask()) != null) {
+ runnable.run();
+ }
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ _delegate.setIdleTimeout(i);
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index b1f6b84b72..d3a984c4f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -28,13 +28,13 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport;
class TCPandSSLTransport implements AcceptingTransport
{
@@ -78,10 +78,10 @@ class TCPandSSLTransport implements AcceptingTransport
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
+ _networkTransport = new NonBlockingNetworkTransport();
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
- _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null,
+ _port.getParent(Broker.class), _sslContext,
settings.wantClientAuth(), settings.needClientAuth(),
_supported,
_defaultSupportedProtocolReply,
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 854cd388b9..670acc4709 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -111,7 +111,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void send(ByteBuffer msg)
{
_lastWriteTime = System.currentTimeMillis();
- sender.send(msg);
+ ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]);
+ copy.put(msg);
+ copy.flip();
+ sender.send(copy);
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
new file mode 100644
index 0000000000..871283ef2e
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
@@ -0,0 +1,340 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.OutgoingNetworkTransport;
+import org.apache.qpid.transport.network.TransportActivity;
+import org.apache.qpid.transport.network.security.ssl.SSLUtil;
+
+public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+{
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private Socket _socket;
+ private NetworkConnection _connection;
+ private AcceptingThread _acceptor;
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
+ {
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
+ try
+ {
+ _socket = new Socket();
+ _socket.setReuseAddress(true);
+ _socket.setTcpNoDelay(settings.isTcpNodelay());
+ _socket.setSendBufferSize(sendBufferSize);
+ _socket.setReceiveBufferSize(receiveBufferSize);
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
+ LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
+ LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
+ }
+
+ InetAddress address = InetAddress.getByName(settings.getHost());
+
+ _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
+ }
+ catch (SocketException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+
+ try
+ {
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
+ ticker.setConnection(_connection);
+ _connection.start();
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socket.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
+
+ throw new TransportException("Error creating network connection", e);
+ }
+
+ return _connection;
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
+ {
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+ _acceptor.setDaemon(false);
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+ }
+
+ public int getAcceptingPort()
+ {
+ return _acceptor == null ? -1 : _acceptor.getPort();
+ }
+
+ protected abstract NetworkConnection createNetworkConnection(Socket socket,
+ Receiver<ByteBuffer> engine,
+ Integer sendBufferSize,
+ Integer receiveBufferSize,
+ int timeout,
+ IdleTimeoutTicker ticker);
+
+ private class AcceptingThread extends Thread
+ {
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocket _serverSocket;
+ private int _timeout;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext) throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ if(sslContext == null)
+ {
+ _serverSocket = new ServerSocket();
+ }
+ else
+ {
+ SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
+ _serverSocket = socketFactory.createServerSocket();
+
+ SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
+
+ SSLUtil.removeSSLv3Support(sslServerSocket);
+
+ if(config.needClientAuth())
+ {
+ sslServerSocket.setNeedClientAuth(true);
+ }
+ else if(config.wantClientAuth())
+ {
+ sslServerSocket.setWantClientAuth(true);
+ }
+
+ }
+
+ _serverSocket.setReuseAddress(true);
+ _serverSocket.bind(address);
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
+ if (!_serverSocket.isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ private int getPort()
+ {
+ return _serverSocket.getLocalPort();
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (!_closed)
+ {
+ Socket socket = null;
+ try
+ {
+ socket = _serverSocket.accept();
+
+ ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socket.setTcpNoDelay(_config.getTcpNoDelay());
+ socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setSendBufferSize(sendBufferSize);
+ socket.setReceiveBufferSize(receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NetworkConnection connection =
+ createNetworkConnection(socket,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker);
+
+ connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+ }
+ else
+ {
+ socket.close();
+ }
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket);
+ }
+ catch(IOException e)
+ {
+ if(!_closed)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket);
+ try
+ {
+ //Delay to avoid tight spinning the loop during issues such as too many open files
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.debug("Stopping acceptor due to interrupt request");
+ _closed = true;
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+ + _config.getAddress());
+ }
+ }
+ }
+
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
index e5bc9fa977..f33f626601 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
@@ -20,312 +20,25 @@
*/
package org.apache.qpid.transport.network.io;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketException;
import java.nio.ByteBuffer;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
-
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.configuration.CommonProperties;
-import org.apache.qpid.protocol.ProtocolEngine;
-import org.apache.qpid.protocol.ProtocolEngineFactory;
-import org.apache.qpid.transport.ConnectionSettings;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.Receiver;
-import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.NetworkConnection;
-import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.TransportActivity;
-import org.apache.qpid.transport.network.security.ssl.SSLUtil;
-public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport
+public class IoNetworkTransport extends AbstractNetworkTransport
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class);
- private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
- CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
-
-
- private Socket _socket;
- private IoNetworkConnection _connection;
- private AcceptingThread _acceptor;
-
- public NetworkConnection connect(ConnectionSettings settings,
- Receiver<ByteBuffer> delegate,
- TransportActivity transportActivity)
- {
- int sendBufferSize = settings.getWriteBufferSize();
- int receiveBufferSize = settings.getReadBufferSize();
-
- try
- {
- _socket = new Socket();
- _socket.setReuseAddress(true);
- _socket.setTcpNoDelay(settings.isTcpNodelay());
- _socket.setSendBufferSize(sendBufferSize);
- _socket.setReceiveBufferSize(receiveBufferSize);
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize());
- LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize());
- LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay());
- }
-
- InetAddress address = InetAddress.getByName(settings.getHost());
-
- _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout());
- }
- catch (SocketException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
- catch (IOException e)
- {
- throw new TransportException("Error connecting to broker", e);
- }
-
- try
- {
- IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
- _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker);
- ticker.setConnection(_connection);
- _connection.start();
- }
- catch(Exception e)
- {
- try
- {
- _socket.close();
- }
- catch(IOException ioe)
- {
- //ignored, throw based on original exception
- }
-
- throw new TransportException("Error creating network connection", e);
- }
-
- return _connection;
- }
-
- public void close()
- {
- if(_connection != null)
- {
- _connection.close();
- }
- if(_acceptor != null)
- {
- _acceptor.close();
- }
- }
-
- public NetworkConnection getConnection()
- {
- return _connection;
- }
-
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext)
- {
- try
- {
- _acceptor = new AcceptingThread(config, factory, sslContext);
- _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress()));
- _acceptor.setDaemon(false);
- _acceptor.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
- }
-
- public int getAcceptingPort()
- {
- return _acceptor == null ? -1 : _acceptor.getPort();
- }
- private class AcceptingThread extends Thread
+ @Override
+ protected IoNetworkConnection createNetworkConnection(final Socket socket,
+ final Receiver<ByteBuffer> engine,
+ final Integer sendBufferSize,
+ final Integer receiveBufferSize,
+ final int timeout,
+ final IdleTimeoutTicker ticker)
{
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocket _serverSocket;
- private int _timeout;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext) throws IOException
- {
- _config = config;
- _factory = factory;
- _sslContext = sslContext;
- _timeout = TIMEOUT;
-
- InetSocketAddress address = config.getAddress();
-
- if(sslContext == null)
- {
- _serverSocket = new ServerSocket();
- }
- else
- {
- SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory();
- _serverSocket = socketFactory.createServerSocket();
-
- SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket;
-
- SSLUtil.removeSSLv3Support(sslServerSocket);
-
- if(config.needClientAuth())
- {
- sslServerSocket.setNeedClientAuth(true);
- }
- else if(config.wantClientAuth())
- {
- sslServerSocket.setWantClientAuth(true);
- }
-
- }
-
- _serverSocket.setReuseAddress(true);
- _serverSocket.bind(address);
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- LOGGER.debug("Shutting down the Acceptor");
- _closed = true;
-
- if (!_serverSocket.isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
- }
-
- private int getPort()
- {
- return _serverSocket.getLocalPort();
- }
-
- @Override
- public void run()
- {
- try
- {
- while (!_closed)
- {
- Socket socket = null;
- try
- {
- socket = _serverSocket.accept();
-
- ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress());
-
- if(engine != null)
- {
- socket.setTcpNoDelay(_config.getTcpNoDelay());
- socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socket.setSendBufferSize(sendBufferSize);
- socket.setReceiveBufferSize(receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
- NetworkConnection connection =
- new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout,
- ticker);
-
- connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
-
- ticker.setConnection(connection);
-
- engine.setNetworkConnection(connection, connection.getSender());
-
- connection.start();
- }
- else
- {
- socket.close();
- }
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- }
- catch(IOException e)
- {
- if(!_closed)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socket);
- try
- {
- //Delay to avoid tight spinning the loop during issues such as too many open files
- Thread.sleep(1000);
- }
- catch (InterruptedException ie)
- {
- LOGGER.debug("Stopping acceptor due to interrupt request");
- _closed = true;
- }
- }
- }
- }
- }
- finally
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress());
- }
- }
- }
-
- private void closeSocketIfNecessary(final Socket socket)
- {
- if(socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.debug("Exception while closing socket", e);
- }
- }
- }
-
+ return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout,
+ ticker);
}
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
new file mode 100644
index 0000000000..c848ba7a16
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java
@@ -0,0 +1,147 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.security.Principal;
+
+import javax.net.ssl.SSLPeerUnverifiedException;
+import javax.net.ssl.SSLSocket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+
+public class NonBlockingConnection implements NetworkConnection
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private final SocketChannel _socket;
+ private final long _timeout;
+ private final NonBlockingSender _ioSender;
+ private final NonBlockingReceiver _ioReceiver;
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+ private final Object _lock = new Object();
+
+ public NonBlockingConnection(SocketChannel socket, Receiver<ByteBuffer> delegate,
+ int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker)
+ {
+ _socket = socket;
+ _timeout = timeout;
+
+ _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout);
+ _ioReceiver.setTicker(ticker);
+
+ _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout);
+
+ _ioSender.setReceiver(_ioReceiver);
+
+ }
+
+ public void start()
+ {
+ _ioSender.initiate();
+ _ioReceiver.initiate();
+ }
+
+ public Sender<ByteBuffer> getSender()
+ {
+ return _ioSender;
+ }
+
+ public void close()
+ {
+ try
+ {
+ _ioSender.close();
+ }
+ finally
+ {
+ _ioReceiver.close(false);
+ }
+ }
+
+ public SocketAddress getRemoteAddress()
+ {
+ return _socket.socket().getRemoteSocketAddress();
+ }
+
+ public SocketAddress getLocalAddress()
+ {
+ return _socket.socket().getLocalSocketAddress();
+ }
+
+ public void setMaxWriteIdle(int sec)
+ {
+ _maxWriteIdle = sec;
+ }
+
+ public void setMaxReadIdle(int sec)
+ {
+ _maxReadIdle = sec;
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ synchronized (_lock)
+ {
+ if(!_principalChecked)
+ {
+ if(_socket.socket() instanceof SSLSocket)
+ {
+ try
+ {
+ _principal = ((SSLSocket) _socket.socket()).getSession().getPeerPrincipal();
+ }
+ catch(SSLPeerUnverifiedException e)
+ {
+ _principal = null;
+ }
+ }
+
+ _principalChecked = true;
+ }
+
+ return _principal;
+ }
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _maxReadIdle;
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _maxWriteIdle;
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
new file mode 100644
index 0000000000..3d45dafa05
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -0,0 +1,319 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.StandardSocketOptions;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+import javax.net.ssl.SSLContext;
+
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.configuration.CommonProperties;
+import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.transport.ConnectionSettings;
+import org.apache.qpid.transport.NetworkTransportConfiguration;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.IncomingNetworkTransport;
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.TransportActivity;
+
+public class NonBlockingNetworkTransport implements IncomingNetworkTransport
+{
+
+ private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
+ private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
+ CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
+ private AcceptingThread _acceptor;
+
+ private SocketChannel _socketChannel;
+ private NonBlockingConnection _connection;
+
+ public NetworkConnection connect(ConnectionSettings settings,
+ Receiver<ByteBuffer> delegate,
+ TransportActivity transportActivity)
+ {
+ int sendBufferSize = settings.getWriteBufferSize();
+ int receiveBufferSize = settings.getReadBufferSize();
+
+ try
+ {
+ _socketChannel = SocketChannel.open();
+ _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay());
+ _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF));
+ LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF));
+ LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY));
+ }
+
+ InetAddress address = InetAddress.getByName(settings.getHost());
+
+ _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()),
+ settings.getConnectTimeout());
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Error connecting to broker", e);
+ }
+
+ try
+ {
+ IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT);
+ _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize,
+ TIMEOUT, ticker);
+ ticker.setConnection(_connection);
+ _connection.start();
+ }
+ catch(Exception e)
+ {
+ try
+ {
+ _socketChannel.close();
+ }
+ catch(IOException ioe)
+ {
+ //ignored, throw based on original exception
+ }
+
+ throw new TransportException("Error creating network connection", e);
+ }
+
+ return _connection;
+ }
+
+
+ protected NonBlockingConnection createNetworkConnection(final SocketChannel socket,
+ final Receiver<ByteBuffer> engine,
+ final Integer sendBufferSize,
+ final Integer receiveBufferSize,
+ final int timeout,
+ final IdleTimeoutTicker ticker)
+ {
+ return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker);
+ }
+
+ public void close()
+ {
+ if(_connection != null)
+ {
+ _connection.close();
+ }
+ if(_acceptor != null)
+ {
+ _acceptor.close();
+ }
+ }
+
+ public NetworkConnection getConnection()
+ {
+ return _connection;
+ }
+
+ public void accept(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext)
+ {
+ try
+ {
+ _acceptor = new AcceptingThread(config, factory, sslContext);
+ _acceptor.setDaemon(false);
+ _acceptor.start();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+ }
+
+ public int getAcceptingPort()
+ {
+ return _acceptor == null ? -1 : _acceptor.getPort();
+ }
+
+ private class AcceptingThread extends Thread
+ {
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocketChannel _serverSocket;
+ private int _timeout;
+
+ private AcceptingThread(NetworkTransportConfiguration config,
+ ProtocolEngineFactory factory,
+ SSLContext sslContext) throws IOException
+ {
+ _config = config;
+ _factory = factory;
+ _sslContext = sslContext;
+ _timeout = TIMEOUT;
+
+ InetSocketAddress address = config.getAddress();
+
+ _serverSocket = ServerSocketChannel.open();
+
+ _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
+ _serverSocket.bind(address);
+ }
+
+
+ /**
+ Close the underlying ServerSocket if it has not already been closed.
+ */
+ public void close()
+ {
+ LOGGER.debug("Shutting down the Acceptor");
+ _closed = true;
+
+ if (!_serverSocket.socket().isClosed())
+ {
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+ private int getPort()
+ {
+ return _serverSocket.socket().getLocalPort();
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (!_closed)
+ {
+ SocketChannel socket = null;
+ try
+ {
+ socket = _serverSocket.accept();
+
+ ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NetworkConnection connection =
+ createNetworkConnection(socket,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker);
+
+ connection.setMaxReadIdle(HANSHAKE_TIMEOUT);
+
+ ticker.setConnection(connection);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+
+ connection.start();
+ }
+ else
+ {
+ socket.close();
+ }
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket.socket());
+ }
+ catch(IOException e)
+ {
+ if(!_closed)
+ {
+ LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
+ closeSocketIfNecessary(socket.socket());
+ try
+ {
+ //Delay to avoid tight spinning the loop during issues such as too many open files
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException ie)
+ {
+ LOGGER.debug("Stopping acceptor due to interrupt request");
+ _closed = true;
+ }
+ }
+ }
+ }
+ }
+ finally
+ {
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
+ + _config.getAddress());
+ }
+ }
+ }
+
+ private void closeSocketIfNecessary(final Socket socket)
+ {
+ if(socket != null)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Exception while closing socket", e);
+ }
+ }
+ }
+
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
new file mode 100644
index 0000000000..ccd7170b62
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java
@@ -0,0 +1,266 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.SSLSocket;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.Ticker;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.util.SystemUtils;
+
+/**
+ * IoReceiver
+ *
+ */
+
+final class NonBlockingReceiver implements Runnable
+{
+
+ private static final Logger log = Logger.get(NonBlockingReceiver.class);
+
+ private final Receiver<ByteBuffer> receiver;
+ private final int bufferSize;
+ private final SocketChannel socket;
+ private final long timeout;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread receiverThread;
+ private static final boolean shutdownBroken;
+
+ private Ticker _ticker;
+ static
+ {
+ shutdownBroken = SystemUtils.isWindows();
+ }
+
+ public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout)
+ {
+ this.receiver = receiver;
+ this.bufferSize = bufferSize;
+ this.socket = socket;
+ this.timeout = timeout;
+
+ try
+ {
+ //Create but deliberately don't start the thread.
+ receiverThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException("Error creating IOReceiver thread",e);
+ }
+ receiverThread.setDaemon(true);
+ receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress()));
+ }
+
+ public void initiate()
+ {
+ receiverThread.start();
+ }
+
+ public void close()
+ {
+ close(false);
+ }
+
+ void close(boolean block)
+ {
+ if (!closed.getAndSet(true))
+ {
+ try
+ {
+ try
+ {
+ if (shutdownBroken || socket.socket() instanceof SSLSocket)
+ {
+ socket.close();
+ }
+ else
+ {
+ socket.shutdownInput();
+ }
+ }
+ catch(SocketException se)
+ {
+ if(!socket.socket().isClosed() && !socket.socket().isInputShutdown())
+ {
+ throw se;
+ }
+ }
+ if (block && Thread.currentThread() != receiverThread)
+ {
+ receiverThread.join(timeout);
+ if (receiverThread.isAlive())
+ {
+ throw new TransportException("join timed out");
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new TransportException(e);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+
+ }
+ }
+
+ public void run()
+ {
+ final int threshold = bufferSize / 2;
+
+ // I set the read buffer size similar to SO_RCVBUF
+ // Haven't tested with a lower value to see if it's better or worse
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ try
+ {
+ int read = 0;
+ long currentTime;
+ while(read != -1)
+ {
+ try
+ {
+ while ((read = socket.read(buffer)) != -1)
+ {
+ if (read > 0)
+ {
+ ByteBuffer b = buffer.duplicate();
+ b.flip();
+ receiver.received(b);
+
+ if (buffer.remaining() < threshold)
+ {
+ buffer = ByteBuffer.allocate(bufferSize);
+ }
+ else
+ {
+ buffer = buffer.slice();
+ }
+ }
+ currentTime = System.currentTimeMillis();
+
+ if(_ticker != null)
+ {
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
+ try
+ {
+ if(!socket.socket().isClosed())
+ {
+ socket.socket().setSoTimeout(tick <= 0 ? 1 : tick);
+ }
+ }
+ catch(SocketException e)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ catch (SocketTimeoutException e)
+ {
+ currentTime = System.currentTimeMillis();
+ if(_ticker != null)
+ {
+ final int tick = _ticker.tick(currentTime);
+ if(!socket.socket().isClosed())
+ {
+ try
+ {
+ socket.socket().setSoTimeout(tick <= 0 ? 1 : tick );
+ }
+ catch(SocketException ex)
+ {
+ // ignore - closed socket
+ }
+ }
+ }
+ }
+ }
+ }
+ catch (Exception t)
+ {
+ if (shouldReport(t))
+ {
+ receiver.exception(t);
+ }
+ }
+ finally
+ {
+ receiver.closed();
+ try
+ {
+ socket.close();
+ }
+ catch(Exception e)
+ {
+ log.warn(e, "Error closing socket");
+ }
+ }
+ }
+
+ private boolean shouldReport(Throwable t)
+ {
+ boolean brokenClose = closed.get() &&
+ shutdownBroken &&
+ t instanceof SocketException &&
+ "socket closed".equalsIgnoreCase(t.getMessage());
+
+ boolean sslSocketClosed = closed.get() &&
+ socket.socket() instanceof SSLSocket &&
+ t instanceof SocketException &&
+ "Socket is closed".equalsIgnoreCase(t.getMessage());
+
+ boolean recvFailed = closed.get() &&
+ shutdownBroken &&
+ t instanceof SocketException &&
+ "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage());
+
+ return !brokenClose && !sslSocketClosed && !recvFailed;
+ }
+
+ public Ticker getTicker()
+ {
+ return _ticker;
+ }
+
+ public void setTicker(Ticker ticker)
+ {
+ _ticker = ticker;
+ }
+
+
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
new file mode 100644
index 0000000000..e3d604d2ba
--- /dev/null
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.transport.network.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.thread.Threading;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
+import org.apache.qpid.transport.SenderException;
+import org.apache.qpid.transport.util.Logger;
+
+
+public final class NonBlockingSender implements Runnable, Sender<ByteBuffer>
+{
+
+ private static final Logger log = Logger.get(NonBlockingSender.class);
+
+ // by starting here, we ensure that we always test the wraparound
+ // case, we should probably make this configurable somehow so that
+ // we can test other cases as well
+ private final static int START = Integer.MAX_VALUE - 10;
+
+ private final long timeout;
+ private final SocketChannel socket;
+
+ private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
+ private final Object _isEmpty = new Object();
+
+ private volatile boolean idle = true;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Thread senderThread;
+ private NonBlockingReceiver _receiver;
+ private final String _remoteSocketAddress;
+
+ private volatile Throwable exception = null;
+
+ public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout)
+ {
+ this.socket = socket;
+ this.timeout = timeout;
+ _remoteSocketAddress = socket.socket().getRemoteSocketAddress().toString();
+
+
+ try
+ {
+ //Create but deliberately don't start the thread.
+ senderThread = Threading.getThreadFactory().createThread(this);
+ }
+ catch(Exception e)
+ {
+ throw new Error("Error creating IOSender thread",e);
+ }
+
+ senderThread.setDaemon(true);
+ senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress));
+ }
+
+ public void initiate()
+ {
+ senderThread.start();
+ }
+
+
+ public void send(ByteBuffer buf)
+ {
+ checkNotAlreadyClosed();
+
+ if(!senderThread.isAlive())
+ {
+ throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress));
+ }
+
+
+ _buffers.add(buf);
+ synchronized (_isEmpty)
+ {
+ _isEmpty.notifyAll();
+ }
+
+ }
+
+ public void flush()
+ {
+ if (idle)
+ {
+ synchronized (_isEmpty)
+ {
+ _isEmpty.notify();
+ }
+ }
+ }
+
+ public void close()
+ {
+ close(true, true);
+ }
+
+ private void close(boolean awaitSenderBeforeClose, boolean reportException)
+ {
+ if (!closed.getAndSet(true))
+ {
+
+ synchronized (_isEmpty)
+ {
+ _isEmpty.notify();
+ }
+
+ try
+ {
+ if (awaitSenderBeforeClose)
+ {
+ awaitSenderThreadShutdown();
+ }
+ }
+ finally
+ {
+ closeReceiver();
+ }
+ if (reportException && exception != null)
+ {
+ throw new SenderException(exception);
+ }
+ }
+ }
+
+ private void closeReceiver()
+ {
+ if(_receiver != null)
+ {
+ try
+ {
+ _receiver.close();
+ }
+ catch(RuntimeException e)
+ {
+ log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress);
+ throw new SenderException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+
+ if (closed.get())
+ {
+ break;
+ }
+
+ idle = true;
+
+ synchronized (_isEmpty)
+ {
+ while (_buffers.isEmpty() && !closed.get())
+ {
+ try
+ {
+ _isEmpty.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // pass
+ }
+ }
+ }
+
+ idle = false;
+
+ ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
+ Iterator<ByteBuffer> bufferIterator = _buffers.iterator();
+ for(int i = 0; i < bufArray.length; i++)
+ {
+ bufArray[i] = bufferIterator.next();
+ }
+
+ try
+ {
+ socket.write(bufArray);
+ for(ByteBuffer buf : bufArray)
+ {
+ if(buf.remaining() == 0)
+ {
+ _buffers.poll();
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ catch (IOException e)
+ {
+ log.info("Exception in thread sending to '" + _remoteSocketAddress + "': " + e);
+ exception = e;
+ close(false, false);
+ break;
+ }
+
+ }
+
+ }
+
+ public void setIdleTimeout(int i)
+ {
+ try
+ {
+ socket.socket().setSoTimeout(i);
+ }
+ catch (Exception e)
+ {
+ throw new SenderException(e);
+ }
+ }
+
+ public void setReceiver(NonBlockingReceiver receiver)
+ {
+ _receiver = receiver;
+ }
+
+ private void awaitSenderThreadShutdown()
+ {
+ if (Thread.currentThread() != senderThread)
+ {
+ try
+ {
+ senderThread.join(timeout);
+ if (senderThread.isAlive())
+ {
+ log.error("join timed out for socket %s to stop", _remoteSocketAddress);
+ throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress);
+ throw new SenderException(e);
+ }
+ }
+ }
+
+ private void checkNotAlreadyClosed()
+ {
+ if (closed.get())
+ {
+ throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception);
+ }
+ }
+}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
index 24f95d7798..e69de29bb2 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport.network.security.ssl;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLEngineResult.HandshakeStatus;
-import javax.net.ssl.SSLEngineResult.Status;
-import javax.net.ssl.SSLException;
-import org.apache.qpid.transport.Sender;
-import org.apache.qpid.transport.SenderException;
-import org.apache.qpid.transport.network.security.SSLStatus;
-import org.apache.qpid.transport.util.Logger;
-
-public class SSLBufferingSender implements Sender<ByteBuffer>
-{
- private static final Logger log = Logger.get(SSLBufferingSender.class);
- private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
-
- private final Sender<ByteBuffer> delegate;
- private final SSLEngine engine;
- private final int sslBufSize;
- private final ByteBuffer netData;
- private final SSLStatus _sslStatus;
-
- private String _hostname;
-
- private final AtomicBoolean closed = new AtomicBoolean(false);
- private ByteBuffer _appData = EMPTY_BYTE_BUFFER;
-
-
- public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus)
- {
- this.engine = engine;
- this.delegate = delegate;
- sslBufSize = engine.getSession().getPacketBufferSize();
- netData = ByteBuffer.allocate(sslBufSize);
- _sslStatus = sslStatus;
- }
-
- public void setHostname(String hostname)
- {
- _hostname = hostname;
- }
-
- public void close()
- {
- if (!closed.getAndSet(true))
- {
- if (engine.isOutboundDone())
- {
- return;
- }
- log.debug("Closing SSL connection");
- doSend();
- engine.closeOutbound();
- try
- {
- tearDownSSLConnection();
- }
- catch(Exception e)
- {
- throw new SenderException("Error closing SSL connection",e);
- }
-
-
- synchronized(_sslStatus.getSslLock())
- {
- while (!engine.isOutboundDone())
- {
- try
- {
- _sslStatus.getSslLock().wait();
- }
- catch(InterruptedException e)
- {
- // pass
- }
-
- }
- }
- delegate.close();
- }
- }
-
- private void tearDownSSLConnection() throws Exception
- {
- SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData);
- Status status = result.getStatus();
- int read = result.bytesProduced();
- while (status != Status.CLOSED)
- {
- if (status == Status.BUFFER_OVERFLOW)
- {
- netData.clear();
- }
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- flush();
- }
- result = engine.wrap(ByteBuffer.allocate(0), netData);
- status = result.getStatus();
- read = result.bytesProduced();
- }
- }
-
- public void flush()
- {
- delegate.flush();
- }
-
- public void send()
- {
- if(!closed.get())
- {
- doSend();
- }
- }
-
- public synchronized void send(ByteBuffer appData)
- {
- boolean buffered;
- if(buffered = _appData.hasRemaining())
- {
- ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining());
- newBuf.put(_appData);
- newBuf.put(appData);
- newBuf.flip();
- _appData = newBuf;
- }
- if (closed.get())
- {
- throw new SenderException("SSL Sender is closed");
- }
- doSend();
- if(!appData.hasRemaining())
- {
- _appData = EMPTY_BYTE_BUFFER;
- }
- else if(!buffered)
- {
- _appData = ByteBuffer.allocate(appData.remaining());
- _appData.put(appData);
- _appData.flip();
- }
- }
-
- private synchronized void doSend()
- {
-
- HandshakeStatus handshakeStatus;
- Status status;
-
- while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP)
- && !_sslStatus.getSslErrorFlag())
- {
- int read = 0;
- try
- {
- SSLEngineResult result = engine.wrap(_appData, netData);
- read = result.bytesProduced();
- status = result.getStatus();
- handshakeStatus = result.getHandshakeStatus();
- }
- catch(SSLException e)
- {
- // Should this set _sslError??
- throw new SenderException("SSL, Error occurred while encrypting data",e);
- }
-
- if(read > 0)
- {
- int limit = netData.limit();
- netData.limit(netData.position());
- netData.position(netData.position() - read);
-
- ByteBuffer data = netData.slice();
-
- netData.limit(limit);
- netData.position(netData.position() + read);
-
- delegate.send(data);
- }
-
- switch(status)
- {
- case CLOSED:
- throw new SenderException("SSLEngine is closed");
-
- case BUFFER_OVERFLOW:
- netData.clear();
- continue;
-
- case OK:
- break; // do nothing
-
- default:
- throw new IllegalStateException("SSLReceiver: Invalid State " + status);
- }
-
- switch (handshakeStatus)
- {
- case NEED_WRAP:
- if (netData.hasRemaining())
- {
- continue;
- }
-
- case NEED_TASK:
- doTasks();
- break;
-
- case NEED_UNWRAP:
- flush();
- return;
-
- case FINISHED:
- if (_hostname != null)
- {
- SSLUtil.verifyHostname(engine, _hostname);
- }
-
- case NOT_HANDSHAKING:
- break; //do nothing
-
- default:
- throw new IllegalStateException("SSLSender: Invalid State " + status);
- }
-
- }
- }
-
- private void doTasks()
- {
- Runnable runnable;
- while ((runnable = engine.getDelegatedTask()) != null) {
- runnable.run();
- }
- }
-
- public void setIdleTimeout(int i)
- {
- delegate.setIdleTimeout(i);
- }
-}