From d3459b6f6e751e77eecac781e4701a4d15290a43 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Tue, 19 Dec 2006 10:51:39 +0000 Subject: QPID-21 Added: SelectorParser.jj - ActiveMQ selector javacc grammar used to generate SelectorParser.java server/filter - Selector Filtering code from ActiveMQ project adjusted to suite our class and package structure. server/message - Decorator classes to allow access to the JMSMessage inside the AMQMessage ConcurrentSelectorDeliveryManager.java - A new DeliveryManager that utilises PreDeliveryQueues to implement selectors AMQInvalidSelectorException.java - thrown on client and broker when the Selector text is invalid. Common: log4j.properties to remove error log4j warnings on Common tests. Modified: broker/pom.xml - to generate SelectorParser.java AMQChannel.java - Addition of argument fieldtable for filter setup. BasicConsumeMethodHandler.java - writing of InvalidSelector channel close exception. AMQMessage.java - Added decorator to get access to the enclosed JMSMessage AMQQueue.java - Enhanced 'deliverymanager' property to allow the selection of the ConcurrentSelectorDeliveryManager. Subscription.java - Enhanced interface to allow a subscription to state an 'interest' in a given message. SubscriptionFactory.java - Added method to allow passing of filter arguments. SubscriptionImpl.java - Implemented new Subscription.java methods. SubscriptionManager.java - Added ability to get a list of current subscribers. SubscriptionSet.java - augmented nextSubscriber to allow the subscriber to exert the new hasInterest feature. SynchronizedDeliveryManager.java - fixed Logging class AMQSession - Added filter extraction from consume call and pass it on to the registration. ChannelCloseMethodHandler.java - Handle the reception and correct raising of the InvalidSelector Exception AbstractJMSMessage.java - Expanded imports BlockingMethodFrameListener.java - added extra info to a debug output line. SocketTransportConnection.java - made output an info not a warn. PropertiesFileInitialContextFactory.java - updated to allow the PROVIDER_URL to specify a property file to read in for the initial values. ClusteredSubscriptionManager.java - Implementation of SubscriptionSet.java NestedSubscriptionManager.java - Implementation of SubscriptionManager.java RemoteSubscriptionImpl.java - Implementation Subscription.java AMQConstant.java - Added '322' "Invalid Selector" SubscriptionTestHelper.java - Implementation of Subscription.java Edited specs/amqp-8.0.xml to add field table to consume method. Thanks to the ActiveMQ project for writing the initial SelectorParser.jj and associated filter Expressions. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@488624 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/queue/ClusteredSubscriptionManager.java | 7 +++++ .../server/queue/NestedSubscriptionManager.java | 25 ++++++++++++---- .../qpid/server/queue/RemoteSubscriptionImpl.java | 35 +++++++++++++++++++++- 3 files changed, 60 insertions(+), 7 deletions(-) (limited to 'java/cluster') diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java index fa20e9ab76..39ae7e3c3e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; import org.apache.qpid.server.cluster.util.LogMessage; +import java.util.List; + class ClusteredSubscriptionManager extends SubscriptionSet { private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class); @@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet return ClusteredSubscriptionManager.this.getWeight(); } + public List getSubscriptions() + { + return ClusteredSubscriptionManager.super.getSubscriptions(); + } + public boolean hasActiveSubscribers() { return ClusteredSubscriptionManager.super.hasActiveSubscribers(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java index d01ebb5ba2..0566c5203b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java @@ -21,12 +21,12 @@ package org.apache.qpid.server.queue; import java.util.List; +import java.util.LinkedList; import java.util.concurrent.CopyOnWriteArrayList; /** * Distributes messages among a list of subsscription managers, using their * weighting. - * */ class NestedSubscriptionManager implements SubscriptionManager { @@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager _subscribers.remove(s); } + + public List getSubscriptions() + { + List allSubs = new LinkedList(); + + for (WeightedSubscriptionManager subMans : _subscribers) + { + allSubs.addAll(subMans.getSubscriptions()); + } + + return allSubs; + } + public boolean hasActiveSubscribers() { - for(WeightedSubscriptionManager s : _subscribers) + for (WeightedSubscriptionManager s : _subscribers) { - if(s.hasActiveSubscribers()) + if (s.hasActiveSubscribers()) { return true; } @@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager public Subscription nextSubscriber(AMQMessage msg) { WeightedSubscriptionManager start = current(); - for(WeightedSubscriptionManager s = start; s != null; s = next(start)) + for (WeightedSubscriptionManager s = start; s != null; s = next(start)) { - if(hasMore(s)) + if (hasMore(s)) { return nextSubscriber(s); } @@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager private WeightedSubscriptionManager next() { _iterations = 0; - if(++_index >= _subscribers.size()) + if (++_index >= _subscribers.size()) { _index = 0; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java index 0268ff2171..cc7f6ecd2a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java @@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.AMQException; +import java.util.Queue; +import java.util.List; + class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager { private final GroupManager _groupMgr; @@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage return _count; } + public List getSubscriptions() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean hasActiveSubscribers() { return getWeight() == 0; @@ -88,9 +96,34 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage public void queueDeleted(AMQQueue queue) { - if(queue instanceof ClusteredQueue) + if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); } } + + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl + } + + public void sendNextMessage(AMQQueue queue) + { + + } } -- cgit v1.2.1