summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-03 19:48:46 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-03 19:48:46 +0000
commit02bbab932f5f845bfa8eac6069bc4159bbe53d07 (patch)
tree7a31c2804f9b9fa5f1dbabc80cbe219d9e735890 /java/broker/src
parent7b0c33ff443deb937d26f07c039bd483e9bcbe29 (diff)
downloadqpid-python-02bbab932f5f845bfa8eac6069bc4159bbe53d07.tar.gz
QPID-3720 : [Java Broker] Implement Message Grouping
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java127
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java150
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java6
8 files changed, 301 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index a1f1c037ec..32d9c4878a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -149,7 +149,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer
void removeMessagesFromQueue(long fromMessageId, long toMessageId);
-
+ static interface Visitor
+ {
+ boolean visit(QueueEntry entry);
+ }
+
+ void visit(Visitor visitor);
+
long getMaximumMessageSize();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 37fad54c07..142cfddb39 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -75,6 +75,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.AVAILABLE;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -85,6 +90,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.DEQUEUED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -95,6 +105,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.DELETED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class ExpiredState extends EntryState
@@ -104,6 +119,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.EXPIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
@@ -113,6 +133,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return State.ACQUIRED;
}
+
+ public String toString()
+ {
+ return getState().name();
+ }
}
public final class SubscriptionAcquiredState extends EntryState
@@ -134,6 +159,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return _subscription;
}
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
public final class SubscriptionAssignedState extends EntryState
@@ -155,6 +185,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable
{
return _subscription;
}
+
+
+ public String toString()
+ {
+ return "{" + getState().name() + " : " + _subscription +"}";
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
index 77c4b912e0..641aaa0a08 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
@@ -35,4 +35,6 @@ public interface QueueEntryList<Q extends QueueEntry>
Q getHead();
void entryDeleted(Q queueEntry);
+
+ int getPriorities();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index ebed781a1a..d48445930a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReadWriteRunnable;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
-import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.binding.Binding;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -45,6 +43,7 @@ import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
+import org.apache.qpid.server.subscription.MessageGroupManager;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.txn.AutoCommitTransaction;
@@ -68,11 +67,11 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
+ private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key";
private final VirtualHost _virtualHost;
@@ -189,6 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private final MessageGroupManager _messageGroupManager;
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
@@ -242,25 +242,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_logSubject = new QueueLogSubject(this);
_logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger());
- // Log the correct creation message
-
- // Extract the number of priorities for this Queue.
- // Leave it as 0 if we are a SimpleQueueEntryList
- int priorities = 0;
- if (entryListFactory instanceof PriorityQueueList.Factory)
- {
- priorities = ((PriorityQueueList)_entries).getPriorities();
- }
-
// Log the creation of this Queue.
// The priorities display is toggled on if we set priorities > 0
CurrentActor.get().message(_logSubject,
QueueMessages.CREATED(String.valueOf(_owner),
- priorities,
- _owner != null,
- autoDelete,
- durable, !durable,
- priorities > 0));
+ _entries.getPriorities(),
+ _owner != null,
+ autoDelete,
+ durable, !durable,
+ _entries.getPriorities() > 0));
getConfigStore().addConfiguredObject(this);
@@ -274,6 +264,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
_logger.error("AMQQueue MBean creation has failed ", e);
}
+ if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
+ {
+ _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255);
+ }
+ else
+ {
+ _messageGroupManager = null;
+ }
+
resetNotifications();
}
@@ -488,6 +487,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
setExclusiveSubscriber(null);
subscription.setQueueContext(null);
+ if(_messageGroupManager != null)
+ {
+ QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription);
+ _messageGroupManager.clearAssignments(subscription);
+
+ if(entry != null)
+ {
+ SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+ // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+ while (subscriberIter.advance())
+ {
+ Subscription sub = subscriberIter.getNode().getSubscription();
+
+ // we don't make browsers send the same stuff twice
+ if (sub.seesRequeues())
+ {
+ updateSubRequeueEntry(sub, entry);
+ }
+ }
+
+ deliverAsync();
+
+ }
+
+ }
+
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 )
@@ -691,21 +716,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
try
{
- if (subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ if (!sub.isSuspended()
+ && subscriptionReadyAndHasInterest(sub, entry)
+ && mightAssign(sub, entry)
+ && !sub.wouldSuspend(entry))
{
- if (!sub.wouldSuspend(entry))
+ if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub)))
{
- if (sub.acquires() && !entry.acquire(sub))
- {
- // restore credit here that would have been taken away by wouldSuspend since we didn't manage
- // to acquire the entry for this subscription
- sub.restoreCredit(entry);
- }
- else
- {
- deliverMessage(sub, entry, false);
- }
+ // restore credit here that would have been taken away by wouldSuspend since we didn't manage
+ // to acquire the entry for this subscription
+ sub.restoreCredit(entry);
+ }
+ else
+ {
+ deliverMessage(sub, entry, false);
}
}
}
@@ -716,6 +740,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
}
+ private boolean assign(final Subscription sub, final QueueEntry entry)
+ {
+ return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry);
+ }
+
+
+ private boolean mightAssign(final Subscription sub, final QueueEntry entry)
+ {
+ if(_messageGroupManager == null || !sub.acquires())
+ return true;
+ Subscription assigned = _messageGroupManager.getAssignedSubscription(entry);
+ return (assigned == null) || (assigned == sub);
+ }
+
protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
{
// This method is only required for queues which mess with ordering
@@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean filterComplete();
}
+
+
public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
return getMessagesOnTheQueue(new QueueEntryFilter()
@@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
+ public void visit(final Visitor visitor)
+ {
+ QueueEntryIterator queueListIterator = _entries.iterator();
+
+ while(queueListIterator.advance())
+ {
+ QueueEntry node = queueListIterator.getNode();
+
+ if(!node.isDispensed())
+ {
+ if(visitor.visit(node))
+ {
+ break;
+ }
+ }
+ }
+ }
+
/**
* Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue.
*
@@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
if (node != null && node.isAvailable())
{
- if (sub.hasInterest(node))
+ if (sub.hasInterest(node) && mightAssign(sub, node))
{
if (!sub.wouldSuspend(node))
{
- if (sub.acquires() && !node.acquire(sub))
+ if (sub.acquires() && !(assign(sub, node) && node.acquire(sub)))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
boolean expired = false;
- while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node)))
+ while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) ||
+ !mightAssign(sub,node)))
{
if (expired)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
index 0bb5dcc219..b40e5a28c2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
@@ -185,6 +185,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl
advanceHead();
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
static class Factory implements QueueEntryListFactory
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
index 5f8ab16c06..414a123c43 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
@@ -51,13 +51,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
_propertyName = propertyName;
}
- @Override
public AMQQueue getQueue()
{
return _queue;
}
- @Override
public SortedQueueEntryImpl add(final ServerMessage message)
{
synchronized(_lock)
@@ -286,7 +284,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
return (node == null ? Colour.BLACK : node.getColour()) == colour;
}
- @Override
public SortedQueueEntryImpl next(final SortedQueueEntryImpl node)
{
synchronized(_lock)
@@ -316,13 +313,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
- @Override
public QueueEntryIterator<SortedQueueEntryImpl> iterator()
{
return new QueueEntryIteratorImpl(_head);
}
- @Override
public SortedQueueEntryImpl getHead()
{
return _head;
@@ -333,7 +328,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
return _root;
}
- @Override
public void entryDeleted(final SortedQueueEntryImpl entry)
{
synchronized(_lock)
@@ -431,6 +425,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl
}
}
+ public int getPriorities()
+ {
+ return 0;
+ }
+
/**
* Swaps the position of the node in the tree with it's successor
* (that is the node with the next highest key)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
new file mode 100644
index 0000000000..1999d655c9
--- /dev/null
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.subscription;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class MessageGroupManager
+{
+ private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class);
+
+
+ private final String _groupId;
+ private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>();
+ private final int _groupMask;
+
+ public MessageGroupManager(final String groupId, final int maxGroups)
+ {
+ _groupId = groupId;
+ _groupMask = pow2(maxGroups)-1;
+ }
+
+ private static int pow2(final int i)
+ {
+ int val = 1;
+ while(val < i) val<<=1;
+ return val;
+ }
+
+ public Subscription getAssignedSubscription(final QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask);
+ }
+
+ public boolean acceptMessage(Subscription sub, QueueEntry entry)
+ {
+ Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupVal == null)
+ {
+ return true;
+ }
+ else
+ {
+ Integer group = groupVal.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == sub)
+ {
+ return true;
+ }
+ else
+ {
+ if(assignedSub == null)
+ {
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Assigning group " + groupVal + " to sub " + sub);
+ }
+ assignedSub = _groupMap.putIfAbsent(group, sub);
+ return assignedSub == null || assignedSub == sub;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub)
+ {
+ EntryFinder visitor = new EntryFinder(sub);
+ sub.getQueue().visit(visitor);
+ return visitor.getEntry();
+ }
+
+ private class EntryFinder implements AMQQueue.Visitor
+ {
+ private QueueEntry _entry;
+ private Subscription _sub;
+
+ public EntryFinder(final Subscription sub)
+ {
+ _sub = sub;
+ }
+
+ public boolean visit(final QueueEntry entry)
+ {
+ if(!entry.isAvailable())
+ return false;
+
+ Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId);
+ if(groupId == null)
+ return false;
+
+ Integer group = groupId.hashCode() & _groupMask;
+ Subscription assignedSub = _groupMap.get(group);
+ if(assignedSub == _sub)
+ {
+ _entry = entry;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public QueueEntry getEntry()
+ {
+ return _entry;
+ }
+ }
+
+ public void clearAssignments(Subscription sub)
+ {
+ Iterator<Subscription> subIter = _groupMap.values().iterator();
+ while(subIter.hasNext())
+ {
+ if(subIter.next() == sub)
+ {
+ subIter.remove();
+ }
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index 8b029f9a51..f97ac5659e 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQQueue
}
- @Override
public int getMaximumDeliveryCount()
{
return 0;
}
- @Override
public void setMaximumDeliveryCount(int maximumDeliveryCount)
{
}
- @Override
public void setAlternateExchange(String exchangeName)
{
}
+ public void visit(final Visitor visitor)
+ {
+ }
}