summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-02-01 21:45:19 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-02-01 21:45:19 +0000
commit60c62c03ca404e98e4fbd1abf4a5ebf50763d604 (patch)
treefc240b7d11ec94b80e13c4f2650829b4b6c19ab6 /qpid/java
parent50876b8a80c5bfd4ba125f87e07fe77669520c80 (diff)
downloadqpid-python-60c62c03ca404e98e4fbd1abf4a5ebf50763d604.tar.gz
Remove accepting thread and use non blocking io accept
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1656365 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java263
-rw-r--r--qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java71
2 files changed, 152 insertions, 182 deletions
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
index c231a0a7ca..6c96c0a18e 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingNetworkTransport.java
@@ -22,7 +22,6 @@ package org.apache.qpid.transport.network.io;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.net.Socket;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@@ -47,18 +46,39 @@ public class NonBlockingNetworkTransport
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private AcceptingThread _acceptor;
private SelectorThread _selector;
+
+ private Set<TransportEncryption> _encryptionSet;
+ private volatile boolean _closed = false;
+ private NetworkTransportConfiguration _config;
+ private ProtocolEngineFactory _factory;
+ private SSLContext _sslContext;
+ private ServerSocketChannel _serverSocket;
+ private int _timeout;
+
public void close()
{
- if(_acceptor != null)
- {
- _acceptor.close();
- }
if(_selector != null)
{
- _selector.close();
+ try
+ {
+ if (_serverSocket != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ _serverSocket.close();
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ finally
+ {
+
+ _selector.close();
+ }
}
}
@@ -69,41 +89,7 @@ public class NonBlockingNetworkTransport
{
try
{
- _acceptor = new AcceptingThread(config, factory, sslContext, encryptionSet);
- _acceptor.setDaemon(false);
- _acceptor.start();
- _selector = new SelectorThread(config.getAddress().toString());
- _selector.start();
- }
- catch (IOException e)
- {
- throw new TransportException("Failed to start AMQP on port : " + config, e);
- }
-
-
- }
-
- public int getAcceptingPort()
- {
- return _acceptor == null ? -1 : _acceptor.getPort();
- }
-
- private class AcceptingThread extends Thread
- {
- private final Set<TransportEncryption> _encryptionSet;
- private volatile boolean _closed = false;
- private final NetworkTransportConfiguration _config;
- private final ProtocolEngineFactory _factory;
- private final SSLContext _sslContext;
- private final ServerSocketChannel _serverSocket;
- private int _timeout;
-
- private AcceptingThread(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext,
- final Set<TransportEncryption> encryptionSet) throws IOException
- {
_config = config;
_factory = factory;
_sslContext = sslContext;
@@ -115,158 +101,83 @@ public class NonBlockingNetworkTransport
_serverSocket.setOption(StandardSocketOptions.SO_REUSEADDR, true);
_serverSocket.bind(address);
+ _serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
- }
-
-
- /**
- Close the underlying ServerSocket if it has not already been closed.
- */
- public void close()
- {
- LOGGER.debug("Shutting down the Acceptor");
- _closed = true;
- if (!_serverSocket.socket().isClosed())
- {
- try
- {
- _serverSocket.close();
- }
- catch (IOException e)
- {
- throw new TransportException(e);
- }
- }
+ _selector = new SelectorThread(config.getAddress().toString(), this);
+ _selector.start();
+ _selector.addAcceptingSocket(_serverSocket);
}
-
- private int getPort()
+ catch (IOException e)
{
- return _serverSocket.socket().getLocalPort();
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
}
- @Override
- public void run()
- {
- try
- {
- while (!_closed)
- {
- SocketChannel socketChannel = null;
- try
- {
- socketChannel = _serverSocket.accept();
-
- acceptSocketChannel(socketChannel);
- }
- catch(RuntimeException e)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socketChannel.socket());
- }
- catch(IOException e)
- {
- if(!_closed)
- {
- LOGGER.error("Error in Acceptor thread on address " + _config.getAddress(), e);
- closeSocketIfNecessary(socketChannel.socket());
- try
- {
- //Delay to avoid tight spinning the loop during issues such as too many open files
- Thread.sleep(1000);
- }
- catch (InterruptedException ie)
- {
- LOGGER.debug("Stopping acceptor due to interrupt request");
- _closed = true;
- }
- }
- }
- }
- }
- finally
- {
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Acceptor exiting, no new connections will be accepted on address "
- + _config.getAddress());
- }
- }
- }
- public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
- {
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
+ }
- if(engine != null)
- {
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-
- NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- new Runnable()
- {
+ public int getAcceptingPort()
+ {
+ return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ }
- @Override
- public void run()
- {
- engine.encryptedTransport();
- }
- },
- _selector);
+ public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ {
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
- engine.setNetworkConnection(connection, connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final Integer sendBufferSize = _config.getSendBufferSize();
+ final Integer receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ sendBufferSize,
+ receiveBufferSize,
+ _timeout,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
- ticker.setConnection(connection);
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
- connection.start();
+ ticker.setConnection(connection);
- _selector.addConnection(connection);
+ connection.start();
- }
- else
- {
- socketChannel.close();
- }
- }
+ _selector.addConnection(connection);
- private void closeSocketIfNecessary(final Socket socket)
+ }
+ else
{
- if(socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- LOGGER.debug("Exception while closing socket", e);
- }
- }
+ socketChannel.close();
}
-
}
+
}
diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
index ff89d9b05c..bd8d3ad804 100644
--- a/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
+++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
@@ -20,8 +20,11 @@
package org.apache.qpid.transport.network.io;
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
@@ -38,16 +41,18 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class SelectorThread extends Thread
{
-
+ private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+ private final NonBlockingNetworkTransport _transport;
- SelectorThread(final String name)
+ SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
{
super("SelectorThread-"+name);
+ _transport = nonBlockingNetworkTransport;
try
{
_selector = Selector.open();
@@ -59,6 +64,45 @@ public class SelectorThread extends Thread
}
}
+ public void addAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
+ }
+ catch (ClosedChannelException e)
+ {
+ // TODO
+ e.printStackTrace();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
+ public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ {
+ _tasks.add(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ SelectionKey selectionKey = socketChannel.keyFor(_selector);
+ if(selectionKey != null)
+ {
+ selectionKey.cancel();
+ }
+ }
+ });
+ _selector.wakeup();
+ }
+
@Override
public void run()
{
@@ -72,18 +116,33 @@ public class SelectorThread extends Thread
_selector.select(nextTimeout);
+ while(_tasks.peek() != null)
+ {
+ Runnable task = _tasks.poll();
+ task.run();
+ }
+
List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
Set<SelectionKey> selectionKeys = _selector.selectedKeys();
for (SelectionKey key : selectionKeys)
{
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+ if(key.isAcceptable())
+ {
+ // todo - should we schedule this rather than running in this thread?
+ SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
+ _transport.acceptSocketChannel(acceptedChannel);
+ }
+ else
+ {
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
- key.channel().register(_selector, 0);
+ key.channel().register(_selector, 0);
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
+ }
}
selectionKeys.clear();