diff options
| author | Alan Conway <aconway@apache.org> | 2011-03-28 23:18:16 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2011-03-28 23:18:16 +0000 |
| commit | 73b7ec00e546b27d2bed22d30ad48a8735cfe6f0 (patch) | |
| tree | 45900f0e0a964b814207fd0adfbacc36b0bbab26 /qpid/java/broker | |
| parent | ea98033c3df7fbc1364df69c4edb5cf1e808e87e (diff) | |
| download | qpid-python-73b7ec00e546b27d2bed22d30ad48a8735cfe6f0.tar.gz | |
Merge branch 'trunk' into qpid-2920
Conflicts:
qpid/cpp/src/cluster.mk
qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java | 91 | ||||
| -rw-r--r-- | qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java | 14 |
2 files changed, 80 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index b003152db6..0e3f7b2625 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -1808,14 +1808,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + /** + * Used by queue Runners to asynchronously deliver messages to consumers. + * + * A queue Runner is started whenever a state change occurs, e.g when a new + * message arrives on the queue and cannot be immediately delivered to a + * subscription (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to subscriptions unsuspending) which are + * capable of accepting/delivering all messages then these messages would + * otherwise remain on the queue. + * + * processQueue should be running while there are messages on the queue AND + * there are subscriptions that can deliver them. If there are no + * subscriptions capable of delivering the remaining messages on the queue + * then processQueue should stop to prevent spinning. + * + * Since processQueue is runs in a fixed size Executor, it should not run + * indefinitely to prevent starving other tasks of CPU (e.g jobs to process + * incoming messages may not be able to be scheduled in the thread pool + * because all threads are working on clearing down large queues). To solve + * this problem, after an arbitrary number of message deliveries the + * processQueue job stops iterating, resubmits itself to the executor, and + * ends the current instance + * + * @param runner the Runner to schedule + * @throws AMQException + */ private void processQueue(Runnable runner) throws AMQException { long stateChangeCount; long previousStateChangeCount = Long.MIN_VALUE; boolean deliveryIncomplete = true; - int extraLoops = 1; - long iterations = MAX_ASYNC_DELIVERIES; + boolean lastLoop = false; + int iterations = MAX_ASYNC_DELIVERIES; _asynchronousRunner.compareAndSet(runner, null); @@ -1832,12 +1858,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (previousStateChangeCount != stateChangeCount) { - extraLoops = 1; + //further asynchronous delivery is required since the + //previous loop. keep going if iteration slicing allows. + lastLoop = false; } previousStateChangeCount = stateChangeCount; - deliveryIncomplete = _subscriptionList.size() != 0; - boolean done; + boolean allSubscriptionsDone = true; + boolean subscriptionDone; SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator(); //iterate over the subscribers and try to advance their pointer @@ -1847,30 +1875,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.getSendLock(); try { - - done = attemptDelivery(sub); - - if (done) + //attempt delivery. returns true if no further delivery currently possible to this sub + subscriptionDone = attemptDelivery(sub); + if (subscriptionDone) { - if (extraLoops == 0) + //close autoClose subscriptions if we are not currently intent on continuing + if (lastLoop && sub.isAutoClose()) { - deliveryIncomplete = false; - if (sub.isAutoClose()) - { - unregisterSubscription(sub); + unregisterSubscription(sub); - sub.confirmAutoClose(); - } - } - else - { - extraLoops--; + sub.confirmAutoClose(); } } else { + //this subscription can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allSubscriptionsDone = false; + lastLoop = false; iterations--; - extraLoops = 1; } } finally @@ -1878,10 +1901,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener sub.releaseSendLock(); } } + + if(allSubscriptionsDone && lastLoop) + { + //We have done an extra loop already and there are again + //again no further delivery attempts possible, only + //keep going if state change demands it. + deliveryIncomplete = false; + } + else if(allSubscriptionsDone) + { + //All subscriptions reported being done, but we have to do + //an extra loop if the iterations are not exhausted and + //there is still any work to be done + deliveryIncomplete = _subscriptionList.size() != 0; + lastLoop = true; + } + else + { + //some subscriptions can still accept more messages, + //keep going if iteration count allows. + lastLoop = false; + deliveryIncomplete = true; + } + _asynchronousRunner.set(null); } - // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit + // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit // therefore we should schedule this runner again (unless someone beats us to it :-) ). if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner)) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java index 75bd50e3a2..54cd709af3 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java @@ -88,8 +88,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask.run(); } _actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true)); + + getVirtualHost().getConnectionRegistry().registerConnection(this); } - + + if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING) + { + if(_virtualHost != null) + { + _virtualHost.getConnectionRegistry().deregisterConnection(this); + } + } + if (state == State.CLOSED) { logClosed(); @@ -126,7 +136,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, public void setVirtualHost(VirtualHost virtualHost) { _virtualHost = virtualHost; - _virtualHost.getConnectionRegistry().registerConnection(this); initialiseStatistics(); } @@ -253,7 +262,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel, // Ignore } close(replyCode, message); - getVirtualHost().getConnectionRegistry().deregisterConnection(this); } @Override |
