summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
commit2c5e10a0f46335acf8b09cebd889d5920cda8d48 (patch)
treeac1141f56aeddf2fc6e406d24e3355484660f19f /java
parentc9e5b84950a75530b82ac72acf71ee3141d8bef1 (diff)
downloadqpid-python-2c5e10a0f46335acf8b09cebd889d5920cda8d48.tar.gz
QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie Gemmel
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229996 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java88
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java150
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java270
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java125
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java263
-rwxr-xr-xjava/test-profiles/CPPExcludes6
6 files changed, 693 insertions, 209 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f1efbc575a..c5aee5dbe3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -42,9 +42,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.subscription.MessageGroupManager;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
+import org.apache.qpid.server.subscription.*;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -67,10 +65,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
+public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, MessageGroupManager.SubscriptionResetHelper
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
+ private static final String QPID_SHARED_MSG_GROUP = "qpid.shared_msg_group";
+ private static final String QPID_DEFAULT_MESSAGE_GROUP = "qpid.default-message-group";
+ private static final String QPID_NO_GROUP = "qpid.no-group";
+ // TODO - should make this configurable at the vhost / broker level
+ private static final int DEFAULT_MAX_GROUPS = 255;
private final VirtualHost _virtualHost;
@@ -265,7 +268,18 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
- _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+ if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
+ {
+ String defaultGroup = String.valueOf(arguments.get(QPID_DEFAULT_MESSAGE_GROUP));
+ _messageGroupManager =
+ new DefinedGroupMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)),
+ defaultGroup == null ? QPID_NO_GROUP : defaultGroup,
+ this);
+ }
+ else
+ {
+ _messageGroupManager = new AssignedSubscriptionMessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), DEFAULT_MAX_GROUPS);
+ }
}
else
{
@@ -488,28 +502,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if(_messageGroupManager != null)
{
- QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
- _messageGroupManager.clearAssignments(subscription);
-
- if(entry != null)
- {
- SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while (subscriberIter.advance())
- {
- Subscription sub = subscriberIter.getNode().getSubscription();
-
- // we don't make browsers send the same stuff twice
- if (sub.seesRequeues())
- {
- updateSubRequeueEntry(sub, entry);
- }
- }
-
- deliverAsync();
-
- }
-
+ resetSubPointersForGroups(subscription, true);
}
// auto-delete queues must be deleted if there are no remaining subscribers
@@ -531,6 +524,34 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ if(clearAssignments)
+ {
+ _messageGroupManager.clearAssignments(subscription);
+ }
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+ }
+
public boolean getDeleteOnNoConsumers()
{
return _deleteOnNoConsumers;
@@ -1855,6 +1876,19 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ public boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub)
+ {
+ QueueContext context = (QueueContext) sub.getQueueContext();
+ if(context != null)
+ {
+ QueueEntry releasedNode = context._releasedEntry;
+ return releasedNode == null || releasedNode.compareTo(entry) < 0;
+ }
+ else
+ {
+ return false;
+ }
+ }
/**
* Used by queue Runners to asynchronously deliver messages to consumers.
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
new file mode 100644
index 0000000000..f511cc0dc9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class AssignedSubscriptionMessageGroupManager implements MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(AssignedSubscriptionMessageGroupManager.class);
+
+
+ private final String _groupId;
+ private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
+ private final int _groupMask;
+
+ public AssignedSubscriptionMessageGroupManager(final String groupId, final int maxGroups)
+ {
+ _groupId = groupId;
+ _groupMask = pow2(maxGroups)-1;
+ }
+
+ private static int pow2(final int i)
+ {
+ int val = 1;
+ while(val < i) val<<=1;
+ return val;
+ }
+
+ public Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+ }
+
+ public boolean acceptMessage(Subscription sub, QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ Integer group = groupVal.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == sub)
+ {
+ return true;
+ }
+ else
+ {
+ if(assignedSub == null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Assigning group " + groupVal + " to sub " + sub);
+ }
+ assignedSub = _groupMap.putIfAbsent(group, sub);
+ return assignedSub == null || assignedSub == sub;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupId == null)
+ return false;
+
+ Integer group = groupId.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+ public void clearAssignments(Subscription sub)
+ {
+ Iterator<Subscription> subIter = _groupMap.values().iterator();
+ while(subIter.hasNext())
+ {
+ if(subIter.next() == sub)
+ {
+ subIter.remove();
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
new file mode 100644
index 0000000000..42818db214
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
@@ -0,0 +1,270 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefinedGroupMessageGroupManager implements MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class);
+
+ private final String _groupId;
+ private final String _defaultGroup;
+ private final Map<Object, Group> _groupMap = new HashMap<Object, Group>();
+ private final SubscriptionResetHelper _resetHelper;
+
+ private final class Group
+ {
+ private final Object _group;
+ private Subscription _subscription;
+ private int _activeCount;
+
+ private Group(final Object key, final Subscription subscription)
+ {
+ _group = key;
+ _subscription = subscription;
+ }
+
+ public boolean add()
+ {
+ if(_subscription != null)
+ {
+ _activeCount++;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public void subtract()
+ {
+ if(--_activeCount == 0)
+ {
+ _resetHelper.resetSubPointersForGroups(_subscription, false);
+ _subscription = null;
+ _groupMap.remove(_group);
+ }
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ Group group = (Group) o;
+
+ return _group.equals(group._group);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _group.hashCode();
+ }
+
+ public boolean isValid()
+ {
+ return !(_subscription == null || (_activeCount == 0 && _subscription.isClosed()));
+ }
+
+ public Subscription getSubscription()
+ {
+ return _subscription;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Group{" +
+ "_group=" + _group +
+ ", _subscription=" + _subscription +
+ ", _activeCount=" + _activeCount +
+ '}';
+ }
+ }
+
+ public DefinedGroupMessageGroupManager(final String groupId, String defaultGroup, SubscriptionResetHelper resetHelper)
+ {
+ _groupId = groupId;
+ _defaultGroup = defaultGroup;
+ _resetHelper = resetHelper;
+ }
+
+ public synchronized Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupId = getKey(entry);
+
+ Group group = _groupMap.get(groupId);
+ return group == null || !group.isValid() ? null : group.getSubscription();
+ }
+
+ public synchronized boolean acceptMessage(final Subscription sub, final QueueEntry entry)
+ {
+ Object groupId = getKey(entry);
+ Group group = _groupMap.get(groupId);
+
+ if(group == null || !group.isValid())
+ {
+ group = new Group(groupId, sub);
+
+ _groupMap.put(groupId, group);
+
+ // there's a small change that the group became empty between the point at which getNextAvailable() was
+ // called on the subscription, and when accept message is called... in that case we want to avoid delivering
+ // out of order
+ if(_resetHelper.isEntryAheadOfSubscription(entry, sub))
+ {
+ return false;
+ }
+
+ }
+
+ Subscription assignedSub = group.getSubscription();
+
+ if(assignedSub == sub)
+ {
+ entry.addStateChangeListener(new GroupStateChangeListener(group, entry));
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+
+ public synchronized QueueEntry findEarliestAssignedAvailableEntry(final Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ _logger.debug("Earliest available entry for " + sub + " is " + visitor.getEntry() + (visitor.getEntry() == null ? "" : " : " + getKey(visitor.getEntry())));
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = getKey(entry);
+
+ Group group = _groupMap.get(groupId);
+ if(group != null && group.getSubscription() == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+
+ public void clearAssignments(final Subscription sub)
+ {
+ }
+
+ private Object getKey(QueueEntry entry)
+ {
+ ServerMessage message = entry.getMessage();
+ AMQMessageHeader messageHeader = message == null ? null : message.getMessageHeader();
+ Object groupVal = messageHeader == null ? _defaultGroup : messageHeader.getHeader(_groupId);
+ if(groupVal == null)
+ {
+ groupVal = _defaultGroup;
+ }
+ return groupVal;
+ }
+
+ private class GroupStateChangeListener implements QueueEntry.StateChangeListener
+ {
+ private final Group _group;
+
+ public GroupStateChangeListener(final Group group,
+ final QueueEntry entry)
+ {
+ _group = group;
+ }
+
+ public void stateChanged(final QueueEntry entry,
+ final QueueEntry.State oldState,
+ final QueueEntry.State newState)
+ {
+ synchronized (DefinedGroupMessageGroupManager.this)
+ {
+ if(_group.isValid())
+ {
+ if(oldState != newState)
+ {
+ if(newState == QueueEntry.State.ACQUIRED)
+ {
+ _logger.debug("Adding to " + _group);
+ _group.add();
+ }
+ else if(oldState == QueueEntry.State.ACQUIRED)
+ {
+ _logger.debug("Subtracting from " + _group);
+ _group.subtract();
+ }
+ }
+ }
+ else
+ {
+ entry.removeStateChangeListener(this);
+ }
+ }
+ }
+ }
+}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
index 1999d655c9..8ce4ce3344 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -20,131 +20,22 @@
*/
package org.apache.qpid.server.subscription;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class MessageGroupManager
+public interface MessageGroupManager
{
- private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class);
-
-
- private final String _groupId;
- private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
- private final int _groupMask;
-
- public MessageGroupManager(final String groupId, final int maxGroups)
- {
- _groupId = groupId;
- _groupMask = pow2(maxGroups)-1;
- }
-
- private static int pow2(final int i)
- {
- int val = 1;
- while(val < i) val<<=1;
- return val;
- }
-
- public Subscription getAssignedSubscription(final QueueEntry entry)
+ public interface SubscriptionResetHelper
{
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
- return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
- }
+ public void resetSubPointersForGroups(Subscription subscription, boolean clearAssignments);
- public boolean acceptMessage(Subscription sub, QueueEntry entry)
- {
- Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
- if(groupVal == null)
- {
- return true;
- }
- else
- {
- Integer group = groupVal.hashCode() & _groupMask;
- Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == sub)
- {
- return true;
- }
- else
- {
- if(assignedSub == null)
- {
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Assigning group " + groupVal + " to sub " + sub);
- }
- assignedSub = _groupMap.putIfAbsent(group, sub);
- return assignedSub == null || assignedSub == sub;
- }
- else
- {
- return false;
- }
- }
- }
- }
-
- public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
- {
- EntryFinder visitor = new EntryFinder(sub);
- sub.getQueue().visit(visitor);
- return visitor.getEntry();
+ boolean isEntryAheadOfSubscription(QueueEntry entry, Subscription sub);
}
- private class EntryFinder implements AMQQueue.Visitor
- {
- private QueueEntry _entry;
- private Subscription _sub;
-
- public EntryFinder(final Subscription sub)
- {
- _sub = sub;
- }
+ Subscription getAssignedSubscription(QueueEntry entry);
- public boolean visit(final QueueEntry entry)
- {
- if(!entry.isAvailable())
- return false;
+ boolean acceptMessage(Subscription sub, QueueEntry entry);
- Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
- if(groupId == null)
- return false;
+ QueueEntry findEarliestAssignedAvailableEntry(Subscription sub);
- Integer group = groupId.hashCode() & _groupMask;
- Subscription assignedSub = _groupMap.get(group);
- if(assignedSub == _sub)
- {
- _entry = entry;
- return true;
- }
- else
- {
- return false;
- }
- }
-
- public QueueEntry getEntry()
- {
- return _entry;
- }
- }
-
- public void clearAssignments(Subscription sub)
- {
- Iterator<Subscription> subIter = _groupMap.values().iterator();
- while(subIter.hasNext())
- {
- if(subIter.next() == sub)
- {
- subIter.remove();
- }
- }
- }
+ void clearAssignments(Subscription sub);
}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index dc29ef378e..08a932eba1 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -34,18 +34,13 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
public class MessageGroupQueueTest extends QpidBrokerTestCase
{
- private static final int TIMEOUT = 1500;
-
protected final String QUEUE = "MessageGroupQueue";
- private static final int MSG_COUNT = 50;
-
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
@@ -73,38 +68,53 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
super.tearDown();
}
+
+ public void testSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(false);
+ }
+
+ public void testSharedGroupSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(true);
+ }
+
+
/**
* Pre populate the queue with messages with groups as follows
- *
+ *
* ONE
* TWO
* ONE
* TWO
- *
+ *
* Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
* consumer assigned group TWO if they are started in sequence.
- *
+ *
* Thus doing
- *
+ *
* c1 <--- (ONE)
* c2 <--- (TWO)
* c2 ack --->
- *
+ *
* c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
- *
+ *
* i.e.
- *
+ *
* c2 <--- (TWO)
* c2 ack --->
* c1 <--- (ONE)
* c1 ack --->
- *
+ *
*/
- public void testSimpleGroupAssignment() throws Exception
+ private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -112,7 +122,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producer = producerSession.createProducer(queue);
String[] groups = { "ONE", "TWO"};
-
+
for (int msg = 0; msg < 4; msg++)
{
producer.send(createMessage(msg, groups[msg % groups.length]));
@@ -125,7 +135,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
-
+
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
@@ -154,33 +164,47 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
cs1Received2.acknowledge();
cs2Received2.acknowledge();
-
+
assertNull(consumer1.receive(1000));
assertNull(consumer2.receive(1000));
}
+
+ public void testConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(false);
+ }
+
+ public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(true);
+ }
+
/**
- *
+ *
* Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
* consumer.
- *
+ *
* Pre-populate the queue as ONE, ONE, TWO, ONE
- *
+ *
* create in sequence two consumers
- *
+ *
* receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
- *
+ *
* Then close c1 before acking.
- *
+ *
* If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
* requires c2 to go "backwards" in the queue).
- *
- * */
- public void testConsumerCloseGroupAssignment() throws Exception
+ *
+ **/
+ private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -197,9 +221,8 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
-
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -208,40 +231,54 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Message cs1Received = consumer1.receive(1000);
assertNotNull("Consumer 1 should have received first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message cs2Received = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received first message", cs2Received);
- cs2Received.acknowledge();
+ assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
+ cs2.commit();
Message cs2Received2 = consumer2.receive(1000);
- assertNull("Consumer 2 should not have received second message", cs2Received2);
+ assertNull("Consumer 2 should not yet have received a second message", cs2Received2);
consumer1.close();
- cs1Received.acknowledge();
+ cs1.commit();
Message cs2Received3 = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received second message", cs2Received3);
- assertEquals("Unexpected group", cs2Received3.getStringProperty("group"),
- "ONE");
+ assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
- cs2Received3.acknowledge();
+ cs2.commit();
Message cs2Received4 = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received third message", cs2Received4);
- assertEquals("Unexpected group", cs2Received4.getStringProperty("group"),
- "ONE");
-
- cs2Received4.acknowledge();
+ assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
+ cs2.commit();
assertNull(consumer2.receive(1000));
}
+
+
+ public void testConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(false);
+ }
+
+ public void testSharedGroupConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(true);
+ }
+
+
/**
*
* Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
@@ -259,12 +296,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
* requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
*
*/
-
- public void testConsumerCloseWithRelease() throws Exception
+ private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -282,61 +321,155 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
MessageConsumer consumer1 = cs1.createConsumer(queue);
- MessageConsumer consumer2 = cs2.createConsumer(queue);
consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
Message cs1Received = consumer1.receive(1000);
- assertNotNull("Consumer 1 should have received first message", cs1Received);
+ assertNotNull("Consumer 1 should have received its first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received first message", received);
- Message first = received;
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
received = consumer2.receive(1000);
- assertNull("Consumer 2 should not have received second message", received);
+ assertNull("Consumer 2 should not yet have received second message", received);
consumer1.close();
cs1.close();
- first.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received second message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should now have received second message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
received.getJMSRedelivered());
- received.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received third message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should have received a third message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
- received.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received fourth message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should have received a fourth message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
- received.acknowledge();
+ cs2.commit();
assertNull(consumer2.receive(1000));
}
-
+ public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(false);
+ }
+
+ public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(true);
+ }
+
+ private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "TWO"));
+ producer.send(createMessage(3, "THREE"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its first message", received);
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its second message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+
+ // We expect different behaviours from "shared groups": here the assignment of a subscription to a group
+ // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a
+ // standard message grouping queue the assignment will be retained until the subscription is no longer
+ // registered
+ if(sharedGroups)
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its second message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ cs2.commit();
+ }
+ else
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received a second message", received);
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its third message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ }
+
+ }
+
+
private Message createMessage(int msg, String group) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
diff --git a/java/test-profiles/CPPExcludes b/java/test-profiles/CPPExcludes
index 66a20bcfc1..d8c463b810 100755
--- a/java/test-profiles/CPPExcludes
+++ b/java/test-profiles/CPPExcludes
@@ -172,3 +172,9 @@ org.apache.qpid.server.management.AMQUserManagementMBeanTest#*
// QPID-3133: On 0-10, the exception listener is currently not invoked when reconnection fails to occurs.
org.apache.qpid.server.failover.FailoverMethodTest#*
+// CPP Broker does not implement non-"shared group" message groups
+org.apache.qpid.server.queue.MessageGroupQueueTest#testSimpleGroupAssignment
+org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseGroupAssignment
+org.apache.qpid.server.queue.MessageGroupQueueTest#testConsumerCloseWithRelease
+org.apache.qpid.server.queue.MessageGroupQueueTest#testGroupAssignmentSurvivesEmpty
+