summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:03:45 +0000
committerRobert Godfrey <rgodfrey@apache.org>2015-03-16 22:03:45 +0000
commitba9edafa5804918594d1fb2d806d884962cea057 (patch)
tree5503fc4031724d757bd4b4b3f3839524432f9b28 /qpid/java
parentfc3fd8427d41a3afed8a6cea83bf41149a788400 (diff)
downloadqpid-python-ba9edafa5804918594d1fb2d806d884962cea057.tar.gz
QPID-6429 : ensure that when message suspension is set, all targets have finished any in flight deliveries
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667144 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rwxr-xr-xqpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java24
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java16
2 files changed, 28 insertions, 12 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
index 5d90cfab19..b448919188 100755
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
@@ -92,15 +92,23 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
- {
- for(Consumer<?> consumer : session.getConsumers())
- {
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
- }
- }
+ for (Consumer<?> consumer : session.getConsumers())
+ {
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
+ {
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
+ }
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index 28e15f04d3..0cb83d33a7 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -175,13 +175,21 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
_messageAssignmentSuspended.set(messageAssignmentSuspended ? Thread.currentThread() : null);
- if(!messageAssignmentSuspended)
+ for(AMQSessionModel<?,?> session : _connection.getSessionModels())
{
- for(AMQSessionModel<?,?> session : _connection.getSessionModels())
+ for(Consumer<?> consumer : session.getConsumers())
{
- for(Consumer<?> consumer : session.getConsumers())
+ ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+ if (!messageAssignmentSuspended)
{
- ((ConsumerImpl)consumer).getTarget().notifyCurrentState();
+ consumerImpl.getTarget().notifyCurrentState();
+ }
+ else
+ {
+ // ensure that by the time the method returns, no consumer can be in the process of
+ // delivering a message.
+ consumerImpl.getSendLock();
+ consumerImpl.releaseSendLock();
}
}
}