diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
| commit | 67a2cb9fe4149dc9d6cd750b3426995033ea9d9d (patch) | |
| tree | 20633ed3c757a764123d5c98811462afe72cb9b2 /qpid/java | |
| parent | e756d0579c8e0f4373e56a4d608acf9eb5632f57 (diff) | |
| download | qpid-python-67a2cb9fe4149dc9d6cd750b3426995033ea9d9d.tar.gz | |
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminlogy (session etc) in 0-8 stack
* Also added uncaught exception handler in test framework (QBTC) to guard log the case where a thread exits abnormally.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
19 files changed, 514 insertions, 445 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index 95b9bf8970..c587394821 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -69,8 +69,6 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends */ public LogSubject getLogSubject(); - public boolean isSessionNameUnique(byte[] name); - String getRemoteAddressString(); SocketAddress getRemoteAddress(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java index 0cf34af2ac..dfb843b708 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/KerberosAuthenticationManager.java @@ -92,7 +92,6 @@ public class KerberosAuthenticationManager extends AbstractAuthenticationManager } catch (SaslException e) { - e.printStackTrace(System.err); return new AuthenticationResult(AuthenticationResult.AuthenticationStatus.ERROR, e); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java new file mode 100644 index 0000000000..36fd63c360 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java @@ -0,0 +1,114 @@ +/* +* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.transport; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class NetworkConnectionScheduler +{ + private static final Logger LOGGER = LoggerFactory.getLogger(NetworkConnectionScheduler.class); + + private final SelectorThread _selectorThread; + private final ScheduledThreadPoolExecutor _executor; + private final AtomicInteger _running = new AtomicInteger(); + private final int _poolSize; + + NetworkConnectionScheduler(final SelectorThread selectorThread) + { + _selectorThread = selectorThread; + _poolSize = Runtime.getRuntime().availableProcessors(); + _executor = new ScheduledThreadPoolExecutor(_poolSize); + _executor.prestartAllCoreThreads(); + } + + public void schedule(final NonBlockingConnection connection) + { + _executor.submit(new Runnable() + { + @Override + public void run() + { + String currentName = Thread.currentThread().getName(); + try + { + Thread.currentThread().setName( + SelectorThread.IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); + processConnection(connection); + } + finally + { + Thread.currentThread().setName(currentName); + } + } + }); + } + + private void processConnection(final NonBlockingConnection connection) + { + try + { + _running.incrementAndGet(); + boolean rerun; + do + { + rerun = false; + boolean closed = connection.doWork(); + + if (!closed) + { + + if (connection.isStateChanged()) + { + if (_running.get() == _poolSize) + { + schedule(connection); + } + else + { + rerun = true; + } + } + else + { + _selectorThread.addConnection(connection); + } + } + + } while (rerun); + } + finally + { + _running.decrementAndGet(); + } + } + + public void close() + { + _executor.shutdown(); + } + + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java index ae5816a0d1..4e27fa8476 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java @@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.ByteBufferSender; -import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.network.NetworkConnection; import org.apache.qpid.transport.network.Ticker; import org.apache.qpid.transport.network.TransportEncryption; @@ -55,18 +55,12 @@ import org.apache.qpid.util.SystemUtils; public class NonBlockingConnection implements NetworkConnection, ByteBufferSender { private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class); + private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; + private final SocketChannel _socketChannel; - private final long _timeout; private final Ticker _ticker; + private final Object _peerPrincipalLock = new Object(); private final SelectorThread _selector; - private int _maxReadIdle; - private int _maxWriteIdle; - private Principal _principal; - private boolean _principalChecked; - private final Object _lock = new Object(); - - public static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6; - private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new ConcurrentLinkedQueue<>(); private final List<ByteBuffer> _encryptedOutput = new ArrayList<>(); @@ -74,9 +68,14 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende private final AtomicBoolean _closed = new AtomicBoolean(false); private final ServerProtocolEngine _protocolEngine; private final int _receiveBufSize; - private final Set<TransportEncryption> _encryptionSet; - private final SSLContext _sslContext; private final Runnable _onTransportEncryptionAction; + + + private int _maxReadIdle; + private int _maxWriteIdle; + private Principal _principal; + private boolean _principalChecked; + private ByteBuffer _netInputBuffer; private SSLEngine _sslEngine; @@ -90,9 +89,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende public NonBlockingConnection(SocketChannel socketChannel, ServerProtocolEngine delegate, - int sendBufferSize, int receiveBufferSize, - long timeout, Ticker ticker, final Set<TransportEncryption> encryptionSet, final SSLContext sslContext, @@ -104,14 +101,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende final SelectorThread selectorThread) { _socketChannel = socketChannel; - _timeout = timeout; _ticker = ticker; _selector = selectorThread; _protocolEngine = delegate; _receiveBufSize = receiveBufferSize; - _encryptionSet = encryptionSet; - _sslContext = sslContext; _onTransportEncryptionAction = onTransportEncryptionAction; delegate.setWorkListener(new Action<ServerProtocolEngine>() @@ -125,7 +119,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(encryptionSet.size() == 1) { - _transportEncryption = _encryptionSet.iterator().next(); + _transportEncryption = encryptionSet.iterator().next(); if (_transportEncryption == TransportEncryption.TLS) { onTransportEncryptionAction.run(); @@ -134,7 +128,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(encryptionSet.contains(TransportEncryption.TLS)) { - _sslEngine = _sslContext.createSSLEngine(); + _sslEngine = sslContext.createSSLEngine(); _sslEngine.setUseClientMode(false); SSLUtil.removeSSLv3Support(_sslEngine); SSLUtil.updateEnabledCipherSuites(_sslEngine, enabledCipherSuites, disabledCipherSuites); @@ -150,26 +144,16 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende _netInputBuffer = ByteBuffer.allocate(Math.max(_sslEngine.getSession().getPacketBufferSize(), _receiveBufSize * 2)); } - try - { - _remoteSocketAddress = _socketChannel.getRemoteAddress().toString(); - _socketChannel.configureBlocking(false); - } - catch (IOException e) - { - throw new SenderException("Unable to prepare the channel for non-blocking IO", e); - } - - + _remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString(); } - public Ticker getTicker() + Ticker getTicker() { return _ticker; } - public SocketChannel getSocketChannel() + SocketChannel getSocketChannel() { return _socketChannel; } @@ -189,7 +173,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende if(_closed.compareAndSet(false,true)) { _protocolEngine.notifyWork(); - getSelector().wakeup(); + _selector.wakeup(); } } @@ -216,7 +200,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende @Override public Principal getPeerPrincipal() { - synchronized (_lock) + synchronized (_peerPrincipalLock) { if(!_principalChecked) { @@ -301,7 +285,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende // tell all consumer targets that it is okay to accept more _protocolEngine.setMessageAssignmentSuspended(false); } - catch (IOException e) + catch (IOException | ConnectionScopedRuntimeException e) { LOGGER.info("Exception performing I/O for thread '" + _remoteSocketAddress + "': " + e); LOGGER.debug("Closing " + _remoteSocketAddress); @@ -359,22 +343,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende } - public SelectorThread getSelector() - { - return _selector; - } - - public boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) - { - return headerBytes[0] == -128 && - headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[4] == 0 || // SSL 3.0 - headerBytes[4] == 1 || // TLS 1.0 - headerBytes[4] == 2 || // TLS 1.1 - headerBytes[4] == 3); - } - - public boolean doRead() throws IOException + private boolean doRead() throws IOException { boolean readData = false; if(_transportEncryption == TransportEncryption.NONE) @@ -496,7 +465,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende return readData; } - public boolean doWrite() throws IOException + private boolean doWrite() throws IOException { ByteBuffer[] bufArray = new ByteBuffer[_buffers.size()]; @@ -589,18 +558,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende } } - public boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) - { - return headerBytes[0] == 22 && // SSL Handshake - (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x - (headerBytes[2] == 0 || // SSL 3.0 - headerBytes[2] == 1 || // TLS 1.0 - headerBytes[2] == 2 || // TLS 1.1 - headerBytes[2] == 3)) && // TLS1.2 - (headerBytes[5] == 1); // client_hello - } - - public boolean runSSLEngineTasks(final SSLEngineResult status) + private boolean runSSLEngineTasks(final SSLEngineResult status) { if(status.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) { @@ -614,15 +572,11 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende return false; } - public boolean looksLikeSSL(final byte[] headerBytes) - { - return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); - } - @Override public void send(final ByteBuffer msg) { - assert Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX) : "Send called by unexpected thread " + Thread.currentThread().getName(); + assert _selector.isIOThread() : "Send called by unexpected thread " + Thread.currentThread().getName(); + if (_closed.get()) { @@ -631,7 +585,7 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende else { _buffers.add(msg); - _protocolEngine.notifyWork(); + _protocolEngine.notifyWork(); // TODO now redundant } } @@ -639,4 +593,36 @@ public class NonBlockingConnection implements NetworkConnection, ByteBufferSende public void flush() { } + + @Override + public String toString() + { + return "[NonBlockingConnection " + _remoteSocketAddress + "]"; + } + + private boolean looksLikeSSL(final byte[] headerBytes) + { + return looksLikeSSLv3ClientHello(headerBytes) || looksLikeSSLv2ClientHello(headerBytes); + } + + private boolean looksLikeSSLv3ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == 22 && // SSL Handshake + (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[2] == 0 || // SSL 3.0 + headerBytes[2] == 1 || // TLS 1.0 + headerBytes[2] == 2 || // TLS 1.1 + headerBytes[2] == 3)) && // TLS1.2 + (headerBytes[5] == 1); // client_hello + } + + private boolean looksLikeSSLv2ClientHello(final byte[] headerBytes) + { + return headerBytes[0] == -128 && + headerBytes[3] == 3 && // SSL 3.0 / TLS 1.x + (headerBytes[4] == 0 || // SSL 3.0 + headerBytes[4] == 1 || // TLS 1.0 + headerBytes[4] == 2 || // TLS 1.1 + headerBytes[4] == 3); + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java index 79313712a5..3bc7978931 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -25,14 +25,17 @@ import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.EnumSet; import java.util.Set; import javax.net.ssl.SSLContext; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.configuration.CommonProperties; import org.apache.qpid.protocol.ProtocolEngineFactory; +import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.protocol.ServerProtocolEngine; import org.apache.qpid.transport.NetworkTransportConfiguration; import org.apache.qpid.transport.TransportException; @@ -43,51 +46,24 @@ import org.apache.qpid.transport.network.io.IdleTimeoutTicker; public class NonBlockingNetworkTransport { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNetworkTransport.class); private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); - private SelectorThread _selector; - - - private Set<TransportEncryption> _encryptionSet; - private volatile boolean _closed = false; - private NetworkTransportConfiguration _config; - private ProtocolEngineFactory _factory; - private SSLContext _sslContext; - private ServerSocketChannel _serverSocket; - private int _timeout; - - public void close() - { - if(_selector != null) - { - try - { - if (_serverSocket != null) - { - _selector.cancelAcceptingSocket(_serverSocket); - _serverSocket.close(); - } - } - catch (IOException e) - { - // TODO - e.printStackTrace(); - } - finally - { + private final Set<TransportEncryption> _encryptionSet; + private final NetworkTransportConfiguration _config; + private final ProtocolEngineFactory _factory; + private final SSLContext _sslContext; + private final ServerSocketChannel _serverSocket; + private final int _timeout; - _selector.close(); - } - } - } + private SelectorThread _selector; - public void accept(NetworkTransportConfiguration config, - ProtocolEngineFactory factory, - SSLContext sslContext, - final Set<TransportEncryption> encryptionSet) + public NonBlockingNetworkTransport(final NetworkTransportConfiguration config, + final MultiVersionProtocolEngineFactory factory, + final SSLContext sslContext, + final EnumSet<TransportEncryption> encryptionSet) { try { @@ -106,80 +82,138 @@ public class NonBlockingNetworkTransport _serverSocket.configureBlocking(false); _encryptionSet = encryptionSet; - _selector = new SelectorThread(config.getAddress().toString(), this); + } + catch (IOException e) + { + throw new TransportException("Failed to start AMQP on port : " + config, e); + } + + } + + public void start() + { + try + { + _selector = new SelectorThread(this); _selector.start(); _selector.addAcceptingSocket(_serverSocket); } catch (IOException e) { - throw new TransportException("Failed to start AMQP on port : " + config, e); + throw new TransportException("Failed to start", e); } + } + public void close() + { + if(_selector != null) + { + _selector.cancelAcceptingSocket(_serverSocket); + try + { + _serverSocket.close(); + } + catch (IOException e) + { + LOGGER.warn("Error closing the server socket for : " + _config.getAddress().toString(), e); + } + finally + { + _selector.close(); + _selector = null; + } + } } public int getAcceptingPort() { - return _serverSocket == null ? -1 : _serverSocket.socket().getLocalPort(); + return _serverSocket.socket().getLocalPort(); } - public void acceptSocketChannel(final SocketChannel socketChannel) throws IOException + public NetworkTransportConfiguration getConfig() { - final ServerProtocolEngine engine = - (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() - .getRemoteSocketAddress()); + return _config; + } - if(engine != null) + void acceptSocketChannel(final ServerSocketChannel serverSocketChannel) + { + SocketChannel socketChannel = null; + boolean success = false; + try { - socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); - socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); - - final Integer sendBufferSize = _config.getSendBufferSize(); - final Integer receiveBufferSize = _config.getReceiveBufferSize(); - - socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); - socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); - - - final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, TIMEOUT); - - NonBlockingConnection connection = - new NonBlockingConnection(socketChannel, - engine, - sendBufferSize, - receiveBufferSize, - _timeout, - ticker, - _encryptionSet, - _sslContext, - _config.wantClientAuth(), - _config.needClientAuth(), - _config.getEnabledCipherSuites(), - _config.getDisabledCipherSuites(), - new Runnable() - { - - @Override - public void run() + socketChannel = serverSocketChannel.accept(); + + final ServerProtocolEngine engine = + (ServerProtocolEngine) _factory.newProtocolEngine(socketChannel.socket() + .getRemoteSocketAddress()); + + if(engine != null) + { + socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, _config.getTcpNoDelay()); + socketChannel.socket().setSoTimeout(1000 * HANDSHAKE_TIMEOUT); + + final int sendBufferSize = _config.getSendBufferSize(); + final int receiveBufferSize = _config.getReceiveBufferSize(); + + socketChannel.setOption(StandardSocketOptions.SO_SNDBUF, sendBufferSize); + socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, receiveBufferSize); + + socketChannel.configureBlocking(false); + + final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, _timeout); + + NonBlockingConnection connection = + new NonBlockingConnection(socketChannel, + engine, + receiveBufferSize, + ticker, + _encryptionSet, + _sslContext, + _config.wantClientAuth(), + _config.needClientAuth(), + _config.getEnabledCipherSuites(), + _config.getDisabledCipherSuites(), + new Runnable() { - engine.encryptedTransport(); - } - }, - _selector); - engine.setNetworkConnection(connection, connection.getSender()); - connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); + @Override + public void run() + { + engine.encryptedTransport(); + } + }, + _selector); + + engine.setNetworkConnection(connection, connection.getSender()); + connection.setMaxReadIdle(HANDSHAKE_TIMEOUT); - ticker.setConnection(connection); + ticker.setConnection(connection); - connection.start(); + connection.start(); - _selector.addConnection(connection); + _selector.addConnection(connection); + success = true; + } } - else + catch (IOException e) { - socketChannel.close(); + LOGGER.error("Failed to process incoming socket", e); + } + finally + { + if (!success && socketChannel != null) + { + try + { + socketChannel.close(); + } + catch (IOException e) + { + LOGGER.debug("Failed to close socket " + socketChannel, e); + } + } } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index 774888e934..ff75448787 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -32,43 +32,40 @@ import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.thread.LoggingUncaughtExceptionHandler; - public class SelectorThread extends Thread { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SelectorThread.class); - public static final String IO_THREAD_NAME_PREFIX = "NCS-"; + static final String IO_THREAD_NAME_PREFIX = "IO-"; private final Queue<Runnable> _tasks = new ConcurrentLinkedQueue<>(); + + /** + * Queue of connections that are not currently scheduled and not registered with the selector. + * These need to go back into the Selector. + */ private final Queue<NonBlockingConnection> _unregisteredConnections = new ConcurrentLinkedQueue<>(); + + /** Set of connections that are currently being selected upon */ private final Set<NonBlockingConnection> _unscheduledConnections = new HashSet<>(); + private final Selector _selector; private final AtomicBoolean _closed = new AtomicBoolean(); - private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(); + private final NetworkConnectionScheduler _scheduler = new NetworkConnectionScheduler(this); private final NonBlockingNetworkTransport _transport; + private long _nextTimeout; - SelectorThread(final String name, final NonBlockingNetworkTransport nonBlockingNetworkTransport) + SelectorThread(final NonBlockingNetworkTransport nonBlockingNetworkTransport) throws IOException { - super("SelectorThread-"+name); + super("SelectorThread-" + nonBlockingNetworkTransport.getConfig().getAddress().toString()); + _transport = nonBlockingNetworkTransport; - try - { - _selector = Selector.open(); - } - catch (IOException e) - { - e.printStackTrace(); - throw new RuntimeException(e); - } + _selector = Selector.open(); } public void addAcceptingSocket(final ServerSocketChannel socketChannel) @@ -83,10 +80,10 @@ public class SelectorThread extends Thread { socketChannel.register(_selector, SelectionKey.OP_ACCEPT); } - catch (ClosedChannelException e) + catch (IllegalStateException | ClosedChannelException e) { - // TODO - e.printStackTrace(); + // TODO Communicate condition back to model object to make it go into the ERROR state + LOGGER.error("Failed to register selector on accepting port", e); } } }); @@ -114,91 +111,38 @@ public class SelectorThread extends Thread public void run() { - long nextTimeout = 0; + _nextTimeout = 0; try { while (!_closed.get()) { - _selector.select(nextTimeout); - - while(_tasks.peek() != null) + try { - Runnable task = _tasks.poll(); - task.run(); + _selector.select(_nextTimeout); } - - List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - - - Set<SelectionKey> selectionKeys = _selector.selectedKeys(); - for (SelectionKey key : selectionKeys) + catch (IOException e) { - if(key.isAcceptable()) - { - // todo - should we schedule this rather than running in this thread? - SocketChannel acceptedChannel = ((ServerSocketChannel)key.channel()).accept(); - _transport.acceptSocketChannel(acceptedChannel); - } - else - { - NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); - - key.channel().register(_selector, 0); - - toBeScheduled.add(connection); - _unscheduledConnections.remove(connection); - } - + // TODO Inform the model object + LOGGER.error("Failed to select for " + _transport.getConfig().getAddress().toString(),e ); + break; } - selectionKeys.clear(); - - while (_unregisteredConnections.peek() != null) - { - NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); - _unscheduledConnections.add(unregisteredConnection); + runTasks(); - final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) - | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); - unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + List<NonBlockingConnection> toBeScheduled = processSelectionKeys(); - } - - long currentTime = System.currentTimeMillis(); - Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); - nextTimeout = Integer.MAX_VALUE; - while (iterator.hasNext()) - { - NonBlockingConnection connection = iterator.next(); - - int period = connection.getTicker().getTimeToNextTick(currentTime); + toBeScheduled.addAll(reregisterUnregisteredConnections()); - if (period <= 0 || connection.isStateChanged()) - { - toBeScheduled.add(connection); - connection.getSocketChannel().register(_selector, 0).cancel(); - iterator.remove(); - } - else - { - nextTimeout = Math.min(period, nextTimeout); - } - } + toBeScheduled.addAll(processUnscheduledConnections()); for (NonBlockingConnection connection : toBeScheduled) { _scheduler.schedule(connection); } - } } - catch (IOException e) - { - //TODO - e.printStackTrace(); - } finally { try @@ -207,114 +151,144 @@ public class SelectorThread extends Thread } catch (IOException e) { - e.printStackTrace(); + LOGGER.debug("Failed to close selector", e); } } - - - } - public void addConnection(final NonBlockingConnection connection) + private List<NonBlockingConnection> processUnscheduledConnections() { - _unregisteredConnections.add(connection); - _selector.wakeup(); + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - } + long currentTime = System.currentTimeMillis(); + Iterator<NonBlockingConnection> iterator = _unscheduledConnections.iterator(); + _nextTimeout = Integer.MAX_VALUE; + while (iterator.hasNext()) + { + NonBlockingConnection connection = iterator.next(); - public void wakeup() - { - _selector.wakeup(); - } + int period = connection.getTicker().getTimeToNextTick(currentTime); - public void close() - { - _closed.set(true); - _selector.wakeup(); - _scheduler.close(); + if (period <= 0 || connection.isStateChanged()) + { + toBeScheduled.add(connection); + try + { + LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) " + connection); + + SelectionKey register = connection.getSocketChannel().register(_selector, 0); + register.cancel(); + } + catch (ClosedChannelException e) + { + LOGGER.debug("Failed to register with selector for connection " + connection + + ". Connection is probably being closed by peer.", e); + } + iterator.remove(); + } + else + { + _nextTimeout = Math.min(period, _nextTimeout); + } + } + + return toBeScheduled; } - private class NetworkConnectionScheduler + private List<NonBlockingConnection> reregisterUnregisteredConnections() { - private final ScheduledThreadPoolExecutor _executor; - private final AtomicInteger _running = new AtomicInteger(); - private final int _poolSize; + List<NonBlockingConnection> unregisterableConnections = new ArrayList<>(); - private NetworkConnectionScheduler() + while (_unregisteredConnections.peek() != null) { - _poolSize = Runtime.getRuntime().availableProcessors(); - _executor = new ScheduledThreadPoolExecutor(_poolSize); - _executor.prestartAllCoreThreads(); - } + NonBlockingConnection unregisteredConnection = _unregisteredConnections.poll(); + _unscheduledConnections.add(unregisteredConnection); - public void processConnection(final NonBlockingConnection connection) - { + + final int ops = (unregisteredConnection.canRead() ? SelectionKey.OP_READ : 0) + | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); try { - _running.incrementAndGet(); - boolean rerun; - do - { - rerun = false; - boolean closed = connection.doWork(); + LOGGER.debug("KWDEBUG# Registering " + unregisteredConnection); + unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); + } + catch (ClosedChannelException e) + { + unregisterableConnections.add(unregisteredConnection); + } + } - if (!closed) - { + return unregisterableConnections; + } - if (connection.isStateChanged()) - { - if (_running.get() == _poolSize) - { - schedule(connection); - } - else - { - rerun = true; - } - } - else - { - SelectorThread.this.addConnection(connection); - } - } + private List<NonBlockingConnection> processSelectionKeys() + { + List<NonBlockingConnection> toBeScheduled = new ArrayList<>(); - } while (rerun); + Set<SelectionKey> selectionKeys = _selector.selectedKeys(); + for (SelectionKey key : selectionKeys) + { + if(key.isAcceptable()) + { + // todo - should we schedule this rather than running in this thread? + _transport.acceptSocketChannel((ServerSocketChannel)key.channel()); } - finally + else { - _running.decrementAndGet(); + NonBlockingConnection connection = (NonBlockingConnection) key.attachment(); + + try + { + LOGGER.debug("KWDEBUG# Setting interest to zero (PSK)" + connection); + + key.channel().register(_selector, 0); + } + catch (ClosedChannelException e) + { + // Ignore - we will schedule the connection anyway + } + + toBeScheduled.add(connection); + _unscheduledConnections.remove(connection); } - } - public void schedule(final NonBlockingConnection connection) - { - _executor.submit(new Runnable() - { - @Override - public void run() - { - String currentName = Thread.currentThread().getName(); - try - { - Thread.currentThread().setName( - IO_THREAD_NAME_PREFIX + connection.getRemoteAddress().toString()); - processConnection(connection); - } - finally - { - Thread.currentThread().setName(currentName); - } - } - }); } + selectionKeys.clear(); + + return toBeScheduled; + } - public void close() + private void runTasks() + { + while(_tasks.peek() != null) { - _executor.shutdown(); + Runnable task = _tasks.poll(); + task.run(); } + } + + public void addConnection(final NonBlockingConnection connection) + { + _unregisteredConnections.add(connection); + _selector.wakeup(); + + } + public void wakeup() + { + _selector.wakeup(); + } + public void close() + { + _closed.set(true); + _selector.wakeup(); + _scheduler.close(); + } + boolean isIOThread() + { + return Thread.currentThread().getName().startsWith(SelectorThread.IO_THREAD_NAME_PREFIX); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java index 7874437a2f..ad236733d3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java @@ -79,7 +79,8 @@ class TCPandSSLTransport implements AcceptingTransport } final NetworkTransportConfiguration settings = new ServerNetworkTransportConfiguration(); - _networkTransport = new NonBlockingNetworkTransport(); + + final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory( _port.getParent(Broker.class), @@ -97,7 +98,9 @@ class TCPandSSLTransport implements AcceptingTransport { encryptionSet.add(TransportEncryption.TLS); } - _networkTransport.accept(settings, protocolEngineFactory, _sslContext, encryptionSet); + + _networkTransport = new NonBlockingNetworkTransport(settings, protocolEngineFactory, _sslContext, encryptionSet); + _networkTransport.start(); } public int getAcceptingPort() diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index 34afcdef66..6762c00a37 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -592,12 +592,6 @@ public class MockConsumer implements ConsumerTarget } @Override - public boolean isSessionNameUnique(byte[] name) - { - return false; - } - - @Override public String getRemoteAddressString() { return "remoteAddress:1234"; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 2280377fca..12aaa09eb0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -540,11 +540,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _connectionId; } - public boolean isSessionNameUnique(byte[] name) - { - return !super.hasSessionWithName(name); - } - public String getRemoteAddressString() { return String.valueOf(getRemoteAddress()); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7f646b43b4..b4c0b15dc0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -371,12 +371,17 @@ public class ServerConnectionDelegate extends ServerDelegate while(connections.hasNext()) { final AMQConnectionModel amqConnectionModel = connections.next(); - final String userName = amqConnectionModel.getAuthorizedPrincipal() == null - ? "" - : amqConnectionModel.getAuthorizedPrincipal().getName(); - if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name)) + if (amqConnectionModel instanceof ServerConnection) { - return false; + ServerConnection otherConnection = (ServerConnection)amqConnectionModel; + + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? "" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && otherConnection.hasSessionWithName(name)) + { + return false; + } } } return true; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 87becd955d..16ea23b765 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -857,9 +857,6 @@ public class AMQChannel return false; } - /** - * Called from the protocol session to close this channel and clean up. T - */ @Override public void close() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 3cd5e7bba2..cf61e135b0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -93,7 +93,6 @@ import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, @@ -117,9 +116,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; - private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; - public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; + private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; + private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; @@ -156,10 +154,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private volatile boolean _closed; - // maximum number of channels this session should have private long _maxNoOfChannels; - /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = @@ -172,7 +168,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); - private final long _connectionID; + private final long _connectionId; private Object _reference = new Object(); private LogSubject _logSubject; @@ -190,7 +186,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private ByteBufferSender _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? + private long _lastReceivedTime = System.currentTimeMillis(); private boolean _blocking; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); @@ -247,7 +243,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _decoder = new BrokerDecoder(this); - _connectionID = connectionId; + _connectionId = connectionId; _logSubject = new ConnectionLogSubject(this); _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) @@ -272,11 +268,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return null; } }); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); - _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); - _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + _connectionId); + _dataDelivered = new StatisticsCounter("data-delivered-" + _connectionId); + _messagesReceived = new StatisticsCounter("messages-received-" + _connectionId); + _dataReceived = new StatisticsCounter("data-received-" + _connectionId); _creationTime = System.currentTimeMillis(); } @@ -323,11 +319,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _sender = sender; } - public long getSessionID() - { - return _connectionID; - } - public void setMaxFrameSize(int frameMax) { _maxFrameSize = frameMax; @@ -368,7 +359,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + "ms to establish identity. Closing as possible DoS."); getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); - closeProtocolSession(); + closeNetworkConnection(); } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; @@ -382,37 +373,37 @@ public class AMQProtocolEngine implements ServerProtocolEngine, catch (ConnectionScopedRuntimeException e) { _logger.error("Unexpected exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (AMQProtocolVersionException e) { _logger.error("Unexpected protocol version", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (SenderClosedException e) { _logger.debug("Sender was closed abruptly, closing network.", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (SenderException e) { _logger.info("Unexpected exception on send, closing network.", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (TransportException e) { _logger.error("Unexpected transport exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (AMQFrameDecodingException e) { _logger.error("Frame decoding", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (IOException e) { _logger.error("I/O Exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (StoreException e) { @@ -484,7 +475,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, ProtocolVersion pv = pi.checkVersion(); // Fails if not correct - // This sets the protocol version (and hence framing classes) for this session. setProtocolVersion(pv); StringBuilder mechanismBuilder = new StringBuilder(); @@ -538,15 +528,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } - private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ + public synchronized void writeFrame(AMQDataBlock frame) { if(_logger.isDebugEnabled()) @@ -730,12 +713,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, sessionRemoved(session); } - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - public void initHeartbeats(int delay) + private void initHeartbeats(int delay) { if (delay > 0) { @@ -749,9 +727,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - /** - * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - */ private void closeAllChannels() { try @@ -792,7 +767,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void closeSession(final boolean connectionDropped) + private void closeConnectionInternal(final boolean connectionDropped) { if(runningAsSubject()) @@ -823,7 +798,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, @Override public Object run() { - closeSession(connectionDropped); + closeConnectionInternal(connectionDropped); return null; } }); @@ -921,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeConnectionInternal(false); } finally { @@ -931,7 +906,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } finally { - closeProtocolSession(); + closeNetworkConnection(); } } } @@ -941,7 +916,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void closeProtocolSession() + public void closeNetworkConnection() { _network.close(); } @@ -951,19 +926,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - /** @return an object that can be used to identity */ - public Object getKey() - { - return getRemoteAddress(); - } - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - public String getLocalFQDN() + private String getLocalFQDN() { SocketAddress address = _network.getLocalAddress(); if (address instanceof InetSocketAddress) @@ -1149,11 +1112,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - closeSession(true); + closeConnectionInternal(true); } finally { - closeProtocolSession(); + closeNetworkConnection(); } } catch (ConnectionScopedRuntimeException | TransportException e) @@ -1311,7 +1274,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public long getConnectionId() { - return getSessionID(); + return _connectionId; } public String getAddress() @@ -1452,12 +1415,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _dataReceived.reset(); } - public boolean isSessionNameUnique(byte[] name) - { - // 0-8/0-9/0-9-1 sessions don't have names - return true; - } - public String getRemoteAddressString() { return String.valueOf(getRemoteAddress()); @@ -1640,19 +1597,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } try { - closeSession(false); + closeConnectionInternal(false); + + MethodRegistry methodRegistry = getMethodRegistry(); + ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); + writeFrame(responseBody.generateFrame(0)); } catch (Exception e) { - _logger.error("Error closing protocol session: " + e, e); + _logger.error("Error closing connection for " + getRemoteAddressString(), e); + } + finally + { + closeNetworkConnection(); } - - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - writeFrame(responseBody.generateFrame(0)); - - closeProtocolSession(); - } @Override @@ -1667,12 +1625,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { - closeSession(false); + closeConnectionInternal(false); } catch (Exception e) { - _logger.error("Error closing protocol session: " + e, e); + _logger.error("Error closing connection: " + getRemoteAddressString(), e); } + finally + { + closeNetworkConnection(); + } + } @Override @@ -1692,7 +1655,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, SaslServer ss = getSaslServer(); if (ss == null) { - closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 ); + closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection",0 ); } MethodRegistry methodRegistry = getMethodRegistry(); SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 3e5da53d80..5f7d5fe46e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -377,7 +377,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { String subscriber = "[channel=" + _channel + ", consumerTag=" + _consumerTag + - ", session=" + getProtocolSession().getKey() ; + ", session=" + getConnection().getRemoteAddressString(); return subscriber + "]"; } @@ -450,7 +450,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumerTag; } - public AMQProtocolEngine getProtocolSession() + private AMQProtocolEngine getConnection() { return _channel.getConnection(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index 4a84ccad37..15ce9262fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -365,7 +365,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.writePayload(sender); } - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { throw new AMQException("This block should never be dispatched!"); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 67c51c78ed..7e68bee661 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -218,7 +218,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } } - public void closeProtocolSession() + public void closeNetworkConnection() { // Override as we don't have a real IOSession to close. // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index c6f7defe56..459fc94484 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; -/** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends QpidTestCase { private AMQProtocolEngine _session; @@ -62,7 +61,6 @@ public class MaxChannelsTest extends QpidTestCase try { _session.getVirtualHost().close(); - _session.closeSession(false); } finally { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b515fda4a7..ff8f642c0d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -324,12 +324,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public boolean isSessionNameUnique(byte[] name) - { - return true; // TODO - } - - @Override public String getRemoteAddressString() { return String.valueOf(_conn.getRemoteAddress()); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 5020ffe3cd..17e306f5bd 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -212,7 +212,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem if(port.getState() != State.ACTIVE) { - // TODO - RG + // TODO - RG - probably does nothing port.startAsync(); } Connector connector = null; diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index 4965f54403..3c3276c87a 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -49,6 +49,21 @@ public class InternalBrokerHolder implements BrokerHolder @Override public void start(BrokerOptions options) throws Exception { + if (Thread.getDefaultUncaughtExceptionHandler() != null) + { + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() + { + @Override + public void uncaughtException(final Thread t, final Throwable e) + { + System.err.print("Thread terminated due to uncaught exception"); + e.printStackTrace(); + + LOGGER.error("Uncaught exception from thread " + t.getName(), e); + } + }); + } + LOGGER.info("Starting internal broker (same JVM)"); _broker = new Broker(new Action<Integer>() |
