summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:01:30 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:01:30 +0000
commitfc3fd8427d41a3afed8a6cea83bf41149a788400 (patch)
tree1189ec0f22c230ff84439373165f9215c6c5486e /qpid/java
parent0822ad8d51667ded7246a95b68662eb007d28519 (diff)
downloadqpid-python-fc3fd8427d41a3afed8a6cea83bf41149a788400.tar.gz
QPID-6429 : ensure that when message suspension is set, all targets have finished any in flight deliveries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667142 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java16
1 files changed, 12 insertions, 4 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index cf61e135b0..cc7c74057e 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -219,13 +219,21 @@ public class AMQProtocolEngine implements ServerProtocolEngine,
public void setMessageAssignmentSuspended(final boolean messageAssignmentSuspended)
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : getSessionModels())
{
- for(AMQSessionModel<?,?> session : getSessionModels())
+ for (Consumer<?> consumer : session.getConsumers())
{
- for(Consumer<?> consumer : session.getConsumers())
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
{
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
}
}
}