diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-10-17 16:09:51 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-10-17 16:09:51 +0000 |
| commit | 613206be5872b25118b5476ceb5f4dda3edd0a6b (patch) | |
| tree | 7b2ff818b99f26f0be04316268c2cab65c7697bb /qpid/java/broker/src | |
| parent | cbd4b306577e57075b47f8825e8fc46b3966ddf4 (diff) | |
| download | qpid-python-613206be5872b25118b5476ceb5f4dda3edd0a6b.tar.gz | |
QPID-1315:
BasicBytesFlowControl doesn't wait long enough to determine if the 3rd message is going to be delivered accidently. It also ack'd every message, which was not it's intent, so use acknowledgeThis() instead.
Refactor common code out of processQueue and flushSubscription into attemptDelivery.
Make sure sendLock is held when closing the consumer.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 20 | ||||
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 251 |
2 files changed, 114 insertions, 157 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a668016f93..0e2645689c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -336,7 +336,14 @@ public class AMQChannel Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { - sub.getQueue().unregisterSubscription(sub); + try { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } return true; } else @@ -395,7 +402,16 @@ public class AMQChannel Subscription sub = me.getValue(); - sub.getQueue().unregisterSubscription(sub); + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + } _tag2SubscriptionMap.clear(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index bfbcb9e22f..bc335d0ba5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1174,7 +1174,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean complete = false; try { - complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); } catch (AMQException e) @@ -1204,79 +1204,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException + public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException { boolean atTail = false; - boolean advanced; while (!sub.isSuspended() && !atTail && deliveries != 0) { - - advanced = false; - sub.getSendLock(); - try + try { - if (sub.isActive()) + sub.getSendLock(); + atTail = attemptDelivery(sub, deliveries); + if (atTail && sub.isAutoClose()) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliveries--; - deliverMessage(sub, node); - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - } - } - - } - else - { - break; - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - - } - atTail = (_entries.next(node) == null) && !advanced; + unregisterSubscription(sub); + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } } finally { sub.releaseSendLock(); } - } // if there's (potentially) more than one subscription the others will potentially not have been advanced to the @@ -1287,16 +1236,72 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } + return atTail; + } - if (atTail && sub.isAutoClose()) + private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException + { + boolean atTail = false; + boolean advanced = false; + boolean subActive = sub.isActive(); + if (subActive) { - unregisterSubscription(sub); + QueueEntry node = moveSubscriptionToNextNode(sub); + if (!(node.isAcquired() || node.isDeleted())) + { + if (!sub.isSuspended()) + { + if (sub.hasInterest(node)) + { + if (!sub.wouldSuspend(node)) + { + if (!sub.isBrowser() && !node.acquire(sub)) + { + sub.restoreCredit(node); - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + } + else + { + deliveries--; + deliverMessage(sub, node); - return atTail; + if (sub.isBrowser()) + { + QueueEntry newNode = _entries.next(node); + + if (newNode != null) + { + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); + } + } + } + + } + else // Not enough Credit for message and wouldSuspend + { + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); + } + } + } + + } + atTail = (_entries.next(node) == null) && !advanced; + } + return atTail || !subActive; } protected void advanceAllSubscriptions() throws AMQException @@ -1347,7 +1352,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; - int deliveries = MAX_ASYNC_DELIVERIES; + Long deliveries = new Long(MAX_ASYNC_DELIVERIES); _asynchronousRunner.compareAndSet(runner, null); @@ -1372,110 +1377,46 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); - if (sub != null) + sub.getSendLock(); + try { - sub.getSendLock(); - try + if (sub != null) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null && sub.isActive()) + QueueEntry node = moveSubscriptionToNextNode(sub); + if (node != null) { - boolean advanced = false; - boolean subActive = false; - - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - subActive = true; - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliverMessage(sub, node); - deliveries--; - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - advanced = true; - } - - } - } - done = false; - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - - node.addStateChangeListener(new QueueEntryListener(sub, node)); - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - } - final boolean atTail = (_entries.next(node) == null); - - done = done && (!subActive || atTail); - - closeConsumer = (atTail && !advanced && sub.isAutoClose()); + done = attemptDelivery(sub, deliveries); } } - finally + if (done) { - sub.releaseSendLock(); - } - - if (closeConsumer) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + if (extraLoops == 0) + { + deliveryIncomplete = false; + if (sub.isAutoClose()) + { + unregisterSubscription(sub); - } - if (done) - { - if (extraLoops == 0) - { - deliveryIncomplete = false; + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + } + } + else + { + extraLoops--; + } } else { - extraLoops--; + extraLoops = 1; } } - else + finally { - extraLoops = 1; + sub.releaseSendLock(); } } - _asynchronousRunner.set(null); } |
