diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2015-03-17 16:11:18 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2015-03-17 16:11:18 +0000 |
| commit | 54f8c414e2b6aa15039071f5cab23dcdc5822f41 (patch) | |
| tree | 7e0f6b6d14bddbaf90e6233a82003fab5b816dcb /qpid/java | |
| parent | 5d64ed503b8868a2930f01b8233926c689aed200 (diff) | |
| download | qpid-python-54f8c414e2b6aa15039071f5cab23dcdc5822f41.tar.gz | |
QPID-6429 : Fix issue when async close is called twice on a session, tidy up debug and logging
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667346 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
6 files changed, 41 insertions, 39 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java index 7d3f2bd9a0..36fd63c360 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java @@ -83,12 +83,10 @@ class NetworkConnectionScheduler { if (_running.get() == _poolSize) { - LOGGER.debug("RGDEBUG: scheduler rescheduling"); schedule(connection); } else { - LOGGER.debug("RGDEBUG: scheduler rerunning - ( " + _running.get() + "/" + _poolSize + ")"); rerun = true; } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java index 3bc7978931..a0659a0c3d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java @@ -50,7 +50,7 @@ public class NonBlockingNetworkTransport private static final int TIMEOUT = Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME, CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT); private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME , - CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); + CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT); private final Set<TransportEncryption> _encryptionSet; private final NetworkTransportConfiguration _config; private final ProtocolEngineFactory _factory; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java index ff75448787..8f2300107d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java @@ -175,8 +175,6 @@ public class SelectorThread extends Thread toBeScheduled.add(connection); try { - LOGGER.debug("KWDEBUG# Setting interest to zero (PUC) " + connection); - SelectionKey register = connection.getSocketChannel().register(_selector, 0); register.cancel(); } @@ -210,7 +208,6 @@ public class SelectorThread extends Thread | (unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0); try { - LOGGER.debug("KWDEBUG# Registering " + unregisteredConnection); unregisteredConnection.getSocketChannel().register(_selector, ops, unregisteredConnection); } catch (ClosedChannelException e) @@ -240,8 +237,6 @@ public class SelectorThread extends Thread try { - LOGGER.debug("KWDEBUG# Setting interest to zero (PSK)" + connection); - key.channel().register(_selector, 0); } catch (ClosedChannelException e) diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 855272fbef..4956faa199 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -260,21 +260,24 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void performAction(final ServerConnection conn) { - ExecutionException ex = new ExecutionException(); - ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; - try + if(!session.isClosing()) { - code = ExecutionErrorCode.get(cause.getCode()); + ExecutionException ex = new ExecutionException(); + ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; + try + { + code = ExecutionErrorCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore, already set to INTERNAL_ERROR + } + ex.setErrorCode(code); + ex.setDescription(message); + session.invoke(ex); + + session.close(cause, message); } - catch (IllegalArgumentException iae) - { - // Ignore, already set to INTERNAL_ERROR - } - ex.setErrorCode(code); - ex.setDescription(message); - session.invoke(ex); - - session.close(cause, message); } }); @@ -382,20 +385,23 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public void performAction(final ServerConnection object) { - closeSubscriptions(); - performDeleteTasks(); - - setState(CLOSING); - ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; - try - { - replyCode = ConnectionCloseCode.get(cause.getCode()); - } - catch (IllegalArgumentException iae) + if(!isClosing()) { - // Ignore + closeSubscriptions(); + performDeleteTasks(); + + setState(CLOSING); + ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; + try + { + replyCode = ConnectionCloseCode.get(cause.getCode()); + } + catch (IllegalArgumentException iae) + { + // Ignore + } + sendConnectionClose(replyCode, message); } - sendConnectionClose(replyCode, message); } }); } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 2092e17858..59237f8c69 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -622,11 +622,6 @@ public class Connection extends ConnectionInvoker close(ConnectionCloseCode.NORMAL, null); } - public void mgmtClose() - { - close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface."); - } - protected void sendConnectionClose(ConnectionCloseCode replyCode, String replyText, Option ... _options) { @@ -875,4 +870,12 @@ public class Connection extends ConnectionInvoker _redirecting.set(redirecting); } + public boolean isClosing() + { + synchronized (lock) + { + return state == CLOSING || state == CLOSED; + } + } + } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java index 3c3276c87a..ee2c4be3ff 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java @@ -49,7 +49,7 @@ public class InternalBrokerHolder implements BrokerHolder @Override public void start(BrokerOptions options) throws Exception { - if (Thread.getDefaultUncaughtExceptionHandler() != null) + if (Thread.getDefaultUncaughtExceptionHandler() == null) { Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
