summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java13
1 files changed, 12 insertions, 1 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index a6275900d5..247402e442 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1242,10 +1242,12 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
{
boolean atTail = false;
+ boolean advanced;
while(!sub.isSuspended() && !atTail && deliveries != 0)
{
+ advanced = false;
sub.getSendLock();
try
{
@@ -1276,6 +1278,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(newNode != null)
{
+ advanced = true;
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
}
@@ -1300,7 +1303,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
- atTail = (_entries.next(node) == null);
+ atTail = (_entries.next(node) == null) && !advanced;
}
}
@@ -1320,6 +1323,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
advanceAllSubscriptions();
}
+ if(atTail && sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
+
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ }
+
return atTail;
}