summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2014-01-07 00:57:57 +0000
committerRobert Gemmell <robbie@apache.org>2014-01-07 00:57:57 +0000
commitcddbd677ec47e459f76b83825aa98d3c9ab782dc (patch)
tree7b6189a87a54acd5e28ed64c982c2e9a7e0f9333 /qpid/java/broker-core
parent269b89106b7ac9fe9407bc3becaf80ae944a25d6 (diff)
downloadqpid-python-cddbd677ec47e459f76b83825aa98d3c9ab782dc.tar.gz
QPID-5450: have the group manager try to acquire the message at the time it is accepted into the group, so that the associated shared-group state change occurs within the single synchronization block
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1556096 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java22
3 files changed, 40 insertions, 10 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 2aa3fd6ce8..b002419064 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -733,7 +733,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
&& mightAssign(sub, entry)
&& !sub.wouldSuspend(entry))
{
- if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
+ if (sub.acquires() && !assign(sub, entry))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -754,10 +754,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private boolean assign(final Subscription sub, final QueueEntry entry)
{
- return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ if(_messageGroupManager == null)
+ {
+ //no grouping, try to acquire immediately.
+ return entry.acquire(sub);
+ }
+ else
+ {
+ //the group manager is responsible for acquiring the message if/when appropriate
+ return _messageGroupManager.acceptMessage(sub, entry);
+ }
}
-
private boolean mightAssign(final Subscription sub, final QueueEntry entry)
{
if(_messageGroupManager == null || !sub.acquires())
@@ -1645,7 +1653,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
+ if (sub.acquires() && !assign(sub, node))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
index efedad1181..ae7e11afa4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -63,6 +63,18 @@ public class AssignedSubscriptionMessageGroupManager implements MessageGroupMana
public boolean acceptMessage(Subscription sub, QueueEntry entry)
{
+ if(assignMessage(sub, entry))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean assignMessage(Subscription sub, QueueEntry entry)
+ {
Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
if(groupVal == null)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
index f38e23b342..55110c46de 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -136,9 +136,21 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
{
+ if(assignMessage(sub, entry))
+ {
+ return entry.acquire(sub);
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean assignMessage(final Subscription sub, final QueueEntry entry)
+ {
Object groupId = getKey(entry);
Group group = _groupMap.get(groupId);
-
+
if(group == null || !group.isValid())
{
group = new Group(groupId, sub);
@@ -152,11 +164,10 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
{
return false;
}
-
}
-
+
Subscription assignedSub = group.getSubscription();
-
+
if(assignedSub == sub)
{
entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
@@ -167,8 +178,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager
return false;
}
}
-
-
+
public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
{
EntryFinder visitor = new EntryFinder(sub);