summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
commit2c5e10a0f46335acf8b09cebd889d5920cda8d48 (patch)
treeac1141f56aeddf2fc6e406d24e3355484660f19f /java/broker
parentc9e5b84950a75530b82ac72acf71ee3141d8bef1 (diff)
downloadqpid-python-2c5e10a0f46335acf8b09cebd889d5920cda8d48.tar.gz
QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie Gemmel
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229996 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java150
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java270
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java125
4 files changed, 489 insertions, 144 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f1efbc575a..c5aee5dbe3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -42,9 +42,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.*;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -67,10 +65,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
+ private static final String QPID_NO_GROUP = "qpid.no-group";
+ // TODO - should make this configurable at the vhost / broker level
+ private static final int DEFAULT_MAX_GROUPS = 255;
private final VirtualHost _virtualHost;
@@ -265,7 +268,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
- _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+ if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+ {
+ String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+ _messageGroupManager =
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ defaultGroup == null ? QPID_NO_GROUP : defaultGroup,
+ this);
+ }
+ else
+ {
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ }
}
else
{
@@ -488,28 +502,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(_messageGroupManager != null)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
- _messageGroupManager.clearAssignments(subscription);
-
- if(entry != null)
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues())
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
-
- deliverAsync();
-
- }
-
+ resetSubPointersForGroups(subscription, true);
}
// auto-delete queues must be deleted if there are no remaining subscribers
@@ -531,6 +524,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ if(clearAssignments)
+ {
+ _messageGroupManager.clearAssignments(subscription);
+ }
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+ }
+
public boolean getDeleteOnNoConsumers()
{
return _deleteOnNoConsumers;
@@ -1855,6 +1876,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ {
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry releasedNode = context._releasedEntry;
+ return releasedNode == null || releasedNode.compareTo(entry) < 0;
+ }
+ else
+ {
+ return false;
+ }
+ }
/**
* Used by queue Runners to asynchronously deliver messages to consumers.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
new file mode 100644
index 0000000000..f511cc0dc9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AssignedSubscriptionMessageGroupManager implements MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class);
+
+
+ private final String _groupId;
+ private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
+ private final int _groupMask;
+
+ public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ {
+ _groupId = groupId;
+ _groupMask = pow2(maxGroups)-1;
+ }
+
+ private static int pow2(final int i)
+ {
+ int val = 1;
+ while(val < i) val<<=1;
+ return val;
+ }
+
+ public Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+ }
+
+ public boolean acceptMessage(Subscription sub, QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ Integer group = groupVal.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == sub)
+ {
+ return true;
+ }
+ else
+ {
+ if(assignedSub == null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Assigning group " + groupVal + " to sub " + sub);
+ }
+ assignedSub = _groupMap.putIfAbsent(group, sub);
+ return assignedSub == null || assignedSub == sub;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupId == null)
+ return false;
+
+ Integer group = groupId.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+ public void clearAssignments(Subscription sub)
+ {
+ Iterator<Subscription> subIter = _groupMap.values().iterator();
+ while(subIter.hasNext())
+ {
+ if(subIter.next() == sub)
+ {
+ subIter.remove();
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
new file mode 100644
index 0000000000..42818db214
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefinedGroupMessageGroupManager implements MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
+
+ private final String _groupId;
+ private final String _defaultGroup;
+ private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
+ private final SubscriptionResetHelper _resetHelper;
+
+ private final class Group
+ {
+ private final Object _group;
+ private Subscription _subscription;
+ private int _activeCount;
+
+ private Group(final Object key, final Subscription subscription)
+ {
+ _group = key;
+ _subscription = subscription;
+ }
+
+ public boolean add()
+ {
+ if(_subscription != null)
+ {
+ _activeCount++;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void subtract()
+ {
+ if(--_activeCount == 0)
+ {
+ _resetHelper.resetSubPointersForGroups(_subscription, false);
+ _subscription = null;
+ _groupMap.remove(_group);
+ }
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ Group group = (Group) o;
+
+ return _group.equals(group._group);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _group.hashCode();
+ }
+
+ public boolean isValid()
+ {
+ return !(_subscription == null || (_activeCount == 0 && _subscription.isClosed()));
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Group{" +
+ "_group=" + _group +
+ ", _subscription=" + _subscription +
+ ", _activeCount=" + _activeCount +
+ '}';
+ }
+ }
+
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, SubscriptionResetHelper resetHelper)
+ {
+ _groupId = groupId;
+ _defaultGroup = defaultGroup;
+ _resetHelper = resetHelper;
+ }
+
+ public synchronized Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupId = getKey(entry);
+
+ Group group = _groupMap.get(groupId);
+ return group == null || !group.isValid() ? null : group.getSubscription();
+ }
+
+ public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
+ {
+ Object groupId = getKey(entry);
+ Group group = _groupMap.get(groupId);
+
+ if(group == null || !group.isValid())
+ {
+ group = new Group(groupId, sub);
+
+ _groupMap.put(groupId, group);
+
+ // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // called on the subscription, and when accept message is called... in that case we want to avoid delivering
+ // out of order
+ if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
+ {
+ return false;
+ }
+
+ }
+
+ Subscription assignedSub = group.getSubscription();
+
+ if(assignedSub == sub)
+ {
+ entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ _logger.debug("Earliest available entry for " + sub + " is " + visitor.getEntry() + (visitor.getEntry() == null ? "" : " : " + getKey(visitor.getEntry())));
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = getKey(entry);
+
+ Group group = _groupMap.get(groupId);
+ if(group != null && group.getSubscription() == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+
+ public void clearAssignments(final Subscription sub)
+ {
+ }
+
+ private Object getKey(QueueEntry entry)
+ {
+ ServerMessage message = entry.getMessage();
+ AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
+ Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
+ {
+ groupVal = _defaultGroup;
+ }
+ return groupVal;
+ }
+
+ private class GroupStateChangeListener implements QueueEntry.StateChangeListener
+ {
+ private final Group _group;
+
+ public GroupStateChangeListener(final Group group,
+ final QueueEntry entry)
+ {
+ _group = group;
+ }
+
+ public void stateChanged(final QueueEntry entry,
+ final QueueEntry.State oldState,
+ final QueueEntry.State newState)
+ {
+ synchronized (DefinedGroupMessageGroupManager.this)
+ {
+ if(_group.isValid())
+ {
+ if(oldState != newState)
+ {
+ if(newState == QueueEntry.State.ACQUIRED)
+ {
+ _logger.debug("Adding to " + _group);
+ _group.add();
+ }
+ else if(oldState == QueueEntry.State.ACQUIRED)
+ {
+ _logger.debug("Subtracting from " + _group);
+ _group.subtract();
+ }
+ }
+ }
+ else
+ {
+ entry.removeStateChangeListener(this);
+ }
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
index 1999d655c9..8ce4ce3344 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -20,131 +20,22 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class MessageGroupManager
+public interface MessageGroupManager
{
- private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class);
-
-
- private final String _groupId;
- private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
- private final int _groupMask;
-
- public MessageGroupManager(final String groupId, final int maxGroups)
- {
- _groupId = groupId;
- _groupMask = pow2(maxGroups)-1;
- }
-
- private static int pow2(final int i)
- {
- int val = 1;
- while(val < i) val<<=1;
- return val;
- }
-
- public Subscription getAssignedSubscription(final QueueEntry entry)
+ public interface SubscriptionResetHelper
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
- return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
- }
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments);
- public boolean acceptMessage(Subscription sub, QueueEntry entry)
- {
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
- if(groupVal == null)
- {
- return true;
- }
- else
- {
- Integer group = groupVal.hashCode() & _groupMask;
- Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == sub)
- {
- return true;
- }
- else
- {
- if(assignedSub == null)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Assigning group " + groupVal + " to sub " + sub);
- }
- assignedSub = _groupMap.putIfAbsent(group, sub);
- return assignedSub == null || assignedSub == sub;
- }
- else
- {
- return false;
- }
- }
- }
- }
-
- public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
- {
- EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
- return visitor.getEntry();
+ boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub);
}
- private class EntryFinder implements AMQQueue.Visitor
- {
- private QueueEntry _entry;
- private Subscription _sub;
-
- public EntryFinder(final Subscription sub)
- {
- _sub = sub;
- }
+ Subscription getAssignedSubscription(QueueEntry entry);
- public boolean visit(final QueueEntry entry)
- {
- if(!entry.isAvailable())
- return false;
+ boolean acceptMessage(Subscription sub, QueueEntry entry);
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
- if(groupId == null)
- return false;
+ QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
- Integer group = groupId.hashCode() & _groupMask;
- Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == _sub)
- {
- _entry = entry;
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public QueueEntry getEntry()
- {
- return _entry;
- }
- }
-
- public void clearAssignments(Subscription sub)
- {
- Iterator<Subscription> subIter = _groupMap.values().iterator();
- while(subIter.hasNext())
- {
- if(subIter.next() == sub)
- {
- subIter.remove();
- }
- }
- }
+ void clearAssignments(Subscription sub);
}