diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-03 19:48:46 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-03 19:48:46 +0000 |
| commit | 02bbab932f5f845bfa8eac6069bc4159bbe53d07 (patch) | |
| tree | 7a31c2804f9b9fa5f1dbabc80cbe219d9e735890 /java/broker/src | |
| parent | 7b0c33ff443deb937d26f07c039bd483e9bcbe29 (diff) | |
| download | qpid-python-02bbab932f5f845bfa8eac6069bc4159bbe53d07.tar.gz | |
QPID-3720 : [Java Broker] Implement Message Grouping
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1226930 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
8 files changed, 301 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index a1f1c037ec..32d9c4878a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -149,7 +149,13 @@ public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeRefer void removeMessagesFromQueue(long fromMessageId, long toMessageId); - + static interface Visitor + { + boolean visit(QueueEntry entry); + } + + void visit(Visitor visitor); + long getMaximumMessageSize(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 37fad54c07..142cfddb39 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -75,6 +75,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.AVAILABLE; } + + public String toString() + { + return getState().name(); + } } @@ -85,6 +90,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.DEQUEUED; } + + public String toString() + { + return getState().name(); + } } @@ -95,6 +105,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.DELETED; } + + public String toString() + { + return getState().name(); + } } public final class ExpiredState extends EntryState @@ -104,6 +119,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.EXPIRED; } + + public String toString() + { + return getState().name(); + } } @@ -113,6 +133,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return State.ACQUIRED; } + + public String toString() + { + return getState().name(); + } } public final class SubscriptionAcquiredState extends EntryState @@ -134,6 +159,11 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return _subscription; } + + public String toString() + { + return "{" + getState().name() + " : " + _subscription +"}"; + } } public final class SubscriptionAssignedState extends EntryState @@ -155,6 +185,12 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable { return _subscription; } + + + public String toString() + { + return "{" + getState().name() + " : " + _subscription +"}"; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 77c4b912e0..641aaa0a08 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -35,4 +35,6 @@ public interface QueueEntryList<Q extends QueueEntry> Q getHead(); void entryDeleted(Q queueEntry); + + int getPriorities(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index ebed781a1a..d48445930a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -25,7 +25,6 @@ import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.pool.ReadWriteRunnable; import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.binding.Binding; @@ -33,7 +32,6 @@ import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.QueueConfigType; import org.apache.qpid.server.configuration.QueueConfiguration; -import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; @@ -45,6 +43,7 @@ import org.apache.qpid.server.management.ManagedObject; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.AuthorizationHolder; +import org.apache.qpid.server.subscription.MessageGroupManager; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -68,11 +67,11 @@ import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); + private static final String QPID_GROUP_HEADER_KEY = "qpid.group_header_key"; private final VirtualHost _virtualHost; @@ -189,6 +188,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); + private final MessageGroupManager _messageGroupManager; protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { @@ -242,25 +242,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _logSubject = new QueueLogSubject(this); _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); - // Log the correct creation message - - // Extract the number of priorities for this Queue. - // Leave it as 0 if we are a SimpleQueueEntryList - int priorities = 0; - if (entryListFactory instanceof PriorityQueueList.Factory) - { - priorities = ((PriorityQueueList)_entries).getPriorities(); - } - // Log the creation of this Queue. // The priorities display is toggled on if we set priorities > 0 CurrentActor.get().message(_logSubject, QueueMessages.CREATED(String.valueOf(_owner), - priorities, - _owner != null, - autoDelete, - durable, !durable, - priorities > 0)); + _entries.getPriorities(), + _owner != null, + autoDelete, + durable, !durable, + _entries.getPriorities() > 0)); getConfigStore().addConfiguredObject(this); @@ -274,6 +264,15 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener _logger.error("AMQQueue MBean creation has failed ", e); } + if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY)) + { + _messageGroupManager = new MessageGroupManager(String.valueOf(arguments.get(QPID_GROUP_HEADER_KEY)), 255); + } + else + { + _messageGroupManager = null; + } + resetNotifications(); } @@ -488,6 +487,32 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener setExclusiveSubscriber(null); subscription.setQueueContext(null); + if(_messageGroupManager != null) + { + QueueEntry entry = _messageGroupManager.findEarliestAssignedAvailableEntry(subscription); + _messageGroupManager.clearAssignments(subscription); + + if(entry != null) + { + SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + Subscription sub = subscriberIter.getNode().getSubscription(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues()) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + + } + + } + // auto-delete queues must be deleted if there are no remaining subscribers if (_autoDelete && getDeleteOnNoConsumers() && !subscription.isTransient() && getConsumerCount() == 0 ) @@ -691,21 +716,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { try { - if (subscriptionReadyAndHasInterest(sub, entry) - && !sub.isSuspended()) + if (!sub.isSuspended() + && subscriptionReadyAndHasInterest(sub, entry) + && mightAssign(sub, entry) + && !sub.wouldSuspend(entry)) { - if (!sub.wouldSuspend(entry)) + if (sub.acquires() && !(assign(sub, entry) && entry.acquire(sub))) { - if (sub.acquires() && !entry.acquire(sub)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this subscription - sub.restoreCredit(entry); - } - else - { - deliverMessage(sub, entry, false); - } + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this subscription + sub.restoreCredit(entry); + } + else + { + deliverMessage(sub, entry, false); } } } @@ -716,6 +740,20 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } } + private boolean assign(final Subscription sub, final QueueEntry entry) + { + return _messageGroupManager == null || _messageGroupManager.acceptMessage(sub, entry); + } + + + private boolean mightAssign(final Subscription sub, final QueueEntry entry) + { + if(_messageGroupManager == null || !sub.acquires()) + return true; + Subscription assigned = _messageGroupManager.getAssignedSubscription(entry); + return (assigned == null) || (assigned == sub); + } + protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry) { // This method is only required for queues which mess with ordering @@ -1020,6 +1058,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean filterComplete(); } + + public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) { return getMessagesOnTheQueue(new QueueEntryFilter() @@ -1074,6 +1114,24 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } + public void visit(final Visitor visitor) + { + QueueEntryIterator queueListIterator = _entries.iterator(); + + while(queueListIterator.advance()) + { + QueueEntry node = queueListIterator.getNode(); + + if(!node.isDispensed()) + { + if(visitor.visit(node)) + { + break; + } + } + } + } + /** * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. * @@ -1708,11 +1766,11 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener if (node != null && node.isAvailable()) { - if (sub.hasInterest(node)) + if (sub.hasInterest(node) && mightAssign(sub, node)) { if (!sub.wouldSuspend(node)) { - if (sub.acquires() && !node.acquire(sub)) + if (sub.acquires() && !(assign(sub, node) && node.acquire(sub))) { // restore credit here that would have been taken away by wouldSuspend since we didn't manage // to acquire the entry for this subscription @@ -1769,7 +1827,8 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node))) + while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || + !mightAssign(sub,node))) { if (expired) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java index 0bb5dcc219..b40e5a28c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java @@ -185,6 +185,11 @@ public class SimpleQueueEntryList implements QueueEntryList<SimpleQueueEntryImpl advanceHead(); } + public int getPriorities() + { + return 0; + } + static class Factory implements QueueEntryListFactory { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 5f8ab16c06..414a123c43 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -51,13 +51,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl _propertyName = propertyName; } - @Override public AMQQueue getQueue() { return _queue; } - @Override public SortedQueueEntryImpl add(final ServerMessage message) { synchronized(_lock) @@ -286,7 +284,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl return (node == null ? Colour.BLACK : node.getColour()) == colour; } - @Override public SortedQueueEntryImpl next(final SortedQueueEntryImpl node) { synchronized(_lock) @@ -316,13 +313,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } - @Override public QueueEntryIterator<SortedQueueEntryImpl> iterator() { return new QueueEntryIteratorImpl(_head); } - @Override public SortedQueueEntryImpl getHead() { return _head; @@ -333,7 +328,6 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl return _root; } - @Override public void entryDeleted(final SortedQueueEntryImpl entry) { synchronized(_lock) @@ -431,6 +425,11 @@ public class SortedQueueEntryList implements QueueEntryList<SortedQueueEntryImpl } } + public int getPriorities() + { + return 0; + } + /** * Swaps the position of the node in the tree with it's successor * (that is the node with the next highest key) diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java new file mode 100644 index 0000000000..1999d655c9 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageGroupManager.java @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.subscription; + +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; + + +public class MessageGroupManager +{ + private static final Logger _logger = LoggerFactory.getLogger(MessageGroupManager.class); + + + private final String _groupId; + private final ConcurrentHashMap<Integer, Subscription> _groupMap = new ConcurrentHashMap<Integer, Subscription>(); + private final int _groupMask; + + public MessageGroupManager(final String groupId, final int maxGroups) + { + _groupId = groupId; + _groupMask = pow2(maxGroups)-1; + } + + private static int pow2(final int i) + { + int val = 1; + while(val < i) val<<=1; + return val; + } + + public Subscription getAssignedSubscription(final QueueEntry entry) + { + Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + return groupVal == null ? null : _groupMap.get(groupVal.hashCode() & _groupMask); + } + + public boolean acceptMessage(Subscription sub, QueueEntry entry) + { + Object groupVal = entry.getMessage().getMessageHeader().getHeader(_groupId); + if(groupVal == null) + { + return true; + } + else + { + Integer group = groupVal.hashCode() & _groupMask; + Subscription assignedSub = _groupMap.get(group); + if(assignedSub == sub) + { + return true; + } + else + { + if(assignedSub == null) + { + if(_logger.isDebugEnabled()) + { + _logger.debug("Assigning group " + groupVal + " to sub " + sub); + } + assignedSub = _groupMap.putIfAbsent(group, sub); + return assignedSub == null || assignedSub == sub; + } + else + { + return false; + } + } + } + } + + public QueueEntry findEarliestAssignedAvailableEntry(Subscription sub) + { + EntryFinder visitor = new EntryFinder(sub); + sub.getQueue().visit(visitor); + return visitor.getEntry(); + } + + private class EntryFinder implements AMQQueue.Visitor + { + private QueueEntry _entry; + private Subscription _sub; + + public EntryFinder(final Subscription sub) + { + _sub = sub; + } + + public boolean visit(final QueueEntry entry) + { + if(!entry.isAvailable()) + return false; + + Object groupId = entry.getMessage().getMessageHeader().getHeader(_groupId); + if(groupId == null) + return false; + + Integer group = groupId.hashCode() & _groupMask; + Subscription assignedSub = _groupMap.get(group); + if(assignedSub == _sub) + { + _entry = entry; + return true; + } + else + { + return false; + } + } + + public QueueEntry getEntry() + { + return _entry; + } + } + + public void clearAssignments(Subscription sub) + { + Iterator<Subscription> subIter = _groupMap.values().iterator(); + while(subIter.hasNext()) + { + if(subIter.next() == sub) + { + subIter.remove(); + } + } + } +} diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index 8b029f9a51..f97ac5659e 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -601,20 +601,20 @@ public class MockAMQQueue implements AMQQueue } - @Override public int getMaximumDeliveryCount() { return 0; } - @Override public void setMaximumDeliveryCount(int maximumDeliveryCount) { } - @Override public void setAlternateExchange(String exchangeName) { } + public void visit(final Visitor visitor) + { + } } |
