diff options
| author | Robert Gemmell <robbie@apache.org> | 2014-01-07 00:57:57 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2014-01-07 00:57:57 +0000 |
| commit | cddbd677ec47e459f76b83825aa98d3c9ab782dc (patch) | |
| tree | 7b6189a87a54acd5e28ed64c982c2e9a7e0f9333 /qpid/java/broker-core | |
| parent | 269b89106b7ac9fe9407bc3becaf80ae944a25d6 (diff) | |
| download | qpid-python-cddbd677ec47e459f76b83825aa98d3c9ab782dc.tar.gz | |
QPID-5450: have the group manager try to acquire the message at the time it is accepted into the group, so that the associated shared-group state change occurs within the single synchronization block
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556096 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
3 files changed, 40 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 2aa3fd6ce8..b002419064 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -733,7 +733,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes && mightAssign(sub, entry) && !sub.wouldSuspend(entry)) { - if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub))) + if (sub.acquires() && !assign(sub, entry)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription @@ -754,10 +754,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private boolean assign(final Subscription sub, final QueueEntry entry) { - return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry); + if(_messageGroupManager == null) + { + //no grouping, try to acquire immediately. + return entry.acquire(sub); + } + else + { + //the group manager is responsible for acquiring the message if/when appropriate + return _messageGroupManager.acceptMessage(sub, entry); + } } - private boolean mightAssign(final Subscription sub, final QueueEntry entry) { if(_messageGroupManager == null || !sub.acquires()) @@ -1645,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes { if (!sub.wouldSuspend(node)) { - if (sub.acquires() && !(assign(sub, node) && node.acquire(sub))) + if (sub.acquires() && !assign(sub, node)) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java index efedad1181..ae7e11afa4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java @@ -63,6 +63,18 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana public boolean acceptMessage(Subscription sub, QueueEntry entry) { + if(assignMessage(sub, entry)) + { + return entry.acquire(sub); + } + else + { + return false; + } + } + + private boolean assignMessage(Subscription sub, QueueEntry entry) + { Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); if(groupVal == null) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java index f38e23b342..55110c46de 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java @@ -136,9 +136,21 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry) { + if(assignMessage(sub, entry)) + { + return entry.acquire(sub); + } + else + { + return false; + } + } + + private boolean assignMessage(final Subscription sub, final QueueEntry entry) + { Object groupId = getKey(entry); Group group = _groupMap.get(groupId); - + if(group == null || !group.isValid()) { group = new Group(groupId, sub); @@ -152,11 +164,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager { return false; } - } - + Subscription assignedSub = group.getSubscription(); - + if(assignedSub == sub) { entry.addStateChangeListener(new GroupStateChangeListener(group, entry)); @@ -167,8 +178,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager return false; } } - - + public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub) { EntryFinder visitor = new EntryFinder(sub); |
