summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-16 23:26:20 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-16 23:26:20 +0000
commita7d6b731a7cca5b3a61855b222e0a8e7f2da8e18 (patch)
treed9c8a91c7d4b7bfe252f0ed126a481c1d2682cb8 /qpid/java/broker-plugins
parentba9edafa5804918594d1fb2d806d884962cea057 (diff)
downloadqpid-python-a7d6b731a7cca5b3a61855b222e0a8e7f2da8e18.tar.gz
QPID-6429 : ensure the consumer target queues are emptied before we perform async actions like close
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667152 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java9
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java10
3 files changed, 14 insertions, 15 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 12aaa09eb0..855272fbef 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -718,17 +718,17 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S
public void processPending()
{
- while(_asyncTaskList.peek() != null)
- {
- Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
- asyncAction.performAction(this);
- }
for (AMQSessionModel session : getSessionModels())
{
session.processPending();
}
+ while(_asyncTaskList.peek() != null)
+ {
+ Action<? super ServerConnection> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
+ }
}
public void closeAndIgnoreFutureInput()
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cc7c74057e..9a80c870d7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -2034,7 +2034,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
@Override
public void processPending()
{
-
+ for (AMQSessionModel session : getSessionModels())
+ {
+ session.processPending();
+ }
while(_asyncTaskList.peek() != null)
{
@@ -2042,10 +2045,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
asyncAction.performAction(this);
}
- for (AMQSessionModel session : getSessionModels())
- {
- session.processPending();
- }
}
@Override
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index ff8f642c0d..d1254cb289 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -538,15 +538,15 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
public void processPending()
{
- while(_asyncTaskList.peek() != null)
+ for (AMQSessionModel session : getSessionModels())
{
- Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
- asyncAction.performAction(this);
+ session.processPending();
}
- for (AMQSessionModel session : getSessionModels())
+ while(_asyncTaskList.peek() != null)
{
- session.processPending();
+ Action<? super Connection_1_0> asyncAction = _asyncTaskList.poll();
+ asyncAction.performAction(this);
}
}