summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorRobert Greig <rgreig@apache.org>2007-01-24 15:41:48 +0000
committerRobert Greig <rgreig@apache.org>2007-01-24 15:41:48 +0000
commitb89531eed28cf2eee4fd841be57d27c0d5bcf744 (patch)
tree3e86825d875a7cae7943ae17e0350cee5aec0bc5 /java/cluster/src
parent7c43996f3c10426d6593b7224486a6b0331c7259 (diff)
downloadqpid-python-b89531eed28cf2eee4fd841be57d27c0d5bcf744.tar.gz
QPID-50 : Patch supplied by Rob Godfrey - Virtual Host implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java20
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java16
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java17
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java36
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java10
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java9
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java14
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java34
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java8
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java13
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java12
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java13
22 files changed, 132 insertions, 149 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index 352928b121..80f9ef62b1 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -46,7 +46,7 @@ class ClusterBuilder
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null);
}
private MethodHandlerFactory getHandlerFactory()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index c1306b4c13..8419ec5668 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.net.InetSocketAddress;
@@ -55,13 +56,8 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
}
public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address)
- {
- this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address);
- }
-
- public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address)
- {
- super(queueRegistry, exchangeRegistry);
+ {
+ super(registry);
ClusterBuilder builder = new ClusterBuilder(address);
_groupMgr = builder.getGroupManager();
_handlers = builder.getHandlerRegistry();
@@ -74,9 +70,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
+ new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession));
}
void connect(String join) throws Exception
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 04c5f7b451..fc635cc7ea 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -24,6 +24,8 @@ import org.apache.mina.common.IoSession;
import org.apache.qpid.AMQException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -37,11 +39,11 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+ public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
{
- super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+ super(session, virtualHostRegistry, codecFactory, stateManager);
// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
@@ -66,7 +68,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
AMQChannel channel = super.getChannel(channelId);
if (isPeerSession() && channel == null)
{
- channel = new OneUseChannel(channelId);
+ channel = new OneUseChannel(channelId, getVirtualHost());
addChannel(channel);
}
return channel;
@@ -102,18 +104,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
*/
private class OneUseChannel extends AMQChannel
{
- public OneUseChannel(int channelId)
- throws AMQException
- {
- this(channelId, ApplicationRegistry.getInstance());
- }
-
- public OneUseChannel(int channelId, IApplicationRegistry registry)
+ public OneUseChannel(int channelId, VirtualHost virtualHost)
throws AMQException
{
super(channelId,
- registry.getMessageStore(),
- registry.getExchangeRegistry());
+ virtualHost.getMessageStore(),
+ virtualHost.getExchangeRegistry());
}
protected void routeCurrentMessage() throws AMQException
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 27d5629f27..03b0dc7f2e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.HashMap;
import java.util.Map;
@@ -43,23 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
- AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
+ super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
+ ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession)
{
- this(queueRegistry, exchangeRegistry, protocolSession);
+ this(virtualHostRegistry, protocolSession);
init(factory);
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
index c4107a435b..86710e8a31 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -54,19 +55,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust
}
}
- protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.peer(stateMgr, queues, exchanges, session, evt);
+ handler.peer(stateMgr, evt);
}
}
- protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
for(ClusterMethodHandler<A> handler : _handlers)
{
- handler.client(stateMgr, queues, exchanges, session, evt);
+ handler.client(stateMgr, evt);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
index 27d3e28b88..c9f6dbfb37 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
import org.apache.log4j.Logger;
@@ -79,22 +80,22 @@ class ChannelQueueManager
private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
set(evt.getChannelId(), evt.getMethod().queue);
}
}
private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -104,11 +105,11 @@ class ChannelQueueManager
}
private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
@@ -119,11 +120,11 @@ class ChannelQueueManager
private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody>
{
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
if(evt.getMethod().queue == null)
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
index 971fa5393b..faab99b0f6 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java
@@ -32,18 +32,20 @@ import org.apache.qpid.AMQException;
public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A>
{
- public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
+ AMQProtocolSession session = stateMgr.getProtocolSession();
+
if (ClusteredProtocolSession.isPeerSession(session))
{
- peer(stateMgr, queues, exchanges, session, evt);
+ peer(stateMgr, evt);
}
else
{
- client(stateMgr, queues, exchanges, session, evt);
+ client(stateMgr, evt);
}
}
- protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
- protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
+ protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException;
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
index 483096f29d..cd897671cc 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java
@@ -135,19 +135,15 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException
{
- _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session));
+ _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession()));
}
}
private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException
{
_groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -155,9 +151,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException
{
_groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -165,9 +159,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
- AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException
{
_groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker));
}
@@ -175,9 +167,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException
{
ClusterMembershipBody body = evt.getMethod();
_groupMgr.handleMembershipAnnouncement(new String(body.members));
@@ -186,16 +176,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
private class PingHandler implements StateAwareMethodListener<ClusterPingBody>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
- ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
- AMQMethodEvent<ClusterPingBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException
{
MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker);
_groupMgr.handlePing(peer, evt.getMethod().load);
if (evt.getMethod().responseRequired)
{
evt.getMethod().load = _loadTable.getLocalLoad();
- session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
+ stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod()));
}
}
}
@@ -207,12 +195,12 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
super(ConnectionOpenMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt)
{
AMQShortString capabilities = evt.getMethod().capabilities;
if (ClusterCapability.contains(capabilities))
{
- ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities));
+ ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities));
}
else
{
@@ -228,9 +216,9 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory
super(ConnectionCloseMethodHandler.getInstance());
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt)
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt)
{
- if (!ClusteredProtocolSession.isPeerSession(session))
+ if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession()))
{
_loadTable.decrementLocalLoad();
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
index 7eb3d7291c..a2f62f714b 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java
@@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen
_base = base;
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- preHandle(stateMgr, session, evt);
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
- postHandle(stateMgr, session, evt);
+ preHandle(stateMgr, evt);
+ _base.methodReceived(stateMgr, evt);
+ postHandle(stateMgr, evt);
}
- void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
- void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
index 6b876095a4..f01a8349f2 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.PrivateQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.queue.RemoteQueueProxy;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class LocalQueueDeclareHandler extends QueueDeclareHandler
{
@@ -51,7 +52,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails());
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException
+ protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException
{
//is it private or shared:
if (body.exclusive)
@@ -61,18 +62,18 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler
//need to get peer from the session...
MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session);
_logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer));
- return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry);
+ return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost);
}
else
{
_logger.debug(new LogMessage("Creating local private queue {0}", body.queue));
- return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry);
+ return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost);
}
}
else
{
_logger.debug(new LogMessage("Creating local shared queue {0}", body.queue));
- return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry);
+ return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost);
}
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
index 7f19569dbc..8b0bb4b127 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java
@@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener;
public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T>
{
- public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException
{
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
index 150b707071..447e51ccd9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java
@@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_client = client;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _peer.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _peer.methodReceived(stateMgr, evt);
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _client.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _client.methodReceived(stateMgr, evt);
}
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
index 6668faca65..a669171d3c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java
@@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody>
_handler = handler;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
{
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
- AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt)
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt)
throws AMQException
{
setName(evt.getMethod());//need to set the name before propagating this method
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
index 0699678c9f..f09763e1ad 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java
@@ -32,15 +32,21 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody>
{
private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
//By convention, consumers setup between brokers use the queue name as the consumer tag:
- AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag);
+ AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag);
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
index c58ae291dd..073b13688c 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.ClusteredQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
/**
* Handles consume requests from other cluster members.
@@ -42,9 +43,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu
{
private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class);
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
- AMQQueue queue = queues.getQueue(evt.getMethod().queue);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue);
if (queue instanceof ClusteredQueue)
{
((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session));
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
index 26a4967417..897f8e4fb7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody>
{
@@ -46,17 +47,22 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo
super(groupMgr, base(), policy);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+ protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
//only replicate if the queue in question is a shared queue
- if (isShared(queues.getQueue(evt.getMethod().queue)))
+ if (isShared(queueRegistry.getQueue(evt.getMethod().queue)))
{
- super.replicate(stateMgr, queues, exchanges, session, evt);
+ super.replicate(stateManager, evt);
}
else
{
_logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(stateManager, evt);
_logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod()));
}
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
index 84f97e7f59..888fa4e426 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.List;
@@ -60,52 +61,51 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
_policy = policy;
}
- protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException
{
- local(stateMgr, queues, exchanges, session, evt);
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ local(stateManager, evt);
_logger.debug(new LogMessage("Handled {0} locally", evt.getMethod()));
}
- protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- replicate(stateMgr, queues, exchanges, session, evt);
+ replicate(stateMgr, evt);
}
- protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
if (_policy == null)
{
//asynch delivery
_groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()));
- local(stateMgr, queues, exchanges, session, evt);
+ local(stateMgr, evt);
}
else
{
- Callback callback = new Callback(stateMgr, queues, exchanges, session, evt);
+ Callback callback = new Callback(stateMgr, evt);
_groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback);
}
_logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod()));
}
- protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException
+ protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException
{
- _base.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _base.methodReceived(stateMgr, evt);
}
private class Callback implements GroupResponseHandler
{
private final AMQStateManager _stateMgr;
- private final QueueRegistry _queues;
- private final ExchangeRegistry _exchanges;
- private final AMQProtocolSession _session;
private final AMQMethodEvent<A> _evt;
- Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt)
+ Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt)
{
_stateMgr = stateMgr;
- _queues = queues;
- _exchanges = exchanges;
- _session = session;
_evt = evt;
}
@@ -113,7 +113,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A
{
try
{
- local(_stateMgr, _queues, _exchanges, _session, _evt);
+ local(_stateMgr, _evt);
_logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod()));
}
catch (AMQException e)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
index 2561da36a8..8b0c638d63 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java
@@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho
_primary = check(primary);
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException
{
- _pre.methodReceived(stateMgr, queues, exchanges, session, evt);
- _primary.methodReceived(stateMgr, queues, exchanges, session, evt);
- _post.methodReceived(stateMgr, queues, exchanges, session, evt);
+ _pre.methodReceived(stateMgr, evt);
+ _primary.methodReceived(stateMgr, evt);
+ _post.methodReceived(stateMgr, evt);
}
private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
index ad9de8d93f..8ac4b9b2c7 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.HashMap;
@@ -73,8 +74,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener
_localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder());
}
- public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
+
_logger.debug(new LogMessage("Replay store received {0}", evt.getMethod()));
AMQMethodBody request = evt.getMethod();
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
index 5cf6d5c3ff..19be638051 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.*;
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -46,22 +47,14 @@ public class ClusteredQueue extends AMQQueue
private final GroupManager _groupMgr;
private final NestedSubscriptionManager _subscriptions;
- public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager());
+ super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager());
_groupMgr = groupMgr;
_subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
}
- public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(),
- new SubscriptionImpl.Factory());
- _groupMgr = groupMgr;
- _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers();
- }
public void process(StoreContext storeContext, AMQMessage msg) throws AMQException
{
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
index 568de62d1b..95ab34ccf9 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.SimpleBodySendable;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.AMQShortString;
@@ -37,21 +38,14 @@ public class PrivateQueue extends AMQQueue
{
private final GroupManager _groupMgr;
- public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry);
+ super(name, durable, owner, autoDelete, virtualHost);
_groupMgr = groupMgr;
}
- public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _groupMgr = groupMgr;
- }
-
protected void autodelete() throws AMQException
{
//delete locally:
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
index a6cce05a03..d0a64c7d6f 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.GroupManager;
import org.apache.qpid.server.cluster.MemberHandle;
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.concurrent.Executor;
@@ -43,23 +44,15 @@ public class RemoteQueueProxy extends AMQQueue
private final MemberHandle _target;
private final GroupManager _groupMgr;
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry)
+ public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- super(name, durable, owner, autoDelete, queueRegistry);
+ super(name, durable, owner, autoDelete, virtualHost);
_target = target;
_groupMgr = groupMgr;
_groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
}
- public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery)
- throws AMQException
- {
- super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery);
- _target = target;
- _groupMgr = groupMgr;
- _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this));
- }
public void deliver(AMQMessage msg) throws NoConsumersException
{