diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-10-17 16:09:51 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-10-17 16:09:51 +0000 |
| commit | f91439713b0990709b1d7e778166e8b7d0d59d8d (patch) | |
| tree | f946bc60bcbdb285a10d9a1d04dbf12f350e8fda /java | |
| parent | 4323068739c561d68ddeebfe1c5ccff4b8ae057b (diff) | |
| download | qpid-python-f91439713b0990709b1d7e778166e8b7d0d59d8d.tar.gz | |
QPID-1315:
BasicBytesFlowControl doesn't wait long enough to determine if the 3rd message is going to be delivered accidently. It also ack'd every message, which was not it's intent, so use acknowledgeThis() instead.
Refactor common code out of processQueue and flushSubscription into attemptDelivery.
Make sure sendLock is held when closing the consumer.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@705657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 124 insertions, 169 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a668016f93..0e2645689c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -336,7 +336,14 @@ public class AMQChannel Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { - sub.getQueue().unregisterSubscription(sub); + try { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } return true; } else @@ -395,7 +402,16 @@ public class AMQChannel Subscription sub = me.getValue(); - sub.getQueue().unregisterSubscription(sub); + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + } _tag2SubscriptionMap.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index bfbcb9e22f..bc335d0ba5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1174,7 +1174,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean complete = false; try { - complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); } catch (AMQException e) @@ -1204,79 +1204,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException + public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException { boolean atTail = false; - boolean advanced; while (!sub.isSuspended() && !atTail && deliveries != 0) { - - advanced = false; - sub.getSendLock(); - try + try { - if (sub.isActive()) + sub.getSendLock(); + atTail = attemptDelivery(sub, deliveries); + if (atTail && sub.isAutoClose()) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliveries--; - deliverMessage(sub, node); - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - } - } - - } - else - { - break; - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - - } - atTail = (_entries.next(node) == null) && !advanced; + unregisterSubscription(sub); + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } } finally { sub.releaseSendLock(); } - } // if there's (potentially) more than one subscription the others will potentially not have been advanced to the @@ -1287,16 +1236,72 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } + return atTail; + } - if (atTail && sub.isAutoClose()) + private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException + { + boolean atTail = false; + boolean advanced = false; + boolean subActive = sub.isActive(); + if (subActive) { - unregisterSubscription(sub); + QueueEntry node = moveSubscriptionToNextNode(sub); + if (!(node.isAcquired() || node.isDeleted())) + { + if (!sub.isSuspended()) + { + if (sub.hasInterest(node)) + { + if (!sub.wouldSuspend(node)) + { + if (!sub.isBrowser() && !node.acquire(sub)) + { + sub.restoreCredit(node); - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + } + else + { + deliveries--; + deliverMessage(sub, node); - return atTail; + if (sub.isBrowser()) + { + QueueEntry newNode = _entries.next(node); + + if (newNode != null) + { + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); + } + } + } + + } + else // Not enough Credit for message and wouldSuspend + { + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); + } + } + } + + } + atTail = (_entries.next(node) == null) && !advanced; + } + return atTail || !subActive; } protected void advanceAllSubscriptions() throws AMQException @@ -1347,7 +1352,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; - int deliveries = MAX_ASYNC_DELIVERIES; + Long deliveries = new Long(MAX_ASYNC_DELIVERIES); _asynchronousRunner.compareAndSet(runner, null); @@ -1372,110 +1377,46 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); - if (sub != null) + sub.getSendLock(); + try { - sub.getSendLock(); - try + if (sub != null) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null && sub.isActive()) + QueueEntry node = moveSubscriptionToNextNode(sub); + if (node != null) { - boolean advanced = false; - boolean subActive = false; - - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - subActive = true; - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliverMessage(sub, node); - deliveries--; - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - advanced = true; - } - - } - } - done = false; - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - - node.addStateChangeListener(new QueueEntryListener(sub, node)); - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - } - final boolean atTail = (_entries.next(node) == null); - - done = done && (!subActive || atTail); - - closeConsumer = (atTail && !advanced && sub.isAutoClose()); + done = attemptDelivery(sub, deliveries); } } - finally + if (done) { - sub.releaseSendLock(); - } - - if (closeConsumer) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + if (extraLoops == 0) + { + deliveryIncomplete = false; + if (sub.isAutoClose()) + { + unregisterSubscription(sub); - } - if (done) - { - if (extraLoops == 0) - { - deliveryIncomplete = false; + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + } + } + else + { + extraLoops--; + } } else { - extraLoops--; + extraLoops = 1; } } - else + finally { - extraLoops = 1; + sub.releaseSendLock(); } } - _asynchronousRunner.set(null); } diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java index 910d546034..91ed9766f6 100644 --- a/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java +++ b/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.client; import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.log4j.Logger; @@ -91,25 +92,22 @@ public class FlowControlTest extends QpidTestCase assertNotNull("Second message not received", r2); assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); - Message r3 = recv.receiveNoWait(); + Message r3 = recv.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r1.acknowledge(); + ((AbstractJMSMessage)r1).acknowledgeThis(); - r3 = recv.receiveNoWait(); + r3 = recv.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r2.acknowledge(); + ((AbstractJMSMessage)r2).acknowledgeThis(); r3 = recv.receive(RECEIVE_TIMEOUT); assertNotNull("Third message not received", r3); assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); - r3.acknowledge(); - recv.close(); - consumerSession.close(); + ((AbstractJMSMessage)r3).acknowledgeThis(); consumerConnection.close(); - } public void testTwoConsumersBytesFlowControl() throws Exception @@ -161,21 +159,21 @@ public class FlowControlTest extends QpidTestCase assertNotNull("First message not received", r1); assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); - Message r2 = recv1.receiveNoWait(); + Message r2 = recv1.receive(RECEIVE_TIMEOUT); assertNull("Second message incorrectly delivered", r2); Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256); MessageConsumer recv2 = consumerSession2.createConsumer(_queue); - r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT); + r2 = recv2.receive(RECEIVE_TIMEOUT); assertNotNull("Second message not received", r2); assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); - Message r3 = recv2.receiveNoWait(); + Message r3 = recv2.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT); + r3 = recv1.receive(RECEIVE_TIMEOUT); assertNotNull("Third message not received", r3); assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); |
