summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-10-24 16:20:16 +0000
committerAidan Skinner <aidan@apache.org>2008-10-24 16:20:16 +0000
commit23f43391b5be10d744e0220a5cd523400ce7c719 (patch)
tree5f99dae20349b9bd970b0a86fe4fb0123775fbc1 /java
parent41a0c0dd0d0895afdaeb7054c8716dc4feb892dd (diff)
downloadqpid-python-23f43391b5be10d744e0220a5cd523400ce7c719.tar.gz
QPID-1315: Fix style issue, iterator control usage as per review comments from rgodfrey.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@707672 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java23
2 files changed, 15 insertions, 11 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 0fd2b5c83a..26ac562fb2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -333,7 +333,8 @@ public class AMQChannel
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- try {
+ try
+ {
sub.getSendLock();
sub.getQueue().unregisterSubscription(sub);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index bc335d0ba5..7e7e8b2114 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1204,16 +1204,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException
+ public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
{
boolean atTail = false;
- while (!sub.isSuspended() && !atTail && deliveries != 0)
+ while (!sub.isSuspended() && !atTail && iterations != 0)
{
try
{
sub.getSendLock();
- atTail = attemptDelivery(sub, deliveries);
+ atTail = attemptDelivery(sub);
if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1221,6 +1221,10 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
}
+ else if (!atTail)
+ {
+ iterations--;
+ }
}
finally
{
@@ -1239,7 +1243,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
return atTail;
}
- private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException
+ private boolean attemptDelivery(Subscription sub) throws AMQException
{
boolean atTail = false;
boolean advanced = false;
@@ -1258,11 +1262,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (!sub.isBrowser() && !node.acquire(sub))
{
sub.restoreCredit(node);
-
}
else
{
- deliveries--;
deliverMessage(sub, node);
if (sub.isBrowser())
@@ -1352,11 +1354,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean deliveryIncomplete = true;
int extraLoops = 1;
- Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
+ Long iterations = new Long(MAX_ASYNC_DELIVERIES);
_asynchronousRunner.compareAndSet(runner, null);
- while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
+ while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
@@ -1386,7 +1388,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = moveSubscriptionToNextNode(sub);
if (node != null)
{
- done = attemptDelivery(sub, deliveries);
+ done = attemptDelivery(sub);
}
}
if (done)
@@ -1409,6 +1411,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
+ iterations--;
extraLoops = 1;
}
}
@@ -1422,7 +1425,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner))
+ if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
_asyncDelivery.execute(runner);
}