diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 21:45:19 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-02-01 21:45:19 +0000 |
| commit | 60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (patch) | |
| tree | fc240b7d11ec94b80e13c4f2650829b4b6c19ab6 /qpid/java | |
| parent | 50876b8a80c5bfd4ba125f87e07fe77669520c80 (diff) | |
| download | qpid-python-60c62c03ca404e98e4fbd1abf4a5ebf50763d604.tar.gz | |
Remove accepting thread and use non blocking io accept
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656365 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
2 files changed, 152 insertions, 182 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java index c231a0a7ca..6c96c0a18e 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java @@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; import java.net.InetSocketAddress; -import java.net.Socket; import java.net.StandardSocketOptions; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -47,18 +46,39 @@ public class NonBlockingNetworkTransport CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private AcceptingThread _acceptor; private SelectorThread _selector; + + private Set<TransportEncryption> _encryptionSet; + private volatile boolean _closed = false; + private NetworkTransportConfiguration _config; + private ProtocolEngineFactory _factory; + private SSLContext _sslContext; + private ServerSocketChannel _serverSocket; + private int _timeout; + public void close() { - if(_acceptor != null) - { - _acceptor.close(); - } if(_selector != null) { - _selector.close(); + try + { + if (_serverSocket != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + _serverSocket.close(); + } + } + catch (IOException e) + { + // TODO + e.printStackTrace(); + } + finally + { + + _selector.close(); + } } } @@ -69,41 +89,7 @@ public class NonBlockingNetworkTransport { try { - _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet); - _acceptor.setDaemon(false); - _acceptor.start(); - _selector = new SelectorThread(config.getAddress().toString()); - _selector.start(); - } - catch (IOException e) - { - throw new TransportException("Failed to start AMQP on port : " + config, e); - } - - - } - - public int getAcceptingPort() - { - return _acceptor == null ? -1 : _acceptor.getPort(); - } - - private class AcceptingThread extends Thread - { - private final Set<TransportEncryption> _encryptionSet; - private volatile boolean _closed = false; - private final NetworkTransportConfiguration _config; - private final ProtocolEngineFactory _factory; - private final SSLContext _sslContext; - private final ServerSocketChannel _serverSocket; - private int _timeout; - - private AcceptingThread(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set<TransportEncryption> encryptionSet) throws IOException - { _config = config; _factory = factory; _sslContext = sslContext; @@ -115,158 +101,83 @@ public class NonBlockingNetworkTransport _serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true); _serverSocket.bind(address); + _serverSocket.configureBlocking(false); _encryptionSet = encryptionSet; - } - - - /** - Close the underlying ServerSocket if it has not already been closed. - */ - public void close() - { - LOGGER.debug("Shutting down the Acceptor"); - _closed = true; - if (!_serverSocket.socket().isClosed()) - { - try - { - _serverSocket.close(); - } - catch (IOException e) - { - throw new TransportException(e); - } - } + _selector = new SelectorThread(config.getAddress().toString(), this); + _selector.start(); + _selector.addAcceptingSocket(_serverSocket); } - - private int getPort() + catch (IOException e) { - return _serverSocket.socket().getLocalPort(); + throw new TransportException("Failed to start AMQP on port : " + config, e); } - @Override - public void run() - { - try - { - while (!_closed) - { - SocketChannel socketChannel = null; - try - { - socketChannel = _serverSocket.accept(); - - acceptSocketChannel(socketChannel); - } - catch(RuntimeException e) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socketChannel.socket()); - } - catch(IOException e) - { - if(!_closed) - { - LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e); - closeSocketIfNecessary(socketChannel.socket()); - try - { - //Delay to avoid tight spinning the loop during issues such as too many open files - Thread.sleep(1000); - } - catch (InterruptedException ie) - { - LOGGER.debug("Stopping acceptor due to interrupt request"); - _closed = true; - } - } - } - } - } - finally - { - if(LOGGER.isDebugEnabled()) - { - LOGGER.debug("Acceptor exiting, no new connections will be accepted on address " - + _config.getAddress()); - } - } - } - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException - { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); + } - if(engine != null) - { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - new Runnable() - { + public int getAcceptingPort() + { + return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + } - @Override - public void run() - { - engine.encryptedTransport(); - } - }, - _selector); + public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + { + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final Integer sendBufferSize = _config.getSendBufferSize(); + final Integer receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + sendBufferSize, + receiveBufferSize, + _timeout, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + new Runnable() + { + + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); - ticker.setConnection(connection); + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - connection.start(); + ticker.setConnection(connection); - _selector.addConnection(connection); + connection.start(); - } - else - { - socketChannel.close(); - } - } + _selector.addConnection(connection); - private void closeSocketIfNecessary(final Socket socket) + } + else { - if(socket != null) - { - try - { - socket.close(); - } - catch (IOException e) - { - LOGGER.debug("Exception while closing socket", e); - } - } + socketChannel.close(); } - } + } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java index ff89d9b05c..bd8d3ad804 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java @@ -20,8 +20,11 @@ package org.apache.qpid.transport.network.io; import java.io.IOException; +import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -38,16 +41,18 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class SelectorThread extends Thread { - + private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NonBlockingNetworkTransport _transport; - SelectorThread(final String name) + SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) { super("SelectorThread-"+name); + _transport = nonBlockingNetworkTransport; try { _selector = Selector.open(); @@ -59,6 +64,45 @@ public class SelectorThread extends Thread } } + public void addAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + + try + { + socketChannel.register(_selector, SelectionKey.OP_ACCEPT); + } + catch (ClosedChannelException e) + { + // TODO + e.printStackTrace(); + } + } + }); + _selector.wakeup(); + } + + public void cancelAcceptingSocket(final ServerSocketChannel socketChannel) + { + _tasks.add(new Runnable() + { + @Override + public void run() + { + SelectionKey selectionKey = socketChannel.keyFor(_selector); + if(selectionKey != null) + { + selectionKey.cancel(); + } + } + }); + _selector.wakeup(); + } + @Override public void run() { @@ -72,18 +116,33 @@ public class SelectorThread extends Thread _selector.select(nextTimeout); + while(_tasks.peek() != null) + { + Runnable task = _tasks.poll(); + task.run(); + } + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); Set<SelectionKey> selectionKeys = _selector.selectedKeys(); for (SelectionKey key : selectionKeys) { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); + _transport.acceptSocketChannel(acceptedChannel); + } + else + { + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - key.channel().register(_selector, 0); + key.channel().register(_selector, 0); - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); + } } selectionKeys.clear(); |
