summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java7
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java25
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java35
3 files changed, 60 insertions, 7 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
index fa20e9ab76..39ae7e3c3e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger = Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -82,6 +84,11 @@ class ClusteredSubscriptionManager extends SubscriptionSet
return ClusteredSubscriptionManager.this.getWeight();
}
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
+ }
+
public boolean hasActiveSubscribers()
{
return ClusteredSubscriptionManager.super.hasActiveSubscribers();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
index d01ebb5ba2..0566c5203b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@ class NestedSubscriptionManager implements SubscriptionManager
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@ class NestedSubscriptionManager implements SubscriptionManager
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@ class NestedSubscriptionManager implements SubscriptionManager
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
index 0268ff2171..cc7f6ecd2a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
@@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -88,9 +96,34 @@ class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManage
public void queueDeleted(AMQQueue queue)
{
- if(queue instanceof ClusteredQueue)
+ if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer);
}
}
+
+ public boolean hasFilters()
+ {
+ return false;
+ }
+
+ public boolean hasInterest(AMQMessage msg)
+ {
+ return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
+ }
}