diff options
Diffstat (limited to 'qpid/java')
3 files changed, 124 insertions, 169 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index a668016f93..0e2645689c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -336,7 +336,14 @@ public class AMQChannel Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { - sub.getQueue().unregisterSubscription(sub); + try { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } return true; } else @@ -395,7 +402,16 @@ public class AMQChannel Subscription sub = me.getValue(); - sub.getQueue().unregisterSubscription(sub); + try + { + sub.getSendLock(); + sub.getQueue().unregisterSubscription(sub); + } + finally + { + sub.releaseSendLock(); + } + } _tag2SubscriptionMap.clear(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index bfbcb9e22f..bc335d0ba5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1174,7 +1174,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean complete = false; try { - complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES); + complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES)); } catch (AMQException e) @@ -1204,79 +1204,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException + public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException { boolean atTail = false; - boolean advanced; while (!sub.isSuspended() && !atTail && deliveries != 0) { - - advanced = false; - sub.getSendLock(); - try + try { - if (sub.isActive()) + sub.getSendLock(); + atTail = attemptDelivery(sub, deliveries); + if (atTail && sub.isAutoClose()) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliveries--; - deliverMessage(sub, node); - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - advanced = true; - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - } - } - } - - } - else - { - break; - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - - } - atTail = (_entries.next(node) == null) && !advanced; + unregisterSubscription(sub); + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } } finally { sub.releaseSendLock(); } - } // if there's (potentially) more than one subscription the others will potentially not have been advanced to the @@ -1287,16 +1236,72 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { advanceAllSubscriptions(); } + return atTail; + } - if (atTail && sub.isAutoClose()) + private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException + { + boolean atTail = false; + boolean advanced = false; + boolean subActive = sub.isActive(); + if (subActive) { - unregisterSubscription(sub); + QueueEntry node = moveSubscriptionToNextNode(sub); + if (!(node.isAcquired() || node.isDeleted())) + { + if (!sub.isSuspended()) + { + if (sub.hasInterest(node)) + { + if (!sub.wouldSuspend(node)) + { + if (!sub.isBrowser() && !node.acquire(sub)) + { + sub.restoreCredit(node); - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + } + else + { + deliveries--; + deliverMessage(sub, node); - return atTail; + if (sub.isBrowser()) + { + QueueEntry newNode = _entries.next(node); + + if (newNode != null) + { + advanced = true; + sub.setLastSeenEntry(node, newNode); + node = sub.getLastSeenEntry(); + } + } + } + + } + else // Not enough Credit for message and wouldSuspend + { + //QPID-1187 - Treat the subscription as suspended for this message + // and wait for the message to be removed to continue delivery. + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub, node)); + } + } + else + { + // this subscription is not interested in this node so we can skip over it + QueueEntry newNode = _entries.next(node); + if (newNode != null) + { + sub.setLastSeenEntry(node, newNode); + } + } + } + + } + atTail = (_entries.next(node) == null) && !advanced; + } + return atTail || !subActive; } protected void advanceAllSubscriptions() throws AMQException @@ -1347,7 +1352,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; - int deliveries = MAX_ASYNC_DELIVERIES; + Long deliveries = new Long(MAX_ASYNC_DELIVERIES); _asynchronousRunner.compareAndSet(runner, null); @@ -1372,110 +1377,46 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { boolean closeConsumer = false; Subscription sub = subscriptionIter.getNode().getSubscription(); - if (sub != null) + sub.getSendLock(); + try { - sub.getSendLock(); - try + if (sub != null) { - QueueEntry node = moveSubscriptionToNextNode(sub); - if (node != null && sub.isActive()) + QueueEntry node = moveSubscriptionToNextNode(sub); + if (node != null) { - boolean advanced = false; - boolean subActive = false; - - if (!(node.isAcquired() || node.isDeleted())) - { - if (!sub.isSuspended()) - { - subActive = true; - if (sub.hasInterest(node)) - { - if (!sub.wouldSuspend(node)) - { - if (!sub.isBrowser() && !node.acquire(sub)) - { - sub.restoreCredit(node); - - } - else - { - deliverMessage(sub, node); - deliveries--; - - if (sub.isBrowser()) - { - QueueEntry newNode = _entries.next(node); - - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - node = sub.getLastSeenEntry(); - advanced = true; - } - - } - } - done = false; - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the subscription as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - - node.addStateChangeListener(new QueueEntryListener(sub, node)); - } - } - else - { - // this subscription is not interested in this node so we can skip over it - QueueEntry newNode = _entries.next(node); - if (newNode != null) - { - sub.setLastSeenEntry(node, newNode); - } - } - } - } - final boolean atTail = (_entries.next(node) == null); - - done = done && (!subActive || atTail); - - closeConsumer = (atTail && !advanced && sub.isAutoClose()); + done = attemptDelivery(sub, deliveries); } } - finally + if (done) { - sub.releaseSendLock(); - } - - if (closeConsumer) - { - unregisterSubscription(sub); - - ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); - } + if (extraLoops == 0) + { + deliveryIncomplete = false; + if (sub.isAutoClose()) + { + unregisterSubscription(sub); - } - if (done) - { - if (extraLoops == 0) - { - deliveryIncomplete = false; + ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); + } + } + else + { + extraLoops--; + } } else { - extraLoops--; + extraLoops = 1; } } - else + finally { - extraLoops = 1; + sub.releaseSendLock(); } } - _asynchronousRunner.set(null); } diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java index 910d546034..91ed9766f6 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/client/FlowControlTest.java @@ -21,6 +21,7 @@ package org.apache.qpid.test.client; import org.apache.qpid.client.AMQSession_0_8; +import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.log4j.Logger; @@ -91,25 +92,22 @@ public class FlowControlTest extends QpidTestCase assertNotNull("Second message not received", r2); assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); - Message r3 = recv.receiveNoWait(); + Message r3 = recv.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r1.acknowledge(); + ((AbstractJMSMessage)r1).acknowledgeThis(); - r3 = recv.receiveNoWait(); + r3 = recv.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r2.acknowledge(); + ((AbstractJMSMessage)r2).acknowledgeThis(); r3 = recv.receive(RECEIVE_TIMEOUT); assertNotNull("Third message not received", r3); assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); - r3.acknowledge(); - recv.close(); - consumerSession.close(); + ((AbstractJMSMessage)r3).acknowledgeThis(); consumerConnection.close(); - } public void testTwoConsumersBytesFlowControl() throws Exception @@ -161,21 +159,21 @@ public class FlowControlTest extends QpidTestCase assertNotNull("First message not received", r1); assertEquals("Messages in wrong order", 1, r1.getIntProperty("msg")); - Message r2 = recv1.receiveNoWait(); + Message r2 = recv1.receive(RECEIVE_TIMEOUT); assertNull("Second message incorrectly delivered", r2); Session consumerSession2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); ((AMQSession_0_8) consumerSession2).setPrefetchLimits(0, 256); MessageConsumer recv2 = consumerSession2.createConsumer(_queue); - r2 = recv2.receive(100000L);//RECEIVE_TIMEOUT); + r2 = recv2.receive(RECEIVE_TIMEOUT); assertNotNull("Second message not received", r2); assertEquals("Messages in wrong order", 2, r2.getIntProperty("msg")); - Message r3 = recv2.receiveNoWait(); + Message r3 = recv2.receive(RECEIVE_TIMEOUT); assertNull("Third message incorrectly delivered", r3); - r3 = recv1.receive(100000L);//RECEIVE_TIMEOUT); + r3 = recv1.receive(RECEIVE_TIMEOUT); assertNotNull("Third message not received", r3); assertEquals("Messages in wrong order", 3, r3.getIntProperty("msg")); |
