diff options
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 13 |
1 files changed, 12 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index a6275900d5..247402e442 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1242,10 +1242,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException { boolean atTail = false; + boolean advanced; while(!sub.isSuspended() && !atTail && deliveries != 0) { + advanced = false; sub.getSendLock(); try { @@ -1276,6 +1278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if(newNode != null) { + advanced = true; sub.setLastSeenEntry(node, newNode); node = sub.getLastSeenEntry(); } @@ -1300,7 +1303,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } - atTail = (_entries.next(node) == null); + atTail = (_entries.next(node) == null) && !advanced; } } @@ -1320,6 +1323,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener advanceAllSubscriptions(); } + if(atTail && sub.isAutoClose()) + { + unregisterSubscription(sub); + + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + } + return atTail; } |