summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-03-28 23:18:16 +0000
committerAlan Conway <aconway@apache.org>2011-03-28 23:18:16 +0000
commit73b7ec00e546b27d2bed22d30ad48a8735cfe6f0 (patch)
tree45900f0e0a964b814207fd0adfbacc36b0bbab26 /qpid/java/broker
parentea98033c3df7fbc1364df69c4edb5cf1e808e87e (diff)
downloadqpid-python-73b7ec00e546b27d2bed22d30ad48a8735cfe6f0.tar.gz
Merge branch 'trunk' into qpid-2920
Conflicts: qpid/cpp/src/cluster.mk qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920@1086439 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java91
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java14
2 files changed, 80 insertions, 25 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index b003152db6..0e3f7b2625 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1808,14 +1808,40 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ /**
+ * Used by queue Runners to asynchronously deliver messages to consumers.
+ *
+ * A queue Runner is started whenever a state change occurs, e.g when a new
+ * message arrives on the queue and cannot be immediately delivered to a
+ * subscription (i.e. asynchronous delivery is required). Unless there are
+ * SubFlushRunners operating (due to subscriptions unsuspending) which are
+ * capable of accepting/delivering all messages then these messages would
+ * otherwise remain on the queue.
+ *
+ * processQueue should be running while there are messages on the queue AND
+ * there are subscriptions that can deliver them. If there are no
+ * subscriptions capable of delivering the remaining messages on the queue
+ * then processQueue should stop to prevent spinning.
+ *
+ * Since processQueue is runs in a fixed size Executor, it should not run
+ * indefinitely to prevent starving other tasks of CPU (e.g jobs to process
+ * incoming messages may not be able to be scheduled in the thread pool
+ * because all threads are working on clearing down large queues). To solve
+ * this problem, after an arbitrary number of message deliveries the
+ * processQueue job stops iterating, resubmits itself to the executor, and
+ * ends the current instance
+ *
+ * @param runner the Runner to schedule
+ * @throws AMQException
+ */
private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
long previousStateChangeCount = Long.MIN_VALUE;
boolean deliveryIncomplete = true;
- int extraLoops = 1;
- long iterations = MAX_ASYNC_DELIVERIES;
+ boolean lastLoop = false;
+ int iterations = MAX_ASYNC_DELIVERIES;
_asynchronousRunner.compareAndSet(runner, null);
@@ -1832,12 +1858,14 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (previousStateChangeCount != stateChangeCount)
{
- extraLoops = 1;
+ //further asynchronous delivery is required since the
+ //previous loop. keep going if iteration slicing allows.
+ lastLoop = false;
}
previousStateChangeCount = stateChangeCount;
- deliveryIncomplete = _subscriptionList.size() != 0;
- boolean done;
+ boolean allSubscriptionsDone = true;
+ boolean subscriptionDone;
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
@@ -1847,30 +1875,25 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.getSendLock();
try
{
-
- done = attemptDelivery(sub);
-
- if (done)
+ //attempt delivery. returns true if no further delivery currently possible to this sub
+ subscriptionDone = attemptDelivery(sub);
+ if (subscriptionDone)
{
- if (extraLoops == 0)
+ //close autoClose subscriptions if we are not currently intent on continuing
+ if (lastLoop && sub.isAutoClose())
{
- deliveryIncomplete = false;
- if (sub.isAutoClose())
- {
- unregisterSubscription(sub);
+ unregisterSubscription(sub);
- sub.confirmAutoClose();
- }
- }
- else
- {
- extraLoops--;
+ sub.confirmAutoClose();
}
}
else
{
+ //this subscription can accept additional deliveries, so we must
+ //keep going after this (if iteration slicing allows it)
+ allSubscriptionsDone = false;
+ lastLoop = false;
iterations--;
- extraLoops = 1;
}
}
finally
@@ -1878,10 +1901,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
sub.releaseSendLock();
}
}
+
+ if(allSubscriptionsDone && lastLoop)
+ {
+ //We have done an extra loop already and there are again
+ //again no further delivery attempts possible, only
+ //keep going if state change demands it.
+ deliveryIncomplete = false;
+ }
+ else if(allSubscriptionsDone)
+ {
+ //All subscriptions reported being done, but we have to do
+ //an extra loop if the iterations are not exhausted and
+ //there is still any work to be done
+ deliveryIncomplete = _subscriptionList.size() != 0;
+ lastLoop = true;
+ }
+ else
+ {
+ //some subscriptions can still accept more messages,
+ //keep going if iteration count allows.
+ lastLoop = false;
+ deliveryIncomplete = true;
+ }
+
_asynchronousRunner.set(null);
}
- // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+ // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 75bd50e3a2..54cd709af3 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -88,8 +88,18 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
_onOpenTask.run();
}
_actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", true, true));
+
+ getVirtualHost().getConnectionRegistry().registerConnection(this);
}
-
+
+ if (state == State.CLOSE_RCVD || state == State.CLOSED || state == State.CLOSING)
+ {
+ if(_virtualHost != null)
+ {
+ _virtualHost.getConnectionRegistry().deregisterConnection(this);
+ }
+ }
+
if (state == State.CLOSED)
{
logClosed();
@@ -126,7 +136,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
public void setVirtualHost(VirtualHost virtualHost)
{
_virtualHost = virtualHost;
- _virtualHost.getConnectionRegistry().registerConnection(this);
initialiseStatistics();
}
@@ -253,7 +262,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
// Ignore
}
close(replyCode, message);
- getVirtualHost().getConnectionRegistry().deregisterConnection(this);
}
@Override