From 73b7ec00e546b27d2bed22d30ad48a8735cfe6f0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 28 Mar 2011 23:18:16 +0000 Subject: Merge branch 'trunk' into qpid-2920 Conflicts: qpid/cpp/src/cluster.mk qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086439 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/queue/SimpleAMQQueue.java | 91 ++++++++++++++++------ .../qpid/server/transport/ServerConnection.java | 14 +++- 2 files changed, 80 insertions(+), 25 deletions(-) (limited to 'qpid/java/broker') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b003152db6..0e3f7b2625 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1808,14 +1808,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + /** + * Used by queue Runners to asynchronously deliver messages to consumers. + * + * A queue Runner is started whenever a state change occurs, e.g when a new + * message arrives on the queue and cannot be immediately delivered to a + * subscription (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to subscriptions unsuspending) which are + * capable of accepting/delivering all messages then these messages would + * otherwise remain on the queue. + * + * processQueue should be running while there are messages on the queue AND + * there are subscriptions that can deliver them. If there are no + * subscriptions capable of delivering the remaining messages on the queue + * then processQueue should stop to prevent spinning. + * + * Since processQueue is runs in a fixed size Executor, it should not run + * indefinitely to prevent starving other tasks of CPU (e.g jobs to process + * incoming messages may not be able to be scheduled in the thread pool + * because all threads are working on clearing down large queues). To solve + * this problem, after an arbitrary number of message deliveries the + * processQueue job stops iterating, resubmits itself to the executor, and + * ends the current instance + * + * @param runner the Runner to schedule + * @throws AMQException + */ private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - int extraLoops = 1; - long iterations = MAX_ASYNC_DELIVERIES; + boolean lastLoop = false; + int iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1832,12 +1858,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - extraLoops = 1; + //further asynchronous delivery is required since the + //previous loop. keep going if iteration slicing allows. + lastLoop = false; } previousStateChangeCount = stateChangeCount; - deliveryIncomplete = _subscriptionList.size() != 0; - boolean done; + boolean allSubscriptionsDone = true; + boolean subscriptionDone; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1847,30 +1875,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - - done = attemptDelivery(sub); - - if (done) + //attempt delivery. returns true if no further delivery currently possible to this sub + subscriptionDone = attemptDelivery(sub); + if (subscriptionDone) { - if (extraLoops == 0) + //close autoClose subscriptions if we are not currently intent on continuing + if (lastLoop && sub.isAutoClose()) { - deliveryIncomplete = false; - if (sub.isAutoClose()) - { - unregisterSubscription(sub); + unregisterSubscription(sub); - sub.confirmAutoClose(); - } - } - else - { - extraLoops--; + sub.confirmAutoClose(); } } else { + //this subscription can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allSubscriptionsDone = false; + lastLoop = false; iterations--; - extraLoops = 1; } } finally @@ -1878,10 +1901,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } + + if(allSubscriptionsDone && lastLoop) + { + //We have done an extra loop already and there are again + //again no further delivery attempts possible, only + //keep going if state change demands it. + deliveryIncomplete = false; + } + else if(allSubscriptionsDone) + { + //All subscriptions reported being done, but we have to do + //an extra loop if the iterations are not exhausted and + //there is still any work to be done + deliveryIncomplete = _subscriptionList.size() != 0; + lastLoop = true; + } + else + { + //some subscriptions can still accept more messages, + //keep going if iteration count allows. + lastLoop = false; + deliveryIncomplete = true; + } + _asynchronousRunner.set(null); } - // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit + // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 75bd50e3a2..54cd709af3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -88,8 +88,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask.run(); } _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); + + getVirtualHost().getConnectionRegistry().registerConnection(this); } - + + if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) + { + if(_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + } + if (state == State.CLOSED) { logClosed(); @@ -126,7 +136,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void setVirtualHost(VirtualHost virtualHost) { _virtualHost = virtualHost; - _virtualHost.getConnectionRegistry().registerConnection(this); initialiseStatistics(); } @@ -253,7 +262,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, // Ignore } close(replyCode, message); - getVirtualHost().getConnectionRegistry().deregisterConnection(this); } @Override -- cgit v1.2.1