diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
| commit | 67a2cb9fe4149dc9d6cd750b3426995033ea9d9d (patch) | |
| tree | 20633ed3c757a764123d5c98811462afe72cb9b2 /qpid/java/broker-core/src | |
| parent | e756d0579c8e0f4373e56a4d608acf9eb5632f57 (diff) | |
| download | qpid-python-67a2cb9fe4149dc9d6cd750b3426995033ea9d9d.tar.gz | |
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminlogy (session etc) in 0-8 stack
* Also added uncaught exception handler in test framework (QBTC) to guard log the case where a thread exits abnormally.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
8 files changed, 437 insertions, 335 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 95b9bf8970..c587394821 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -69,8 +69,6 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends */ public LogSubject getLogSubject(); - public boolean isSessionNameUnique(byte[] name); - String getRemoteAddressString(); SocketAddress getRemoteAddress(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java index 0cf34af2ac..dfb843b708 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java @@ -92,7 +92,6 @@ public class KerberosAuthenticationManager extends AbstractAuthenticationManager } catch (SaslException e) { - e.printStackTrace(System.err); return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java new file mode 100644 index 0000000000..36fd63c360 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java @@ -0,0 +1,114 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class NetworkConnectionScheduler +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class); + + private final SelectorThread _selectorThread; + private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; + + NetworkConnectionScheduler(final SelectorThread selectorThread) + { + _selectorThread = selectorThread; + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); + } + + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + String currentName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName( + SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); + processConnection(connection); + } + finally + { + Thread.currentThread().setName(currentName); + } + } + }); + } + + private void processConnection(final NonBlockingConnection connection) + { + try + { + _running.incrementAndGet(); + boolean rerun; + do + { + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + _selectorThread.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); + } + } + + public void close() + { + _executor.shutdown(); + } + + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index ae5816a0d1..4e27fa8476 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; @@ -55,18 +55,12 @@ import org.apache.qpid.util.SystemUtils; public class NonBlockingConnection implements NetworkConnection, ByteBufferSender { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + private final SocketChannel _socketChannel; - private final long _timeout; private final Ticker _ticker; + private final Object _peerPrincipalLock = new Object(); private final SelectorThread _selector; - private int _maxReadIdle; - private int _maxWriteIdle; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; - private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); @@ -74,9 +68,14 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende private final AtomicBoolean _closed = new AtomicBoolean(false); private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; - private final Set<TransportEncryption> _encryptionSet; - private final SSLContext _sslContext; private final Runnable _onTransportEncryptionAction; + + + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -90,9 +89,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende public NonBlockingConnection(SocketChannel socketChannel, ServerProtocolEngine delegate, - int sendBufferSize, int receiveBufferSize, - long timeout, Ticker ticker, final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, @@ -104,14 +101,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende final SelectorThread selectorThread) { _socketChannel = socketChannel; - _timeout = timeout; _ticker = ticker; _selector = selectorThread; _protocolEngine = delegate; _receiveBufSize = receiveBufferSize; - _encryptionSet = encryptionSet; - _sslContext = sslContext; _onTransportEncryptionAction = onTransportEncryptionAction; delegate.setWorkListener(new Action<ServerProtocolEngine>() @@ -125,7 +119,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(encryptionSet.size() == 1) { - _transportEncryption = _encryptionSet.iterator().next(); + _transportEncryption = encryptionSet.iterator().next(); if (_transportEncryption == TransportEncryption.TLS) { onTransportEncryptionAction.run(); @@ -134,7 +128,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(encryptionSet.contains(TransportEncryption.TLS)) { - _sslEngine = _sslContext.createSSLEngine(); + _sslEngine = sslContext.createSSLEngine(); _sslEngine.setUseClientMode(false); SSLUtil.removeSSLv3Support(_sslEngine); SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); @@ -150,26 +144,16 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); } - try - { - _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); - _socketChannel.configureBlocking(false); - } - catch (IOException e) - { - throw new SenderException("Unable to prepare the channel for non-blocking IO", e); - } - - + _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString(); } - public Ticker getTicker() + Ticker getTicker() { return _ticker; } - public SocketChannel getSocketChannel() + SocketChannel getSocketChannel() { return _socketChannel; } @@ -189,7 +173,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(_closed.compareAndSet(false,true)) { _protocolEngine.notifyWork(); - getSelector().wakeup(); + _selector.wakeup(); } } @@ -216,7 +200,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende @Override public Principal getPeerPrincipal() { - synchronized (_lock) + synchronized (_peerPrincipalLock) { if(!_principalChecked) { @@ -301,7 +285,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende // tell all consumer targets that it is okay to accept more _protocolEngine.setMessageAssignmentSuspended(false); } - catch (IOException e) + catch (IOException | ConnectionScopedRuntimeException e) { LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); LOGGER.debug("Closing " + _remoteSocketAddress); @@ -359,22 +343,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende } - public SelectorThread getSelector() - { - return _selector; - } - - public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - public boolean doRead() throws IOException + private boolean doRead() throws IOException { boolean readData = false; if(_transportEncryption == TransportEncryption.NONE) @@ -496,7 +465,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende return readData; } - public boolean doWrite() throws IOException + private boolean doWrite() throws IOException { ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; @@ -589,18 +558,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende } } - public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - public boolean runSSLEngineTasks(final SSLEngineResult status) + private boolean runSSLEngineTasks(final SSLEngineResult status) { if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { @@ -614,15 +572,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende return false; } - public boolean looksLikeSSL(final byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - @Override public void send(final ByteBuffer msg) { - assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName(); + assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName(); + if (_closed.get()) { @@ -631,7 +585,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende else { _buffers.add(msg); - _protocolEngine.notifyWork(); + _protocolEngine.notifyWork(); // TODO now redundant } } @@ -639,4 +593,36 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende public void flush() { } + + @Override + public String toString() + { + return "[NonBlockingConnection " + _remoteSocketAddress + "]"; + } + + private boolean looksLikeSSL(final byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java index 79313712a5..3bc7978931 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -25,14 +25,17 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.EnumSet; import java.util.Set; import javax.net.ssl.SSLContext; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.TransportException; @@ -43,51 +46,24 @@ import org.apache.qpid.transport.network.io.IdleTimeoutTicker; public class NonBlockingNetworkTransport { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private SelectorThread _selector; - - - private Set<TransportEncryption> _encryptionSet; - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocketChannel _serverSocket; - private int _timeout; - - public void close() - { - if(_selector != null) - { - try - { - if (_serverSocket != null) - { - _selector.cancelAcceptingSocket(_serverSocket); - _serverSocket.close(); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - finally - { + private final Set<TransportEncryption> _encryptionSet; + private final NetworkTransportConfiguration _config; + private final ProtocolEngineFactory _factory; + private final SSLContext _sslContext; + private final ServerSocketChannel _serverSocket; + private final int _timeout; - _selector.close(); - } - } - } + private SelectorThread _selector; - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set<TransportEncryption> encryptionSet) + public NonBlockingNetworkTransport(final NetworkTransportConfiguration config, + final MultiVersionProtocolEngineFactory factory, + final SSLContext sslContext, + final EnumSet<TransportEncryption> encryptionSet) { try { @@ -106,80 +82,138 @@ public class NonBlockingNetworkTransport _serverSocket.configureBlocking(false); _encryptionSet = encryptionSet; - _selector = new SelectorThread(config.getAddress().toString(), this); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + + } + + public void start() + { + try + { + _selector = new SelectorThread(this); _selector.start(); _selector.addAcceptingSocket(_serverSocket); } catch (IOException e) { - throw new TransportException("Failed to start AMQP on port : " + config, e); + throw new TransportException("Failed to start", e); } + } + public void close() + { + if(_selector != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + try + { + _serverSocket.close(); + } + catch (IOException e) + { + LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e); + } + finally + { + _selector.close(); + _selector = null; + } + } } public int getAcceptingPort() { - return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + return _serverSocket.socket().getLocalPort(); } - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + public NetworkTransportConfiguration getConfig() { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); + return _config; + } - if(engine != null) + void acceptSocketChannel(final ServerSocketChannel serverSocketChannel) + { + SocketChannel socketChannel = null; + boolean success = false; + try { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - _config.getEnabledCipherSuites(), - _config.getDisabledCipherSuites(), - new Runnable() - { - - @Override - public void run() + socketChannel = serverSocketChannel.accept(); + + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final int sendBufferSize = _config.getSendBufferSize(); + final int receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + socketChannel.configureBlocking(false); + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, _timeout); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + receiveBufferSize, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), + new Runnable() { - engine.encryptedTransport(); - } - }, - _selector); - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - ticker.setConnection(connection); + ticker.setConnection(connection); - connection.start(); + connection.start(); - _selector.addConnection(connection); + _selector.addConnection(connection); + success = true; + } } - else + catch (IOException e) { - socketChannel.close(); + LOGGER.error("Failed to process incoming socket", e); + } + finally + { + if (!success && socketChannel != null) + { + try + { + socketChannel.close(); + } + catch (IOException e) + { + LOGGER.debug("Failed to close socket " + socketChannel, e); + } + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index 774888e934..ff75448787 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -32,43 +32,40 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.thread.LoggingUncaughtExceptionHandler; - public class SelectorThread extends Thread { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); - public static final String IO_THREAD_NAME_PREFIX = "NCS-"; + static final String IO_THREAD_NAME_PREFIX = "IO-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); + + /** + * Queue of connections that are not currently scheduled and not registered with the selector. + * These need to go back into the Selector. + */ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); + + /** Set of connections that are currently being selected upon */ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); + private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); - private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(this); private final NonBlockingNetworkTransport _transport; + private long _nextTimeout; - SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) + SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException { - super("SelectorThread-"+name); + super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString()); + _transport = nonBlockingNetworkTransport; - try - { - _selector = Selector.open(); - } - catch (IOException e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } + _selector = Selector.open(); } public void addAcceptingSocket(final ServerSocketChannel socketChannel) @@ -83,10 +80,10 @@ public class SelectorThread extends Thread { socketChannel.register(_selector, SelectionKey.OP_ACCEPT); } - catch (ClosedChannelException e) + catch (IllegalStateException | ClosedChannelException e) { - // TODO - e.printStackTrace(); + // TODO Communicate condition back to model object to make it go into the ERROR state + LOGGER.error("Failed to register selector on accepting port", e); } } }); @@ -114,91 +111,38 @@ public class SelectorThread extends Thread public void run() { - long nextTimeout = 0; + _nextTimeout = 0; try { while (!_closed.get()) { - _selector.select(nextTimeout); - - while(_tasks.peek() != null) + try { - Runnable task = _tasks.poll(); - task.run(); + _selector.select(_nextTimeout); } - - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - - - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) + catch (IOException e) { - if(key.isAcceptable()) - { - // todo - should we schedule this rather than running in this thread? - SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); - _transport.acceptSocketChannel(acceptedChannel); - } - else - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - - key.channel().register(_selector, 0); - - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); - } - + // TODO Inform the model object + LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e ); + break; } - selectionKeys.clear(); - - while (_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); + runTasks(); - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) - | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + List<NonBlockingConnection> toBeScheduled = processSelectionKeys(); - } - - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); - - int period = connection.getTicker().getTimeToNextTick(currentTime); + toBeScheduled.addAll(reregisterUnregisteredConnections()); - if (period <= 0 || connection.isStateChanged()) - { - toBeScheduled.add(connection); - connection.getSocketChannel().register(_selector, 0).cancel(); - iterator.remove(); - } - else - { - nextTimeout = Math.min(period, nextTimeout); - } - } + toBeScheduled.addAll(processUnscheduledConnections()); for (NonBlockingConnection connection : toBeScheduled) { _scheduler.schedule(connection); } - } } - catch (IOException e) - { - //TODO - e.printStackTrace(); - } finally { try @@ -207,114 +151,144 @@ public class SelectorThread extends Thread } catch (IOException e) { - e.printStackTrace(); + LOGGER.debug("Failed to close selector", e); } } - - - } - public void addConnection(final NonBlockingConnection connection) + private List<NonBlockingConnection> processUnscheduledConnections() { - _unregisteredConnections.add(connection); - _selector.wakeup(); + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - } + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + _nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); - public void wakeup() - { - _selector.wakeup(); - } + int period = connection.getTicker().getTimeToNextTick(currentTime); - public void close() - { - _closed.set(true); - _selector.wakeup(); - _scheduler.close(); + if (period <= 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + try + { + LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) " + connection); + + SelectionKey register = connection.getSocketChannel().register(_selector, 0); + register.cancel(); + } + catch (ClosedChannelException e) + { + LOGGER.debug("Failed to register with selector for connection " + connection + + ". Connection is probably being closed by peer.", e); + } + iterator.remove(); + } + else + { + _nextTimeout = Math.min(period, _nextTimeout); + } + } + + return toBeScheduled; } - private class NetworkConnectionScheduler + private List<NonBlockingConnection> reregisterUnregisteredConnections() { - private final ScheduledThreadPoolExecutor _executor; - private final AtomicInteger _running = new AtomicInteger(); - private final int _poolSize; + List<NonBlockingConnection> unregisterableConnections = new ArrayList<>(); - private NetworkConnectionScheduler() + while (_unregisteredConnections.peek() != null) { - _poolSize = Runtime.getRuntime().availableProcessors(); - _executor = new ScheduledThreadPoolExecutor(_poolSize); - _executor.prestartAllCoreThreads(); - } + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); - public void processConnection(final NonBlockingConnection connection) - { + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); try { - _running.incrementAndGet(); - boolean rerun; - do - { - rerun = false; - boolean closed = connection.doWork(); + LOGGER.debug("KWDEBUG# Registering " + unregisteredConnection); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + } + catch (ClosedChannelException e) + { + unregisterableConnections.add(unregisteredConnection); + } + } - if (!closed) - { + return unregisterableConnections; + } - if (connection.isStateChanged()) - { - if (_running.get() == _poolSize) - { - schedule(connection); - } - else - { - rerun = true; - } - } - else - { - SelectorThread.this.addConnection(connection); - } - } + private List<NonBlockingConnection> processSelectionKeys() + { + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - } while (rerun); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + _transport.acceptSocketChannel((ServerSocketChannel)key.channel()); } - finally + else { - _running.decrementAndGet(); + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + try + { + LOGGER.debug("KWDEBUG# Setting interest to zero (PSK)" + connection); + + key.channel().register(_selector, 0); + } + catch (ClosedChannelException e) + { + // Ignore - we will schedule the connection anyway + } + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); } - } - public void schedule(final NonBlockingConnection connection) - { - _executor.submit(new Runnable() - { - @Override - public void run() - { - String currentName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName( - IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); - processConnection(connection); - } - finally - { - Thread.currentThread().setName(currentName); - } - } - }); } + selectionKeys.clear(); + + return toBeScheduled; + } - public void close() + private void runTasks() + { + while(_tasks.peek() != null) { - _executor.shutdown(); + Runnable task = _tasks.poll(); + task.run(); } + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + public void wakeup() + { + _selector.wakeup(); + } + public void close() + { + _closed.set(true); + _selector.wakeup(); + _scheduler.close(); + } + boolean isIOThread() + { + return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 7874437a2f..ad236733d3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -79,7 +79,8 @@ class TCPandSSLTransport implements AcceptingTransport } final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); - _networkTransport = new NonBlockingNetworkTransport(); + + final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( _port.getParent(Broker.class), @@ -97,7 +98,9 @@ class TCPandSSLTransport implements AcceptingTransport { encryptionSet.add(TransportEncryption.TLS); } - _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet); + + _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet); + _networkTransport.start(); } public int getAcceptingPort() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 34afcdef66..6762c00a37 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -592,12 +592,6 @@ public class MockConsumer implements ConsumerTarget } @Override - public boolean isSessionNameUnique(byte[] name) - { - return false; - } - - @Override public String getRemoteAddressString() { return "remoteAddress:1234"; |
