diff options
| author | Keith Wall <kwall@apache.org> | 2014-12-05 14:48:00 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-12-05 14:48:00 +0000 |
| commit | a0cdd525f55c8386e2b3e86cdd683c69d181c209 (patch) | |
| tree | ee0f6d906cbcfe1b0d8a60dc3ee23f963252377f /qpid/java | |
| parent | a8c5c3888feed40fd6c47a44f677668250e7635d (diff) | |
| download | qpid-python-a0cdd525f55c8386e2b3e86cdd683c69d181c209.tar.gz | |
QPID-6262: Rob's prototype NIO work
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1643302 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 1642 insertions, 576 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java index dd5e01ebc5..aa9f649995 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java @@ -44,7 +44,6 @@ import org.apache.qpid.server.plugin.ProtocolEngineCreator; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender; import org.apache.qpid.transport.network.security.ssl.SSLReceiver; import org.apache.qpid.transport.network.security.ssl.SSLUtil; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java new file mode 100644 index 0000000000..09e8a68fb0 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SSLBufferingSender.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; +import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLException; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.network.security.SSLStatus; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; +import org.apache.qpid.transport.util.Logger; + +public class SSLBufferingSender implements Sender<ByteBuffer> +{ + private static final Logger LOGGER = Logger.get(SSLBufferingSender.class); + private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); + + private final Sender<ByteBuffer> _delegate; + private final SSLEngine _engine; + private final int _sslBufSize; + private final SSLStatus _sslStatus; + + private String _hostname; + + private final AtomicBoolean _closed = new AtomicBoolean(false); + private ByteBuffer _appData = EMPTY_BYTE_BUFFER; + + + public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) + { + _engine = engine; + _delegate = delegate; + _sslBufSize = engine.getSession().getPacketBufferSize(); + _sslStatus = sslStatus; + } + + public void setHostname(String hostname) + { + _hostname = hostname; + } + + public void close() + { + if (!_closed.getAndSet(true)) + { + if (_engine.isOutboundDone()) + { + return; + } + LOGGER.debug("Closing SSL connection"); + doSend(); + _engine.closeOutbound(); + try + { + tearDownSSLConnection(); + } + catch(Exception e) + { + throw new SenderException("Error closing SSL connection",e); + } + + + synchronized(_sslStatus.getSslLock()) + { + while (!_engine.isOutboundDone()) + { + try + { + _sslStatus.getSslLock().wait(); + } + catch(InterruptedException e) + { + // pass + } + + } + } + _delegate.close(); + } + } + + private void tearDownSSLConnection() throws Exception + { + ByteBuffer netData = getNetDataBuffer(); + SSLEngineResult result = _engine.wrap(ByteBuffer.allocate(0), netData); + Status status = result.getStatus(); + int read = result.bytesProduced(); + while (status != Status.CLOSED) + { + if (status == Status.BUFFER_OVERFLOW) + { + netData.clear(); + } + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + _delegate.send(data); + flush(); + } + result = _engine.wrap(ByteBuffer.allocate(0), netData); + status = result.getStatus(); + read = result.bytesProduced(); + } + } + + private ByteBuffer getNetDataBuffer() + { + return ByteBuffer.allocate(_sslBufSize); + } + + public void flush() + { + _delegate.flush(); + } + + public void send() + { + if(!_closed.get()) + { + doSend(); + } + } + + public synchronized void send(ByteBuffer appData) + { + boolean buffered; + if(buffered = _appData.hasRemaining()) + { + ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); + newBuf.put(_appData); + newBuf.put(appData); + newBuf.flip(); + _appData = newBuf; + } + if (_closed.get()) + { + throw new SenderException("SSL Sender is closed"); + } + doSend(); + if(!appData.hasRemaining()) + { + _appData = EMPTY_BYTE_BUFFER; + } + else if(!buffered) + { + _appData = ByteBuffer.allocate(appData.remaining()); + _appData.put(appData); + _appData.flip(); + } + } + + private synchronized void doSend() + { + + HandshakeStatus handshakeStatus; + Status status; + + while((_appData.hasRemaining() || _engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) + && !_sslStatus.getSslErrorFlag()) + { + ByteBuffer netData = getNetDataBuffer(); + + int read = 0; + try + { + SSLEngineResult result = _engine.wrap(_appData, netData); + read = result.bytesProduced(); + status = result.getStatus(); + handshakeStatus = result.getHandshakeStatus(); + } + catch(SSLException e) + { + // Should this set _sslError?? + throw new SenderException("SSL, Error occurred while encrypting data",e); + } + + if(read > 0) + { + int limit = netData.limit(); + netData.limit(netData.position()); + netData.position(netData.position() - read); + + ByteBuffer data = netData.slice(); + + netData.limit(limit); + netData.position(netData.position() + read); + + _delegate.send(data); + } + + switch(status) + { + case CLOSED: + throw new SenderException("SSLEngine is closed"); + + case BUFFER_OVERFLOW: + netData.clear(); + continue; + + case OK: + break; // do nothing + + default: + throw new IllegalStateException("SSLReceiver: Invalid State " + status); + } + + switch (handshakeStatus) + { + case NEED_WRAP: + if (netData.hasRemaining()) + { + continue; + } + + case NEED_TASK: + doTasks(); + break; + + case NEED_UNWRAP: + flush(); + return; + + case FINISHED: + if (_hostname != null) + { + SSLUtil.verifyHostname(_engine, _hostname); + } + + case NOT_HANDSHAKING: + break; //do nothing + + default: + throw new IllegalStateException("SSLSender: Invalid State " + status); + } + + } + } + + private void doTasks() + { + Runnable runnable; + while ((runnable = _engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + + public void setIdleTimeout(int i) + { + _delegate.setIdleTimeout(i); + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index b1f6b84b72..d3a984c4f0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -28,13 +28,13 @@ import java.util.Set; import javax.net.ssl.SSLContext; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.io.NonBlockingNetworkTransport; class TCPandSSLTransport implements AcceptingTransport { @@ -78,10 +78,10 @@ class TCPandSSLTransport implements AcceptingTransport } final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); - _networkTransport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance(); + _networkTransport = new NonBlockingNetworkTransport(); final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( - _port.getParent(Broker.class), _transports.contains(Transport.TCP) ? _sslContext : null, + _port.getParent(Broker.class), _sslContext, settings.wantClientAuth(), settings.needClientAuth(), _supported, _defaultSupportedProtocolReply, diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java index 854cd388b9..670acc4709 100755 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java @@ -111,7 +111,10 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void send(ByteBuffer msg) { _lastWriteTime = System.currentTimeMillis(); - sender.send(msg); + ByteBuffer copy = ByteBuffer.wrap(new byte[msg.remaining()]); + copy.put(msg); + copy.flip(); + sender.send(copy); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java new file mode 100644 index 0000000000..871283ef2e --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java @@ -0,0 +1,340 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.OutgoingNetworkTransport; +import org.apache.qpid.transport.network.TransportActivity; +import org.apache.qpid.transport.network.security.ssl.SSLUtil; + +public abstract class AbstractNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +{ + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private Socket _socket; + private NetworkConnection _connection; + private AcceptingThread _acceptor; + + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) + { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + + try + { + _socket = new Socket(); + _socket.setReuseAddress(true); + _socket.setTcpNoDelay(settings.isTcpNodelay()); + _socket.setSendBufferSize(sendBufferSize); + _socket.setReceiveBufferSize(receiveBufferSize); + + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); + LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); + LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); + } + + InetAddress address = InetAddress.getByName(settings.getHost()); + + _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); + } + catch (SocketException e) + { + throw new TransportException("Error connecting to broker", e); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + + try + { + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = createNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); + ticker.setConnection(_connection); + _connection.start(); + } + catch(Exception e) + { + try + { + _socket.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } + + throw new TransportException("Error creating network connection", e); + } + + return _connection; + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if(_acceptor != null) + { + _acceptor.close(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) + { + try + { + _acceptor = new AcceptingThread(config, factory, sslContext); + _acceptor.setDaemon(false); + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + } + + public int getAcceptingPort() + { + return _acceptor == null ? -1 : _acceptor.getPort(); + } + + protected abstract NetworkConnection createNetworkConnection(Socket socket, + Receiver<ByteBuffer> engine, + Integer sendBufferSize, + Integer receiveBufferSize, + int timeout, + IdleTimeoutTicker ticker); + + private class AcceptingThread extends Thread + { + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocket _serverSocket; + private int _timeout; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) throws IOException + { + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + if(sslContext == null) + { + _serverSocket = new ServerSocket(); + } + else + { + SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); + _serverSocket = socketFactory.createServerSocket(); + + SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; + + SSLUtil.removeSSLv3Support(sslServerSocket); + + if(config.needClientAuth()) + { + sslServerSocket.setNeedClientAuth(true); + } + else if(config.wantClientAuth()) + { + sslServerSocket.setWantClientAuth(true); + } + + } + + _serverSocket.setReuseAddress(true); + _serverSocket.bind(address); + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + + if (!_serverSocket.isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + private int getPort() + { + return _serverSocket.getLocalPort(); + } + + @Override + public void run() + { + try + { + while (!_closed) + { + Socket socket = null; + try + { + socket = _serverSocket.accept(); + + ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); + + if(engine != null) + { + socket.setTcpNoDelay(_config.getTcpNoDelay()); + socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setSendBufferSize(sendBufferSize); + socket.setReceiveBufferSize(receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NetworkConnection connection = + createNetworkConnection(socket, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker); + + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + engine.setNetworkConnection(connection, connection.getSender()); + + connection.start(); + } + else + { + socket.close(); + } + } + catch(RuntimeException e) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } + } + } + } + finally + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + + _config.getAddress()); + } + } + } + + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java index e5bc9fa977..f33f626601 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java @@ -20,312 +20,25 @@ */ package org.apache.qpid.transport.network.io; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketException; import java.nio.ByteBuffer; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocket; -import javax.net.ssl.SSLServerSocketFactory; - -import org.slf4j.LoggerFactory; - -import org.apache.qpid.configuration.CommonProperties; -import org.apache.qpid.protocol.ProtocolEngine; -import org.apache.qpid.protocol.ProtocolEngineFactory; -import org.apache.qpid.transport.ConnectionSettings; -import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.Receiver; -import org.apache.qpid.transport.TransportException; -import org.apache.qpid.transport.network.IncomingNetworkTransport; -import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.transport.network.OutgoingNetworkTransport; -import org.apache.qpid.transport.network.TransportActivity; -import org.apache.qpid.transport.network.security.ssl.SSLUtil; -public class IoNetworkTransport implements OutgoingNetworkTransport, IncomingNetworkTransport +public class IoNetworkTransport extends AbstractNetworkTransport { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(IoNetworkTransport.class); - private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, - CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); - private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - - - private Socket _socket; - private IoNetworkConnection _connection; - private AcceptingThread _acceptor; - - public NetworkConnection connect(ConnectionSettings settings, - Receiver<ByteBuffer> delegate, - TransportActivity transportActivity) - { - int sendBufferSize = settings.getWriteBufferSize(); - int receiveBufferSize = settings.getReadBufferSize(); - - try - { - _socket = new Socket(); - _socket.setReuseAddress(true); - _socket.setTcpNoDelay(settings.isTcpNodelay()); - _socket.setSendBufferSize(sendBufferSize); - _socket.setReceiveBufferSize(receiveBufferSize); - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("SO_RCVBUF : " + _socket.getReceiveBufferSize()); - LOGGER.debug("SO_SNDBUF : " + _socket.getSendBufferSize()); - LOGGER.debug("TCP_NODELAY : " + _socket.getTcpNoDelay()); - } - - InetAddress address = InetAddress.getByName(settings.getHost()); - - _socket.connect(new InetSocketAddress(address, settings.getPort()), settings.getConnectTimeout()); - } - catch (SocketException e) - { - throw new TransportException("Error connecting to broker", e); - } - catch (IOException e) - { - throw new TransportException("Error connecting to broker", e); - } - - try - { - IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); - _connection = new IoNetworkConnection(_socket, delegate, sendBufferSize, receiveBufferSize, TIMEOUT, ticker); - ticker.setConnection(_connection); - _connection.start(); - } - catch(Exception e) - { - try - { - _socket.close(); - } - catch(IOException ioe) - { - //ignored, throw based on original exception - } - - throw new TransportException("Error creating network connection", e); - } - - return _connection; - } - - public void close() - { - if(_connection != null) - { - _connection.close(); - } - if(_acceptor != null) - { - _acceptor.close(); - } - } - - public NetworkConnection getConnection() - { - return _connection; - } - - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) - { - try - { - _acceptor = new AcceptingThread(config, factory, sslContext); - _acceptor.setName(String.format("IoNetworkAcceptor - %s", config.getAddress())); - _acceptor.setDaemon(false); - _acceptor.start(); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - } - - public int getAcceptingPort() - { - return _acceptor == null ? -1 : _acceptor.getPort(); - } - private class AcceptingThread extends Thread + @Override + protected IoNetworkConnection createNetworkConnection(final Socket socket, + final Receiver<ByteBuffer> engine, + final Integer sendBufferSize, + final Integer receiveBufferSize, + final int timeout, + final IdleTimeoutTicker ticker) { - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocket _serverSocket; - private int _timeout; - - private AcceptingThread(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext) throws IOException - { - _config = config; - _factory = factory; - _sslContext = sslContext; - _timeout = TIMEOUT; - - InetSocketAddress address = config.getAddress(); - - if(sslContext == null) - { - _serverSocket = new ServerSocket(); - } - else - { - SSLServerSocketFactory socketFactory = _sslContext.getServerSocketFactory(); - _serverSocket = socketFactory.createServerSocket(); - - SSLServerSocket sslServerSocket = (SSLServerSocket) _serverSocket; - - SSLUtil.removeSSLv3Support(sslServerSocket); - - if(config.needClientAuth()) - { - sslServerSocket.setNeedClientAuth(true); - } - else if(config.wantClientAuth()) - { - sslServerSocket.setWantClientAuth(true); - } - - } - - _serverSocket.setReuseAddress(true); - _serverSocket.bind(address); - } - - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() - { - LOGGER.debug("Shutting down the Acceptor"); - _closed = true; - - if (!_serverSocket.isClosed()) - { - try - { - _serverSocket.close(); - } - catch (IOException e) - { - throw new TransportException(e); - } - } - } - - private int getPort() - { - return _serverSocket.getLocalPort(); - } - - @Override - public void run() - { - try - { - while (!_closed) - { - Socket socket = null; - try - { - socket = _serverSocket.accept(); - - ProtocolEngine engine = _factory.newProtocolEngine(socket.getRemoteSocketAddress()); - - if(engine != null) - { - socket.setTcpNoDelay(_config.getTcpNoDelay()); - socket.setSoTimeout(1000 * HANSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socket.setSendBufferSize(sendBufferSize); - socket.setReceiveBufferSize(receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - NetworkConnection connection = - new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, _timeout, - ticker); - - connection.setMaxReadIdle(HANSHAKE_TIMEOUT); - - ticker.setConnection(connection); - - engine.setNetworkConnection(connection, connection.getSender()); - - connection.start(); - } - else - { - socket.close(); - } - } - catch(RuntimeException e) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - } - catch(IOException e) - { - if(!_closed) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socket); - try - { - //Delay to avoid tight spinning the loop during issues such as too many open files - Thread.sleep(1000); - } - catch (InterruptedException ie) - { - LOGGER.debug("Stopping acceptor due to interrupt request"); - _closed = true; - } - } - } - } - } - finally - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + _config.getAddress()); - } - } - } - - private void closeSocketIfNecessary(final Socket socket) - { - if(socket != null) - { - try - { - socket.close(); - } - catch (IOException e) - { - LOGGER.debug("Exception while closing socket", e); - } - } - } - + return new IoNetworkConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, + ticker); } } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java new file mode 100644 index 0000000000..c848ba7a16 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingConnection.java @@ -0,0 +1,147 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.security.Principal; + +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSocket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.Ticker; + +public class NonBlockingConnection implements NetworkConnection +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private final SocketChannel _socket; + private final long _timeout; + private final NonBlockingSender _ioSender; + private final NonBlockingReceiver _ioReceiver; + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private final Object _lock = new Object(); + + public NonBlockingConnection(SocketChannel socket, Receiver<ByteBuffer> delegate, + int sendBufferSize, int receiveBufferSize, long timeout, Ticker ticker) + { + _socket = socket; + _timeout = timeout; + + _ioReceiver = new NonBlockingReceiver(_socket, delegate, receiveBufferSize,_timeout); + _ioReceiver.setTicker(ticker); + + _ioSender = new NonBlockingSender(_socket, 2 * sendBufferSize, _timeout); + + _ioSender.setReceiver(_ioReceiver); + + } + + public void start() + { + _ioSender.initiate(); + _ioReceiver.initiate(); + } + + public Sender<ByteBuffer> getSender() + { + return _ioSender; + } + + public void close() + { + try + { + _ioSender.close(); + } + finally + { + _ioReceiver.close(false); + } + } + + public SocketAddress getRemoteAddress() + { + return _socket.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() + { + return _socket.socket().getLocalSocketAddress(); + } + + public void setMaxWriteIdle(int sec) + { + _maxWriteIdle = sec; + } + + public void setMaxReadIdle(int sec) + { + _maxReadIdle = sec; + } + + @Override + public Principal getPeerPrincipal() + { + synchronized (_lock) + { + if(!_principalChecked) + { + if(_socket.socket() instanceof SSLSocket) + { + try + { + _principal = ((SSLSocket) _socket.socket()).getSession().getPeerPrincipal(); + } + catch(SSLPeerUnverifiedException e) + { + _principal = null; + } + } + + _principalChecked = true; + } + + return _principal; + } + } + + @Override + public int getMaxReadIdle() + { + return _maxReadIdle; + } + + @Override + public int getMaxWriteIdle() + { + return _maxWriteIdle; + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java new file mode 100644 index 0000000000..3d45dafa05 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -0,0 +1,319 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +import javax.net.ssl.SSLContext; + +import org.slf4j.LoggerFactory; + +import org.apache.qpid.configuration.CommonProperties; +import org.apache.qpid.protocol.ProtocolEngine; +import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.transport.ConnectionSettings; +import org.apache.qpid.transport.NetworkTransportConfiguration; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.IncomingNetworkTransport; +import org.apache.qpid.transport.network.NetworkConnection; +import org.apache.qpid.transport.network.TransportActivity; + +public class NonBlockingNetworkTransport implements IncomingNetworkTransport +{ + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, + CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); + private static final int HANSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + private AcceptingThread _acceptor; + + private SocketChannel _socketChannel; + private NonBlockingConnection _connection; + + public NetworkConnection connect(ConnectionSettings settings, + Receiver<ByteBuffer> delegate, + TransportActivity transportActivity) + { + int sendBufferSize = settings.getWriteBufferSize(); + int receiveBufferSize = settings.getReadBufferSize(); + + try + { + _socketChannel = SocketChannel.open(); + _socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, settings.isTcpNodelay()); + _socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + _socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("SO_RCVBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_RCVBUF)); + LOGGER.debug("SO_SNDBUF : " + _socketChannel.getOption(StandardSocketOptions.SO_SNDBUF)); + LOGGER.debug("TCP_NODELAY : " + _socketChannel.getOption(StandardSocketOptions.TCP_NODELAY)); + } + + InetAddress address = InetAddress.getByName(settings.getHost()); + + _socketChannel.socket().connect(new InetSocketAddress(address, settings.getPort()), + settings.getConnectTimeout()); + } + catch (IOException e) + { + throw new TransportException("Error connecting to broker", e); + } + + try + { + IdleTimeoutTicker ticker = new IdleTimeoutTicker(transportActivity, TIMEOUT); + _connection = createNetworkConnection(_socketChannel, delegate, sendBufferSize, receiveBufferSize, + TIMEOUT, ticker); + ticker.setConnection(_connection); + _connection.start(); + } + catch(Exception e) + { + try + { + _socketChannel.close(); + } + catch(IOException ioe) + { + //ignored, throw based on original exception + } + + throw new TransportException("Error creating network connection", e); + } + + return _connection; + } + + + protected NonBlockingConnection createNetworkConnection(final SocketChannel socket, + final Receiver<ByteBuffer> engine, + final Integer sendBufferSize, + final Integer receiveBufferSize, + final int timeout, + final IdleTimeoutTicker ticker) + { + return new NonBlockingConnection(socket, engine, sendBufferSize, receiveBufferSize, timeout, ticker); + } + + public void close() + { + if(_connection != null) + { + _connection.close(); + } + if(_acceptor != null) + { + _acceptor.close(); + } + } + + public NetworkConnection getConnection() + { + return _connection; + } + + public void accept(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) + { + try + { + _acceptor = new AcceptingThread(config, factory, sslContext); + _acceptor.setDaemon(false); + _acceptor.start(); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + } + + public int getAcceptingPort() + { + return _acceptor == null ? -1 : _acceptor.getPort(); + } + + private class AcceptingThread extends Thread + { + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + + private AcceptingThread(NetworkTransportConfiguration config, + ProtocolEngineFactory factory, + SSLContext sslContext) throws IOException + { + _config = config; + _factory = factory; + _sslContext = sslContext; + _timeout = TIMEOUT; + + InetSocketAddress address = config.getAddress(); + + _serverSocket = ServerSocketChannel.open(); + + _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); + _serverSocket.bind(address); + } + + + /** + Close the underlying ServerSocket if it has not already been closed. + */ + public void close() + { + LOGGER.debug("Shutting down the Acceptor"); + _closed = true; + + if (!_serverSocket.socket().isClosed()) + { + try + { + _serverSocket.close(); + } + catch (IOException e) + { + throw new TransportException(e); + } + } + } + + private int getPort() + { + return _serverSocket.socket().getLocalPort(); + } + + @Override + public void run() + { + try + { + while (!_closed) + { + SocketChannel socket = null; + try + { + socket = _serverSocket.accept(); + + ProtocolEngine engine = _factory.newProtocolEngine(socket.socket().getRemoteSocketAddress()); + + if(engine != null) + { + socket.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socket.socket().setSoTimeout(1000 * HANSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socket.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socket.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NetworkConnection connection = + createNetworkConnection(socket, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker); + + connection.setMaxReadIdle(HANSHAKE_TIMEOUT); + + ticker.setConnection(connection); + + engine.setNetworkConnection(connection, connection.getSender()); + + connection.start(); + } + else + { + socket.close(); + } + } + catch(RuntimeException e) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket.socket()); + } + catch(IOException e) + { + if(!_closed) + { + LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); + closeSocketIfNecessary(socket.socket()); + try + { + //Delay to avoid tight spinning the loop during issues such as too many open files + Thread.sleep(1000); + } + catch (InterruptedException ie) + { + LOGGER.debug("Stopping acceptor due to interrupt request"); + _closed = true; + } + } + } + } + } + finally + { + if(LOGGER.isDebugEnabled()) + { + LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " + + _config.getAddress()); + } + } + } + + private void closeSocketIfNecessary(final Socket socket) + { + if(socket != null) + { + try + { + socket.close(); + } + catch (IOException e) + { + LOGGER.debug("Exception while closing socket", e); + } + } + } + + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java new file mode 100644 index 0000000000..ccd7170b62 --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingReceiver.java @@ -0,0 +1,266 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.net.ssl.SSLSocket; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Receiver; +import org.apache.qpid.transport.TransportException; +import org.apache.qpid.transport.network.Ticker; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.util.SystemUtils; + +/** + * IoReceiver + * + */ + +final class NonBlockingReceiver implements Runnable +{ + + private static final Logger log = Logger.get(NonBlockingReceiver.class); + + private final Receiver<ByteBuffer> receiver; + private final int bufferSize; + private final SocketChannel socket; + private final long timeout; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Thread receiverThread; + private static final boolean shutdownBroken; + + private Ticker _ticker; + static + { + shutdownBroken = SystemUtils.isWindows(); + } + + public NonBlockingReceiver(SocketChannel socket, Receiver<ByteBuffer> receiver, int bufferSize, long timeout) + { + this.receiver = receiver; + this.bufferSize = bufferSize; + this.socket = socket; + this.timeout = timeout; + + try + { + //Create but deliberately don't start the thread. + receiverThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new RuntimeException("Error creating IOReceiver thread",e); + } + receiverThread.setDaemon(true); + receiverThread.setName(String.format("IoReceiver - %s", socket.socket().getRemoteSocketAddress())); + } + + public void initiate() + { + receiverThread.start(); + } + + public void close() + { + close(false); + } + + void close(boolean block) + { + if (!closed.getAndSet(true)) + { + try + { + try + { + if (shutdownBroken || socket.socket() instanceof SSLSocket) + { + socket.close(); + } + else + { + socket.shutdownInput(); + } + } + catch(SocketException se) + { + if(!socket.socket().isClosed() && !socket.socket().isInputShutdown()) + { + throw se; + } + } + if (block && Thread.currentThread() != receiverThread) + { + receiverThread.join(timeout); + if (receiverThread.isAlive()) + { + throw new TransportException("join timed out"); + } + } + } + catch (InterruptedException e) + { + throw new TransportException(e); + } + catch (IOException e) + { + throw new TransportException(e); + } + + } + } + + public void run() + { + final int threshold = bufferSize / 2; + + // I set the read buffer size similar to SO_RCVBUF + // Haven't tested with a lower value to see if it's better or worse + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + try + { + int read = 0; + long currentTime; + while(read != -1) + { + try + { + while ((read = socket.read(buffer)) != -1) + { + if (read > 0) + { + ByteBuffer b = buffer.duplicate(); + b.flip(); + receiver.received(b); + + if (buffer.remaining() < threshold) + { + buffer = ByteBuffer.allocate(bufferSize); + } + else + { + buffer = buffer.slice(); + } + } + currentTime = System.currentTimeMillis(); + + if(_ticker != null) + { + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } + try + { + if(!socket.socket().isClosed()) + { + socket.socket().setSoTimeout(tick <= 0 ? 1 : tick); + } + } + catch(SocketException e) + { + // ignore - closed socket + } + } + } + } + catch (SocketTimeoutException e) + { + currentTime = System.currentTimeMillis(); + if(_ticker != null) + { + final int tick = _ticker.tick(currentTime); + if(!socket.socket().isClosed()) + { + try + { + socket.socket().setSoTimeout(tick <= 0 ? 1 : tick ); + } + catch(SocketException ex) + { + // ignore - closed socket + } + } + } + } + } + } + catch (Exception t) + { + if (shouldReport(t)) + { + receiver.exception(t); + } + } + finally + { + receiver.closed(); + try + { + socket.close(); + } + catch(Exception e) + { + log.warn(e, "Error closing socket"); + } + } + } + + private boolean shouldReport(Throwable t) + { + boolean brokenClose = closed.get() && + shutdownBroken && + t instanceof SocketException && + "socket closed".equalsIgnoreCase(t.getMessage()); + + boolean sslSocketClosed = closed.get() && + socket.socket() instanceof SSLSocket && + t instanceof SocketException && + "Socket is closed".equalsIgnoreCase(t.getMessage()); + + boolean recvFailed = closed.get() && + shutdownBroken && + t instanceof SocketException && + "Socket operation on nonsocket: recv failed".equalsIgnoreCase(t.getMessage()); + + return !brokenClose && !sslSocketClosed && !recvFailed; + } + + public Ticker getTicker() + { + return _ticker; + } + + public void setTicker(Ticker ticker) + { + _ticker = ticker; + } + + +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java new file mode 100644 index 0000000000..e3d604d2ba --- /dev/null +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSender.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.transport.network.io; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.qpid.thread.Threading; +import org.apache.qpid.transport.Sender; +import org.apache.qpid.transport.SenderClosedException; +import org.apache.qpid.transport.SenderException; +import org.apache.qpid.transport.util.Logger; + + +public final class NonBlockingSender implements Runnable, Sender<ByteBuffer> +{ + + private static final Logger log = Logger.get(NonBlockingSender.class); + + // by starting here, we ensure that we always test the wraparound + // case, we should probably make this configurable somehow so that + // we can test other cases as well + private final static int START = Integer.MAX_VALUE - 10; + + private final long timeout; + private final SocketChannel socket; + + private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); + private final Object _isEmpty = new Object(); + + private volatile boolean idle = true; + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Thread senderThread; + private NonBlockingReceiver _receiver; + private final String _remoteSocketAddress; + + private volatile Throwable exception = null; + + public NonBlockingSender(SocketChannel socket, int bufferSize, long timeout) + { + this.socket = socket; + this.timeout = timeout; + _remoteSocketAddress = socket.socket().getRemoteSocketAddress().toString(); + + + try + { + //Create but deliberately don't start the thread. + senderThread = Threading.getThreadFactory().createThread(this); + } + catch(Exception e) + { + throw new Error("Error creating IOSender thread",e); + } + + senderThread.setDaemon(true); + senderThread.setName(String.format("IoSender - %s", _remoteSocketAddress)); + } + + public void initiate() + { + senderThread.start(); + } + + + public void send(ByteBuffer buf) + { + checkNotAlreadyClosed(); + + if(!senderThread.isAlive()) + { + throw new SenderException(String.format("sender thread for socket %s is not alive", _remoteSocketAddress)); + } + + + _buffers.add(buf); + synchronized (_isEmpty) + { + _isEmpty.notifyAll(); + } + + } + + public void flush() + { + if (idle) + { + synchronized (_isEmpty) + { + _isEmpty.notify(); + } + } + } + + public void close() + { + close(true, true); + } + + private void close(boolean awaitSenderBeforeClose, boolean reportException) + { + if (!closed.getAndSet(true)) + { + + synchronized (_isEmpty) + { + _isEmpty.notify(); + } + + try + { + if (awaitSenderBeforeClose) + { + awaitSenderThreadShutdown(); + } + } + finally + { + closeReceiver(); + } + if (reportException && exception != null) + { + throw new SenderException(exception); + } + } + } + + private void closeReceiver() + { + if(_receiver != null) + { + try + { + _receiver.close(); + } + catch(RuntimeException e) + { + log.error(e, "Exception closing receiver for socket %s", _remoteSocketAddress); + throw new SenderException(e.getMessage(), e); + } + } + } + + public void run() + { + while (true) + { + + if (closed.get()) + { + break; + } + + idle = true; + + synchronized (_isEmpty) + { + while (_buffers.isEmpty() && !closed.get()) + { + try + { + _isEmpty.wait(); + } + catch (InterruptedException e) + { + // pass + } + } + } + + idle = false; + + ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; + Iterator<ByteBuffer> bufferIterator = _buffers.iterator(); + for(int i = 0; i < bufArray.length; i++) + { + bufArray[i] = bufferIterator.next(); + } + + try + { + socket.write(bufArray); + for(ByteBuffer buf : bufArray) + { + if(buf.remaining() == 0) + { + _buffers.poll(); + } + else + { + break; + } + } + } + catch (IOException e) + { + log.info("Exception in thread sending to '" + _remoteSocketAddress + "': " + e); + exception = e; + close(false, false); + break; + } + + } + + } + + public void setIdleTimeout(int i) + { + try + { + socket.socket().setSoTimeout(i); + } + catch (Exception e) + { + throw new SenderException(e); + } + } + + public void setReceiver(NonBlockingReceiver receiver) + { + _receiver = receiver; + } + + private void awaitSenderThreadShutdown() + { + if (Thread.currentThread() != senderThread) + { + try + { + senderThread.join(timeout); + if (senderThread.isAlive()) + { + log.error("join timed out for socket %s to stop", _remoteSocketAddress); + throw new SenderException(String.format("join timed out for socket %s to stop", _remoteSocketAddress)); + } + } + catch (InterruptedException e) + { + log.error("interrupted whilst waiting for sender thread for socket %s to stop", _remoteSocketAddress); + throw new SenderException(e); + } + } + } + + private void checkNotAlreadyClosed() + { + if (closed.get()) + { + throw new SenderClosedException(String.format("sender for socket %s is closed", _remoteSocketAddress), exception); + } + } +} diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java index 24f95d7798..e69de29bb2 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.transport.network.security.ssl; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.HandshakeStatus; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.SSLException; -import org.apache.qpid.transport.Sender; -import org.apache.qpid.transport.SenderException; -import org.apache.qpid.transport.network.security.SSLStatus; -import org.apache.qpid.transport.util.Logger; - -public class SSLBufferingSender implements Sender<ByteBuffer> -{ - private static final Logger log = Logger.get(SSLBufferingSender.class); - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); - - private final Sender<ByteBuffer> delegate; - private final SSLEngine engine; - private final int sslBufSize; - private final ByteBuffer netData; - private final SSLStatus _sslStatus; - - private String _hostname; - - private final AtomicBoolean closed = new AtomicBoolean(false); - private ByteBuffer _appData = EMPTY_BYTE_BUFFER; - - - public SSLBufferingSender(SSLEngine engine, Sender<ByteBuffer> delegate, SSLStatus sslStatus) - { - this.engine = engine; - this.delegate = delegate; - sslBufSize = engine.getSession().getPacketBufferSize(); - netData = ByteBuffer.allocate(sslBufSize); - _sslStatus = sslStatus; - } - - public void setHostname(String hostname) - { - _hostname = hostname; - } - - public void close() - { - if (!closed.getAndSet(true)) - { - if (engine.isOutboundDone()) - { - return; - } - log.debug("Closing SSL connection"); - doSend(); - engine.closeOutbound(); - try - { - tearDownSSLConnection(); - } - catch(Exception e) - { - throw new SenderException("Error closing SSL connection",e); - } - - - synchronized(_sslStatus.getSslLock()) - { - while (!engine.isOutboundDone()) - { - try - { - _sslStatus.getSslLock().wait(); - } - catch(InterruptedException e) - { - // pass - } - - } - } - delegate.close(); - } - } - - private void tearDownSSLConnection() throws Exception - { - SSLEngineResult result = engine.wrap(ByteBuffer.allocate(0), netData); - Status status = result.getStatus(); - int read = result.bytesProduced(); - while (status != Status.CLOSED) - { - if (status == Status.BUFFER_OVERFLOW) - { - netData.clear(); - } - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - flush(); - } - result = engine.wrap(ByteBuffer.allocate(0), netData); - status = result.getStatus(); - read = result.bytesProduced(); - } - } - - public void flush() - { - delegate.flush(); - } - - public void send() - { - if(!closed.get()) - { - doSend(); - } - } - - public synchronized void send(ByteBuffer appData) - { - boolean buffered; - if(buffered = _appData.hasRemaining()) - { - ByteBuffer newBuf = ByteBuffer.allocate(_appData.remaining()+appData.remaining()); - newBuf.put(_appData); - newBuf.put(appData); - newBuf.flip(); - _appData = newBuf; - } - if (closed.get()) - { - throw new SenderException("SSL Sender is closed"); - } - doSend(); - if(!appData.hasRemaining()) - { - _appData = EMPTY_BYTE_BUFFER; - } - else if(!buffered) - { - _appData = ByteBuffer.allocate(appData.remaining()); - _appData.put(appData); - _appData.flip(); - } - } - - private synchronized void doSend() - { - - HandshakeStatus handshakeStatus; - Status status; - - while((_appData.hasRemaining() || engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) - && !_sslStatus.getSslErrorFlag()) - { - int read = 0; - try - { - SSLEngineResult result = engine.wrap(_appData, netData); - read = result.bytesProduced(); - status = result.getStatus(); - handshakeStatus = result.getHandshakeStatus(); - } - catch(SSLException e) - { - // Should this set _sslError?? - throw new SenderException("SSL, Error occurred while encrypting data",e); - } - - if(read > 0) - { - int limit = netData.limit(); - netData.limit(netData.position()); - netData.position(netData.position() - read); - - ByteBuffer data = netData.slice(); - - netData.limit(limit); - netData.position(netData.position() + read); - - delegate.send(data); - } - - switch(status) - { - case CLOSED: - throw new SenderException("SSLEngine is closed"); - - case BUFFER_OVERFLOW: - netData.clear(); - continue; - - case OK: - break; // do nothing - - default: - throw new IllegalStateException("SSLReceiver: Invalid State " + status); - } - - switch (handshakeStatus) - { - case NEED_WRAP: - if (netData.hasRemaining()) - { - continue; - } - - case NEED_TASK: - doTasks(); - break; - - case NEED_UNWRAP: - flush(); - return; - - case FINISHED: - if (_hostname != null) - { - SSLUtil.verifyHostname(engine, _hostname); - } - - case NOT_HANDSHAKING: - break; //do nothing - - default: - throw new IllegalStateException("SSLSender: Invalid State " + status); - } - - } - } - - private void doTasks() - { - Runnable runnable; - while ((runnable = engine.getDelegatedTask()) != null) { - runnable.run(); - } - } - - public void setIdleTimeout(int i) - { - delegate.setIdleTimeout(i); - } -} |
