summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-10-17 16:09:51 +0000
committerAidan Skinner <aidan@apache.org>2008-10-17 16:09:51 +0000
commit613206be5872b25118b5476ceb5f4dda3edd0a6b (patch)
tree7b2ff818b99f26f0be04316268c2cab65c7697bb /qpid/java/broker/src
parentcbd4b306577e57075b47f8825e8fc46b3966ddf4 (diff)
downloadqpid-python-613206be5872b25118b5476ceb5f4dda3edd0a6b.tar.gz
QPID-1315:
BasicBytesFlowControl doesn't wait long enough to determine if the 3rd message is going to be delivered accidently. It also ack'd every message, which was not it's intent, so use acknowledgeThis() instead. Refactor common code out of processQueue and flushSubscription into attemptDelivery. Make sure sendLock is held when closing the consumer. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@705657 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java251
2 files changed, 114 insertions, 157 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index a668016f93..0e2645689c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -336,7 +336,14 @@ public class AMQChannel
Subscription sub = _tag2SubscriptionMap.remove(consumerTag);
if (sub != null)
{
- sub.getQueue().unregisterSubscription(sub);
+ try {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
return true;
}
else
@@ -395,7 +402,16 @@ public class AMQChannel
Subscription sub = me.getValue();
- sub.getQueue().unregisterSubscription(sub);
+ try
+ {
+ sub.getSendLock();
+ sub.getQueue().unregisterSubscription(sub);
+ }
+ finally
+ {
+ sub.releaseSendLock();
+ }
+
}
_tag2SubscriptionMap.clear();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index bfbcb9e22f..bc335d0ba5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1174,7 +1174,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean complete = false;
try
{
- complete = flushSubscription(_sub, MAX_ASYNC_DELIVERIES);
+ complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
}
catch (AMQException e)
@@ -1204,79 +1204,28 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
flushSubscription(sub, Long.MAX_VALUE);
}
- public boolean flushSubscription(Subscription sub, long deliveries) throws AMQException
+ public boolean flushSubscription(Subscription sub, Long deliveries) throws AMQException
{
boolean atTail = false;
- boolean advanced;
while (!sub.isSuspended() && !atTail && deliveries != 0)
{
-
- advanced = false;
- sub.getSendLock();
- try
+ try
{
- if (sub.isActive())
+ sub.getSendLock();
+ atTail = attemptDelivery(sub, deliveries);
+ if (atTail && sub.isAutoClose())
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliveries--;
- deliverMessage(sub, node);
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- advanced = true;
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- }
- }
- }
-
- }
- else
- {
- break;
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
-
- }
- atTail = (_entries.next(node) == null) && !advanced;
+ unregisterSubscription(sub);
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
}
}
finally
{
sub.releaseSendLock();
}
-
}
// if there's (potentially) more than one subscription the others will potentially not have been advanced to the
@@ -1287,16 +1236,72 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
advanceAllSubscriptions();
}
+ return atTail;
+ }
- if (atTail && sub.isAutoClose())
+ private boolean attemptDelivery(Subscription sub, Long deliveries) throws AMQException
+ {
+ boolean atTail = false;
+ boolean advanced = false;
+ boolean subActive = sub.isActive();
+ if (subActive)
{
- unregisterSubscription(sub);
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (!(node.isAcquired() || node.isDeleted()))
+ {
+ if (!sub.isSuspended())
+ {
+ if (sub.hasInterest(node))
+ {
+ if (!sub.wouldSuspend(node))
+ {
+ if (!sub.isBrowser() && !node.acquire(sub))
+ {
+ sub.restoreCredit(node);
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
+ }
+ else
+ {
+ deliveries--;
+ deliverMessage(sub, node);
- return atTail;
+ if (sub.isBrowser())
+ {
+ QueueEntry newNode = _entries.next(node);
+
+ if (newNode != null)
+ {
+ advanced = true;
+ sub.setLastSeenEntry(node, newNode);
+ node = sub.getLastSeenEntry();
+ }
+ }
+ }
+
+ }
+ else // Not enough Credit for message and wouldSuspend
+ {
+ //QPID-1187 - Treat the subscription as suspended for this message
+ // and wait for the message to be removed to continue delivery.
+ subActive = false;
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
+ }
+ }
+ else
+ {
+ // this subscription is not interested in this node so we can skip over it
+ QueueEntry newNode = _entries.next(node);
+ if (newNode != null)
+ {
+ sub.setLastSeenEntry(node, newNode);
+ }
+ }
+ }
+
+ }
+ atTail = (_entries.next(node) == null) && !advanced;
+ }
+ return atTail || !subActive;
}
protected void advanceAllSubscriptions() throws AMQException
@@ -1347,7 +1352,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
boolean deliveryIncomplete = true;
int extraLoops = 1;
- int deliveries = MAX_ASYNC_DELIVERIES;
+ Long deliveries = new Long(MAX_ASYNC_DELIVERIES);
_asynchronousRunner.compareAndSet(runner, null);
@@ -1372,110 +1377,46 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
- if (sub != null)
+ sub.getSendLock();
+ try
{
- sub.getSendLock();
- try
+ if (sub != null)
{
- QueueEntry node = moveSubscriptionToNextNode(sub);
- if (node != null && sub.isActive())
+ QueueEntry node = moveSubscriptionToNextNode(sub);
+ if (node != null)
{
- boolean advanced = false;
- boolean subActive = false;
-
- if (!(node.isAcquired() || node.isDeleted()))
- {
- if (!sub.isSuspended())
- {
- subActive = true;
- if (sub.hasInterest(node))
- {
- if (!sub.wouldSuspend(node))
- {
- if (!sub.isBrowser() && !node.acquire(sub))
- {
- sub.restoreCredit(node);
-
- }
- else
- {
- deliverMessage(sub, node);
- deliveries--;
-
- if (sub.isBrowser())
- {
- QueueEntry newNode = _entries.next(node);
-
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- node = sub.getLastSeenEntry();
- advanced = true;
- }
-
- }
- }
- done = false;
- }
- else // Not enough Credit for message and wouldSuspend
- {
- //QPID-1187 - Treat the subscription as suspended for this message
- // and wait for the message to be removed to continue delivery.
- subActive = false;
-
- node.addStateChangeListener(new QueueEntryListener(sub, node));
- }
- }
- else
- {
- // this subscription is not interested in this node so we can skip over it
- QueueEntry newNode = _entries.next(node);
- if (newNode != null)
- {
- sub.setLastSeenEntry(node, newNode);
- }
- }
- }
- }
- final boolean atTail = (_entries.next(node) == null);
-
- done = done && (!subActive || atTail);
-
- closeConsumer = (atTail && !advanced && sub.isAutoClose());
+ done = attemptDelivery(sub, deliveries);
}
}
- finally
+ if (done)
{
- sub.releaseSendLock();
- }
-
- if (closeConsumer)
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
- }
+ if (extraLoops == 0)
+ {
+ deliveryIncomplete = false;
+ if (sub.isAutoClose())
+ {
+ unregisterSubscription(sub);
- }
- if (done)
- {
- if (extraLoops == 0)
- {
- deliveryIncomplete = false;
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ }
+ }
+ else
+ {
+ extraLoops--;
+ }
}
else
{
- extraLoops--;
+ extraLoops = 1;
}
}
- else
+ finally
{
- extraLoops = 1;
+ sub.releaseSendLock();
}
}
-
_asynchronousRunner.set(null);
}