summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-16 16:52:32 +0000
committerKeith Wall <kwall@apache.org>2015-03-16 16:52:32 +0000
commit67a2cb9fe4149dc9d6cd750b3426995033ea9d9d (patch)
tree20633ed3c757a764123d5c98811462afe72cb9b2 /qpid/java/broker-core/src
parente756d0579c8e0f4373e56a4d608acf9eb5632f57 (diff)
downloadqpid-python-67a2cb9fe4149dc9d6cd750b3426995033ea9d9d.tar.gz
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminlogy (session etc) in 0-8 stack
* Also added uncaught exception handler in test framework (QBTC) to guard log the case where a thread exits abnormally. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java1
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java114
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java128
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java210
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java304
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java6
8 files changed, 437 insertions, 335 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 95b9bf8970..c587394821 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -69,8 +69,6 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends
*/
public LogSubject getLogSubject();
- public boolean isSessionNameUnique(byte[] name);
-
String getRemoteAddressString();
SocketAddress getRemoteAddress();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
index 0cf34af2ac..dfb843b708 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java
@@ -92,7 +92,6 @@ public class KerberosAuthenticationManager extends AbstractAuthenticationManager
}
catch (SaslException e)
{
- e.printStackTrace(System.err);
return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
new file mode 100644
index 0000000000..36fd63c360
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -0,0 +1,114 @@
+/*
+*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transport;
+
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class NetworkConnectionScheduler
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class);
+
+ private final SelectorThread _selectorThread;
+ private final ScheduledThreadPoolExecutor _executor;
+ private final AtomicInteger _running = new AtomicInteger();
+ private final int _poolSize;
+
+ NetworkConnectionScheduler(final SelectorThread selectorThread)
+ {
+ _selectorThread = selectorThread;
+ _poolSize = Runtime.getRuntime().availableProcessors();
+ _executor = new ScheduledThreadPoolExecutor(_poolSize);
+ _executor.prestartAllCoreThreads();
+ }
+
+ public void schedule(final NonBlockingConnection connection)
+ {
+ _executor.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ String currentName = Thread.currentThread().getName();
+ try
+ {
+ Thread.currentThread().setName(
+ SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
+ processConnection(connection);
+ }
+ finally
+ {
+ Thread.currentThread().setName(currentName);
+ }
+ }
+ });
+ }
+
+ private void processConnection(final NonBlockingConnection connection)
+ {
+ try
+ {
+ _running.incrementAndGet();
+ boolean rerun;
+ do
+ {
+ rerun = false;
+ boolean closed = connection.doWork();
+
+ if (!closed)
+ {
+
+ if (connection.isStateChanged())
+ {
+ if (_running.get() == _poolSize)
+ {
+ schedule(connection);
+ }
+ else
+ {
+ rerun = true;
+ }
+ }
+ else
+ {
+ _selectorThread.addConnection(connection);
+ }
+ }
+
+ } while (rerun);
+ }
+ finally
+ {
+ _running.decrementAndGet();
+ }
+ }
+
+ public void close()
+ {
+ _executor.shutdown();
+ }
+
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index ae5816a0d1..4e27fa8476 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
@@ -55,18 +55,12 @@ import org.apache.qpid.util.SystemUtils;
public class NonBlockingConnection implements NetworkConnection, ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
+ private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
+
private final SocketChannel _socketChannel;
- private final long _timeout;
private final Ticker _ticker;
+ private final Object _peerPrincipalLock = new Object();
private final SelectorThread _selector;
- private int _maxReadIdle;
- private int _maxWriteIdle;
- private Principal _principal;
- private boolean _principalChecked;
- private final Object _lock = new Object();
-
- public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
-
private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final List<ByteBuffer> _encryptedOutput = new ArrayList<>();
@@ -74,9 +68,14 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ServerProtocolEngine _protocolEngine;
private final int _receiveBufSize;
- private final Set<TransportEncryption> _encryptionSet;
- private final SSLContext _sslContext;
private final Runnable _onTransportEncryptionAction;
+
+
+ private int _maxReadIdle;
+ private int _maxWriteIdle;
+ private Principal _principal;
+ private boolean _principalChecked;
+
private ByteBuffer _netInputBuffer;
private SSLEngine _sslEngine;
@@ -90,9 +89,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
public NonBlockingConnection(SocketChannel socketChannel,
ServerProtocolEngine delegate,
- int sendBufferSize,
int receiveBufferSize,
- long timeout,
Ticker ticker,
final Set<TransportEncryption> encryptionSet,
final SSLContext sslContext,
@@ -104,14 +101,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
final SelectorThread selectorThread)
{
_socketChannel = socketChannel;
- _timeout = timeout;
_ticker = ticker;
_selector = selectorThread;
_protocolEngine = delegate;
_receiveBufSize = receiveBufferSize;
- _encryptionSet = encryptionSet;
- _sslContext = sslContext;
_onTransportEncryptionAction = onTransportEncryptionAction;
delegate.setWorkListener(new Action<ServerProtocolEngine>()
@@ -125,7 +119,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
if(encryptionSet.size() == 1)
{
- _transportEncryption = _encryptionSet.iterator().next();
+ _transportEncryption = encryptionSet.iterator().next();
if (_transportEncryption == TransportEncryption.TLS)
{
onTransportEncryptionAction.run();
@@ -134,7 +128,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
if(encryptionSet.contains(TransportEncryption.TLS))
{
- _sslEngine = _sslContext.createSSLEngine();
+ _sslEngine = sslContext.createSSLEngine();
_sslEngine.setUseClientMode(false);
SSLUtil.removeSSLv3Support(_sslEngine);
SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites);
@@ -150,26 +144,16 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
_netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2));
}
- try
- {
- _remoteSocketAddress = _socketChannel.getRemoteAddress().toString();
- _socketChannel.configureBlocking(false);
- }
- catch (IOException e)
- {
- throw new SenderException("Unable to prepare the channel for non-blocking IO", e);
- }
-
-
+ _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
}
- public Ticker getTicker()
+ Ticker getTicker()
{
return _ticker;
}
- public SocketChannel getSocketChannel()
+ SocketChannel getSocketChannel()
{
return _socketChannel;
}
@@ -189,7 +173,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
- getSelector().wakeup();
+ _selector.wakeup();
}
}
@@ -216,7 +200,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
@Override
public Principal getPeerPrincipal()
{
- synchronized (_lock)
+ synchronized (_peerPrincipalLock)
{
if(!_principalChecked)
{
@@ -301,7 +285,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
// tell all consumer targets that it is okay to accept more
_protocolEngine.setMessageAssignmentSuspended(false);
}
- catch (IOException e)
+ catch (IOException | ConnectionScopedRuntimeException e)
{
LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e);
LOGGER.debug("Closing " + _remoteSocketAddress);
@@ -359,22 +343,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
}
- public SelectorThread getSelector()
- {
- return _selector;
- }
-
- public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
- {
- return headerBytes[0] == -128 &&
- headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[4] == 0 || // SSL 3.0
- headerBytes[4] == 1 || // TLS 1.0
- headerBytes[4] == 2 || // TLS 1.1
- headerBytes[4] == 3);
- }
-
- public boolean doRead() throws IOException
+ private boolean doRead() throws IOException
{
boolean readData = false;
if(_transportEncryption == TransportEncryption.NONE)
@@ -496,7 +465,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
return readData;
}
- public boolean doWrite() throws IOException
+ private boolean doWrite() throws IOException
{
ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()];
@@ -589,18 +558,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
}
}
- public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
- {
- return headerBytes[0] == 22 && // SSL Handshake
- (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
- (headerBytes[2] == 0 || // SSL 3.0
- headerBytes[2] == 1 || // TLS 1.0
- headerBytes[2] == 2 || // TLS 1.1
- headerBytes[2] == 3)) && // TLS1.2
- (headerBytes[5] == 1); // client_hello
- }
-
- public boolean runSSLEngineTasks(final SSLEngineResult status)
+ private boolean runSSLEngineTasks(final SSLEngineResult status)
{
if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK)
{
@@ -614,15 +572,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
return false;
}
- public boolean looksLikeSSL(final byte[] headerBytes)
- {
- return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
- }
-
@Override
public void send(final ByteBuffer msg)
{
- assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName();
+ assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName();
+
if (_closed.get())
{
@@ -631,7 +585,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
else
{
_buffers.add(msg);
- _protocolEngine.notifyWork();
+ _protocolEngine.notifyWork(); // TODO now redundant
}
}
@@ -639,4 +593,36 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende
public void flush()
{
}
+
+ @Override
+ public String toString()
+ {
+ return "[NonBlockingConnection " + _remoteSocketAddress + "]";
+ }
+
+ private boolean looksLikeSSL(final byte[] headerBytes)
+ {
+ return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes);
+ }
+
+ private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes)
+ {
+ return headerBytes[0] == -128 &&
+ headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[4] == 0 || // SSL 3.0
+ headerBytes[4] == 1 || // TLS 1.0
+ headerBytes[4] == 2 || // TLS 1.1
+ headerBytes[4] == 3);
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
index 79313712a5..3bc7978931 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
@@ -25,14 +25,17 @@ import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.EnumSet;
import java.util.Set;
import javax.net.ssl.SSLContext;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.ProtocolEngineFactory;
+import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.protocol.ServerProtocolEngine;
import org.apache.qpid.transport.NetworkTransportConfiguration;
import org.apache.qpid.transport.TransportException;
@@ -43,51 +46,24 @@ import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
public class NonBlockingNetworkTransport
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class);
private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
- private SelectorThread _selector;
-
-
- private Set<TransportEncryption> _encryptionSet;
- private volatile boolean _closed = false;
- private NetworkTransportConfiguration _config;
- private ProtocolEngineFactory _factory;
- private SSLContext _sslContext;
- private ServerSocketChannel _serverSocket;
- private int _timeout;
-
- public void close()
- {
- if(_selector != null)
- {
- try
- {
- if (_serverSocket != null)
- {
- _selector.cancelAcceptingSocket(_serverSocket);
- _serverSocket.close();
- }
- }
- catch (IOException e)
- {
- // TODO
- e.printStackTrace();
- }
- finally
- {
+ private final Set<TransportEncryption> _encryptionSet;
+ private final NetworkTransportConfiguration _config;
+ private final ProtocolEngineFactory _factory;
+ private final SSLContext _sslContext;
+ private final ServerSocketChannel _serverSocket;
+ private final int _timeout;
- _selector.close();
- }
- }
- }
+ private SelectorThread _selector;
- public void accept(NetworkTransportConfiguration config,
- ProtocolEngineFactory factory,
- SSLContext sslContext,
- final Set<TransportEncryption> encryptionSet)
+ public NonBlockingNetworkTransport(final NetworkTransportConfiguration config,
+ final MultiVersionProtocolEngineFactory factory,
+ final SSLContext sslContext,
+ final EnumSet<TransportEncryption> encryptionSet)
{
try
{
@@ -106,80 +82,138 @@ public class NonBlockingNetworkTransport
_serverSocket.configureBlocking(false);
_encryptionSet = encryptionSet;
- _selector = new SelectorThread(config.getAddress().toString(), this);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException("Failed to start AMQP on port : " + config, e);
+ }
+
+ }
+
+ public void start()
+ {
+ try
+ {
+ _selector = new SelectorThread(this);
_selector.start();
_selector.addAcceptingSocket(_serverSocket);
}
catch (IOException e)
{
- throw new TransportException("Failed to start AMQP on port : " + config, e);
+ throw new TransportException("Failed to start", e);
}
+ }
+ public void close()
+ {
+ if(_selector != null)
+ {
+ _selector.cancelAcceptingSocket(_serverSocket);
+ try
+ {
+ _serverSocket.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e);
+ }
+ finally
+ {
+ _selector.close();
+ _selector = null;
+ }
+ }
}
public int getAcceptingPort()
{
- return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort();
+ return _serverSocket.socket().getLocalPort();
}
- public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException
+ public NetworkTransportConfiguration getConfig()
{
- final ServerProtocolEngine engine =
- (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
- .getRemoteSocketAddress());
+ return _config;
+ }
- if(engine != null)
+ void acceptSocketChannel(final ServerSocketChannel serverSocketChannel)
+ {
+ SocketChannel socketChannel = null;
+ boolean success = false;
+ try
{
- socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
- socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
-
- final Integer sendBufferSize = _config.getSendBufferSize();
- final Integer receiveBufferSize = _config.getReceiveBufferSize();
-
- socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
- socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
-
-
- final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT);
-
- NonBlockingConnection connection =
- new NonBlockingConnection(socketChannel,
- engine,
- sendBufferSize,
- receiveBufferSize,
- _timeout,
- ticker,
- _encryptionSet,
- _sslContext,
- _config.wantClientAuth(),
- _config.needClientAuth(),
- _config.getEnabledCipherSuites(),
- _config.getDisabledCipherSuites(),
- new Runnable()
- {
-
- @Override
- public void run()
+ socketChannel = serverSocketChannel.accept();
+
+ final ServerProtocolEngine engine =
+ (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket()
+ .getRemoteSocketAddress());
+
+ if(engine != null)
+ {
+ socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay());
+ socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT);
+
+ final int sendBufferSize = _config.getSendBufferSize();
+ final int receiveBufferSize = _config.getReceiveBufferSize();
+
+ socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize);
+ socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
+
+ socketChannel.configureBlocking(false);
+
+ final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, _timeout);
+
+ NonBlockingConnection connection =
+ new NonBlockingConnection(socketChannel,
+ engine,
+ receiveBufferSize,
+ ticker,
+ _encryptionSet,
+ _sslContext,
+ _config.wantClientAuth(),
+ _config.needClientAuth(),
+ _config.getEnabledCipherSuites(),
+ _config.getDisabledCipherSuites(),
+ new Runnable()
{
- engine.encryptedTransport();
- }
- },
- _selector);
- engine.setNetworkConnection(connection, connection.getSender());
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+ @Override
+ public void run()
+ {
+ engine.encryptedTransport();
+ }
+ },
+ _selector);
+
+ engine.setNetworkConnection(connection, connection.getSender());
+ connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
- ticker.setConnection(connection);
+ ticker.setConnection(connection);
- connection.start();
+ connection.start();
- _selector.addConnection(connection);
+ _selector.addConnection(connection);
+ success = true;
+ }
}
- else
+ catch (IOException e)
{
- socketChannel.close();
+ LOGGER.error("Failed to process incoming socket", e);
+ }
+ finally
+ {
+ if (!success && socketChannel != null)
+ {
+ try
+ {
+ socketChannel.close();
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Failed to close socket " + socketChannel, e);
+ }
+ }
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index 774888e934..ff75448787 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -32,43 +32,40 @@ import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.thread.LoggingUncaughtExceptionHandler;
-
public class SelectorThread extends Thread
{
- private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class);
- public static final String IO_THREAD_NAME_PREFIX = "NCS-";
+ static final String IO_THREAD_NAME_PREFIX = "IO-";
private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>();
+
+ /**
+ * Queue of connections that are not currently scheduled and not registered with the selector.
+ * These need to go back into the Selector.
+ */
private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>();
+
+ /** Set of connections that are currently being selected upon */
private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>();
+
private final Selector _selector;
private final AtomicBoolean _closed = new AtomicBoolean();
- private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler();
+ private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(this);
private final NonBlockingNetworkTransport _transport;
+ private long _nextTimeout;
- SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport)
+ SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException
{
- super("SelectorThread-"+name);
+ super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString());
+
_transport = nonBlockingNetworkTransport;
- try
- {
- _selector = Selector.open();
- }
- catch (IOException e)
- {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
+ _selector = Selector.open();
}
public void addAcceptingSocket(final ServerSocketChannel socketChannel)
@@ -83,10 +80,10 @@ public class SelectorThread extends Thread
{
socketChannel.register(_selector, SelectionKey.OP_ACCEPT);
}
- catch (ClosedChannelException e)
+ catch (IllegalStateException | ClosedChannelException e)
{
- // TODO
- e.printStackTrace();
+ // TODO Communicate condition back to model object to make it go into the ERROR state
+ LOGGER.error("Failed to register selector on accepting port", e);
}
}
});
@@ -114,91 +111,38 @@ public class SelectorThread extends Thread
public void run()
{
- long nextTimeout = 0;
+ _nextTimeout = 0;
try
{
while (!_closed.get())
{
- _selector.select(nextTimeout);
-
- while(_tasks.peek() != null)
+ try
{
- Runnable task = _tasks.poll();
- task.run();
+ _selector.select(_nextTimeout);
}
-
- List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
-
-
- Set<SelectionKey> selectionKeys = _selector.selectedKeys();
- for (SelectionKey key : selectionKeys)
+ catch (IOException e)
{
- if(key.isAcceptable())
- {
- // todo - should we schedule this rather than running in this thread?
- SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept();
- _transport.acceptSocketChannel(acceptedChannel);
- }
- else
- {
- NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
-
- key.channel().register(_selector, 0);
-
- toBeScheduled.add(connection);
- _unscheduledConnections.remove(connection);
- }
-
+ // TODO Inform the model object
+ LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e );
+ break;
}
- selectionKeys.clear();
-
- while (_unregisteredConnections.peek() != null)
- {
- NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
- _unscheduledConnections.add(unregisteredConnection);
+ runTasks();
- final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
- | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
- unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ List<NonBlockingConnection> toBeScheduled = processSelectionKeys();
- }
-
- long currentTime = System.currentTimeMillis();
- Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
- nextTimeout = Integer.MAX_VALUE;
- while (iterator.hasNext())
- {
- NonBlockingConnection connection = iterator.next();
-
- int period = connection.getTicker().getTimeToNextTick(currentTime);
+ toBeScheduled.addAll(reregisterUnregisteredConnections());
- if (period <= 0 || connection.isStateChanged())
- {
- toBeScheduled.add(connection);
- connection.getSocketChannel().register(_selector, 0).cancel();
- iterator.remove();
- }
- else
- {
- nextTimeout = Math.min(period, nextTimeout);
- }
- }
+ toBeScheduled.addAll(processUnscheduledConnections());
for (NonBlockingConnection connection : toBeScheduled)
{
_scheduler.schedule(connection);
}
-
}
}
- catch (IOException e)
- {
- //TODO
- e.printStackTrace();
- }
finally
{
try
@@ -207,114 +151,144 @@ public class SelectorThread extends Thread
}
catch (IOException e)
{
- e.printStackTrace();
+ LOGGER.debug("Failed to close selector", e);
}
}
-
-
-
}
- public void addConnection(final NonBlockingConnection connection)
+ private List<NonBlockingConnection> processUnscheduledConnections()
{
- _unregisteredConnections.add(connection);
- _selector.wakeup();
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- }
+ long currentTime = System.currentTimeMillis();
+ Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator();
+ _nextTimeout = Integer.MAX_VALUE;
+ while (iterator.hasNext())
+ {
+ NonBlockingConnection connection = iterator.next();
- public void wakeup()
- {
- _selector.wakeup();
- }
+ int period = connection.getTicker().getTimeToNextTick(currentTime);
- public void close()
- {
- _closed.set(true);
- _selector.wakeup();
- _scheduler.close();
+ if (period <= 0 || connection.isStateChanged())
+ {
+ toBeScheduled.add(connection);
+ try
+ {
+ LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) " + connection);
+
+ SelectionKey register = connection.getSocketChannel().register(_selector, 0);
+ register.cancel();
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.debug("Failed to register with selector for connection " + connection +
+ ". Connection is probably being closed by peer.", e);
+ }
+ iterator.remove();
+ }
+ else
+ {
+ _nextTimeout = Math.min(period, _nextTimeout);
+ }
+ }
+
+ return toBeScheduled;
}
- private class NetworkConnectionScheduler
+ private List<NonBlockingConnection> reregisterUnregisteredConnections()
{
- private final ScheduledThreadPoolExecutor _executor;
- private final AtomicInteger _running = new AtomicInteger();
- private final int _poolSize;
+ List<NonBlockingConnection> unregisterableConnections = new ArrayList<>();
- private NetworkConnectionScheduler()
+ while (_unregisteredConnections.peek() != null)
{
- _poolSize = Runtime.getRuntime().availableProcessors();
- _executor = new ScheduledThreadPoolExecutor(_poolSize);
- _executor.prestartAllCoreThreads();
- }
+ NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll();
+ _unscheduledConnections.add(unregisteredConnection);
- public void processConnection(final NonBlockingConnection connection)
- {
+
+ final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0)
+ | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
try
{
- _running.incrementAndGet();
- boolean rerun;
- do
- {
- rerun = false;
- boolean closed = connection.doWork();
+ LOGGER.debug("KWDEBUG# Registering " + unregisteredConnection);
+ unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection);
+ }
+ catch (ClosedChannelException e)
+ {
+ unregisterableConnections.add(unregisteredConnection);
+ }
+ }
- if (!closed)
- {
+ return unregisterableConnections;
+ }
- if (connection.isStateChanged())
- {
- if (_running.get() == _poolSize)
- {
- schedule(connection);
- }
- else
- {
- rerun = true;
- }
- }
- else
- {
- SelectorThread.this.addConnection(connection);
- }
- }
+ private List<NonBlockingConnection> processSelectionKeys()
+ {
+ List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
- } while (rerun);
+ Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+ for (SelectionKey key : selectionKeys)
+ {
+ if(key.isAcceptable())
+ {
+ // todo - should we schedule this rather than running in this thread?
+ _transport.acceptSocketChannel((ServerSocketChannel)key.channel());
}
- finally
+ else
{
- _running.decrementAndGet();
+ NonBlockingConnection connection = (NonBlockingConnection) key.attachment();
+
+ try
+ {
+ LOGGER.debug("KWDEBUG# Setting interest to zero (PSK)" + connection);
+
+ key.channel().register(_selector, 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ // Ignore - we will schedule the connection anyway
+ }
+
+ toBeScheduled.add(connection);
+ _unscheduledConnections.remove(connection);
}
- }
- public void schedule(final NonBlockingConnection connection)
- {
- _executor.submit(new Runnable()
- {
- @Override
- public void run()
- {
- String currentName = Thread.currentThread().getName();
- try
- {
- Thread.currentThread().setName(
- IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString());
- processConnection(connection);
- }
- finally
- {
- Thread.currentThread().setName(currentName);
- }
- }
- });
}
+ selectionKeys.clear();
+
+ return toBeScheduled;
+ }
- public void close()
+ private void runTasks()
+ {
+ while(_tasks.peek() != null)
{
- _executor.shutdown();
+ Runnable task = _tasks.poll();
+ task.run();
}
+ }
+
+ public void addConnection(final NonBlockingConnection connection)
+ {
+ _unregisteredConnections.add(connection);
+ _selector.wakeup();
+
+ }
+ public void wakeup()
+ {
+ _selector.wakeup();
+ }
+ public void close()
+ {
+ _closed.set(true);
+ _selector.wakeup();
+ _scheduler.close();
+ }
+ boolean isIOThread()
+ {
+ return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
index 7874437a2f..ad236733d3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
@@ -79,7 +79,8 @@ class TCPandSSLTransport implements AcceptingTransport
}
final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration();
- _networkTransport = new NonBlockingNetworkTransport();
+
+
final MultiVersionProtocolEngineFactory protocolEngineFactory =
new MultiVersionProtocolEngineFactory(
_port.getParent(Broker.class),
@@ -97,7 +98,9 @@ class TCPandSSLTransport implements AcceptingTransport
{
encryptionSet.add(TransportEncryption.TLS);
}
- _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet);
+
+ _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet);
+ _networkTransport.start();
}
public int getAcceptingPort()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 34afcdef66..6762c00a37 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -592,12 +592,6 @@ public class MockConsumer implements ConsumerTarget
}
@Override
- public boolean isSessionNameUnique(byte[] name)
- {
- return false;
- }
-
- @Override
public String getRemoteAddressString()
{
return "remoteAddress:1234";