diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-10-24 16:20:16 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-10-24 16:20:16 +0000 |
| commit | 23f43391b5be10d744e0220a5cd523400ce7c719 (patch) | |
| tree | 5f99dae20349b9bd970b0a86fe4fb0123775fbc1 /java | |
| parent | 41a0c0dd0d0895afdaeb7054c8716dc4feb892dd (diff) | |
| download | qpid-python-23f43391b5be10d744e0220a5cd523400ce7c719.tar.gz | |
QPID-1315: Fix style issue, iterator control usage as per review comments from rgodfrey.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 3 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 23 |
2 files changed, 15 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 0fd2b5c83a..26ac562fb2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -333,7 +333,8 @@ public class AMQChannel Subscription sub = _tag2SubscriptionMap.remove(consumerTag); if (sub != null) { - try { + try + { sub.getSendLock(); sub.getQueue().unregisterSubscription(sub); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index bc335d0ba5..7e7e8b2114 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1204,16 +1204,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener flushSubscription(sub, Long.MAX_VALUE); } - public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException + public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException { boolean atTail = false; - while (!sub.isSuspended() && !atTail && deliveries != 0) + while (!sub.isSuspended() && !atTail && iterations != 0) { try { sub.getSendLock(); - atTail = attemptDelivery(sub, deliveries); + atTail = attemptDelivery(sub); if (atTail && sub.isAutoClose()) { unregisterSubscription(sub); @@ -1221,6 +1221,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter(); converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag()); } + else if (!atTail) + { + iterations--; + } } finally { @@ -1239,7 +1243,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener return atTail; } - private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException + private boolean attemptDelivery(Subscription sub) throws AMQException { boolean atTail = false; boolean advanced = false; @@ -1258,11 +1262,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (!sub.isBrowser() && !node.acquire(sub)) { sub.restoreCredit(node); - } else { - deliveries--; deliverMessage(sub, node); if (sub.isBrowser()) @@ -1352,11 +1354,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener boolean deliveryIncomplete = true; int extraLoops = 1; - Long deliveries = new Long(MAX_ASYNC_DELIVERIES); + Long iterations = new Long(MAX_ASYNC_DELIVERIES); _asynchronousRunner.compareAndSet(runner, null); - while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) + while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner)) { // we want to have one extra loop after every subscription has reached the point where it cannot move // further, just in case the advance of one subscription in the last loop allows a different subscription to @@ -1386,7 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = moveSubscriptionToNextNode(sub); if (node != null) { - done = attemptDelivery(sub, deliveries); + done = attemptDelivery(sub); } } if (done) @@ -1409,6 +1411,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { + iterations--; extraLoops = 1; } } @@ -1422,7 +1425,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner)) + if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { _asyncDelivery.execute(runner); } |
