From a7d6b731a7cca5b3a61855b222e0a8e7f2da8e18 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 16 Mar 2015 23:26:20 +0000 Subject: QPID-6429 : ensure the consumer target queues are emptied before we perform async actions like close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667152 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/protocol/v0_10/ServerConnection.java | 10 +++++----- .../apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java | 9 ++++----- .../org/apache/qpid/server/protocol/v1_0/Connection_1_0.java | 10 +++++----- 3 files changed, 14 insertions(+), 15 deletions(-) (limited to 'qpid/java/broker-plugins') diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 12aaa09eb0..855272fbef 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -718,17 +718,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel asyncAction = _asyncTaskList.poll(); - asyncAction.performAction(this); - } for (AMQSessionModel session : getSessionModels()) { session.processPending(); } + while(_asyncTaskList.peek() != null) + { + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); + } } public void closeAndIgnoreFutureInput() diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index cc7c74057e..9a80c870d7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -2034,7 +2034,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, @Override public void processPending() { - + for (AMQSessionModel session : getSessionModels()) + { + session.processPending(); + } while(_asyncTaskList.peek() != null) { @@ -2042,10 +2045,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, asyncAction.performAction(this); } - for (AMQSessionModel session : getSessionModels()) - { - session.processPending(); - } } @Override diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index ff8f642c0d..d1254cb289 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -538,15 +538,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod public void processPending() { - while(_asyncTaskList.peek() != null) + for (AMQSessionModel session : getSessionModels()) { - Action asyncAction = _asyncTaskList.poll(); - asyncAction.performAction(this); + session.processPending(); } - for (AMQSessionModel session : getSessionModels()) + while(_asyncTaskList.peek() != null) { - session.processPending(); + Action asyncAction = _asyncTaskList.poll(); + asyncAction.performAction(this); } } -- cgit v1.2.1