diff options
author | Robert Greig <rgreig@apache.org> | 2007-01-24 15:41:48 +0000 |
---|---|---|
committer | Robert Greig <rgreig@apache.org> | 2007-01-24 15:41:48 +0000 |
commit | b89531eed28cf2eee4fd841be57d27c0d5bcf744 (patch) | |
tree | 3e86825d875a7cae7943ae17e0350cee5aec0bc5 /java/cluster/src | |
parent | 7c43996f3c10426d6593b7224486a6b0331c7259 (diff) | |
download | qpid-python-b89531eed28cf2eee4fd841be57d27c0d5bcf744.tar.gz |
QPID-50 : Patch supplied by Rob Godfrey - Virtual Host implementation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@499446 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
22 files changed, 132 insertions, 149 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 352928b121..80f9ef62b1 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,7 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); + return new ServerHandlerRegistry(getHandlerFactory(), null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index c1306b4c13..8419ec5668 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import java.net.InetSocketAddress; @@ -55,13 +56,8 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements } public ClusteredProtocolHandler(IApplicationRegistry registry, InetSocketAddress address) - { - this(registry.getQueueRegistry(), registry.getExchangeRegistry(), address); - } - - public ClusteredProtocolHandler(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, InetSocketAddress address) - { - super(queueRegistry, exchangeRegistry); + { + super(registry); ClusterBuilder builder = new ClusterBuilder(address); _groupMgr = builder.getGroupManager(); _handlers = builder.getHandlerRegistry(); @@ -74,9 +70,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); + new ClusteredProtocolSession(session, virtualHostRegistry, codec, new ServerHandlerRegistry(_handlers, virtualHostRegistry, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 04c5f7b451..fc635cc7ea 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -24,6 +24,8 @@ import org.apache.mina.common.IoSession; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQMinaProtocolSession; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -37,11 +39,11 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession { private MemberHandle _peer; - public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException + public ClusteredProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException // public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, // ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException { - super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); + super(session, virtualHostRegistry, codecFactory, stateManager); // super(session, queueRegistry, exchangeRegistry, codecFactory); } @@ -66,7 +68,7 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession AMQChannel channel = super.getChannel(channelId); if (isPeerSession() && channel == null) { - channel = new OneUseChannel(channelId); + channel = new OneUseChannel(channelId, getVirtualHost()); addChannel(channel); } return channel; @@ -102,18 +104,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession */ private class OneUseChannel extends AMQChannel { - public OneUseChannel(int channelId) - throws AMQException - { - this(channelId, ApplicationRegistry.getInstance()); - } - - public OneUseChannel(int channelId, IApplicationRegistry registry) + public OneUseChannel(int channelId, VirtualHost virtualHost) throws AMQException { super(channelId, - registry.getMessageStore(), - registry.getExchangeRegistry()); + virtualHost.getMessageStore(), + virtualHost.getExchangeRegistry()); } protected void routeCurrentMessage() throws AMQException diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 27d5629f27..03b0dc7f2e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import java.util.HashMap; import java.util.Map; @@ -43,23 +44,20 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, - AMQProtocolSession protocolSession) + ServerHandlerRegistry(VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); + super(AMQState.CONNECTION_NOT_STARTED, false, virtualHostRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ServerHandlerRegistry(ServerHandlerRegistry s, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(queueRegistry, exchangeRegistry, protocolSession); + this(virtualHostRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) + ServerHandlerRegistry(MethodHandlerFactory factory, VirtualHostRegistry virtualHostRegistry, AMQProtocolSession protocolSession) { - this(queueRegistry, exchangeRegistry, protocolSession); + this(virtualHostRegistry, protocolSession); init(factory); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java index c4107a435b..86710e8a31 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChainedClusterMethodHandler.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; @@ -54,19 +55,19 @@ public class ChainedClusterMethodHandler <A extends AMQMethodBody> extends Clust } } - protected final void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.peer(stateMgr, queues, exchanges, session, evt); + handler.peer(stateMgr, evt); } } - protected final void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected final void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { for(ClusterMethodHandler<A> handler : _handlers) { - handler.client(stateMgr, queues, exchanges, session, evt); + handler.client(stateMgr, evt); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java index 27d3e28b88..c9f6dbfb37 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ChannelQueueManager.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -79,22 +80,22 @@ class ChannelQueueManager private class QueueDeclareHandler extends ClusterMethodHandler<QueueDeclareBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { set(evt.getChannelId(), evt.getMethod().queue); } } private class QueueBindHandler extends ClusterMethodHandler<QueueBindBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueBindBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueBindBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -104,11 +105,11 @@ class ChannelQueueManager } private class QueueDeleteHandler extends ClusterMethodHandler<QueueDeleteBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException { if(evt.getMethod().queue == null) { @@ -119,11 +120,11 @@ class ChannelQueueManager private class BasicConsumeHandler extends ClusterMethodHandler<BasicConsumeBody> { - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { if(evt.getMethod().queue == null) { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java index 971fa5393b..faab99b0f6 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandler.java @@ -32,18 +32,20 @@ import org.apache.qpid.AMQException; public abstract class ClusterMethodHandler<A extends AMQMethodBody> implements StateAwareMethodListener<A> { - public final void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public final void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { + AMQProtocolSession session = stateMgr.getProtocolSession(); + if (ClusteredProtocolSession.isPeerSession(session)) { - peer(stateMgr, queues, exchanges, session, evt); + peer(stateMgr, evt); } else { - client(stateMgr, queues, exchanges, session, evt); + client(stateMgr, evt); } } - protected abstract void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; - protected abstract void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; + protected abstract void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException; } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java index 483096f29d..cd897671cc 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ClusterMethodHandlerFactory.java @@ -135,19 +135,15 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SynchHandler implements StateAwareMethodListener<ClusterSynchBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterSynchBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSynchBody> evt) throws AMQException { - _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(session)); + _groupMgr.handleSynch(ClusteredProtocolSession.getSessionPeer(stateManager.getProtocolSession())); } } private class JoinHandler implements StateAwareMethodListener<ClusterJoinBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterJoinBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterJoinBody> evt) throws AMQException { _groupMgr.handleJoin(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -155,9 +151,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class LeaveHandler implements StateAwareMethodListener<ClusterLeaveBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterLeaveBody> evt) throws AMQException { _groupMgr.handleLeave(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -165,9 +159,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class SuspectHandler implements StateAwareMethodListener<ClusterSuspectBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, - AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterSuspectBody> evt) throws AMQException { _groupMgr.handleSuspect(new SimpleMemberHandle(evt.getMethod().broker)); } @@ -175,9 +167,7 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class MembershipHandler implements StateAwareMethodListener<ClusterMembershipBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterMembershipBody> evt) throws AMQException { ClusterMembershipBody body = evt.getMethod(); _groupMgr.handleMembershipAnnouncement(new String(body.members)); @@ -186,16 +176,14 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory private class PingHandler implements StateAwareMethodListener<ClusterPingBody> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry, AMQProtocolSession session, - AMQMethodEvent<ClusterPingBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ClusterPingBody> evt) throws AMQException { MemberHandle peer = new SimpleMemberHandle(evt.getMethod().broker); _groupMgr.handlePing(peer, evt.getMethod().load); if (evt.getMethod().responseRequired) { evt.getMethod().load = _loadTable.getLocalLoad(); - session.writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); + stateManager.getProtocolSession().writeFrame(new AMQFrame(evt.getChannelId(), evt.getMethod())); } } } @@ -207,12 +195,12 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory super(ConnectionOpenMethodHandler.getInstance()); } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionOpenBody> evt) + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionOpenBody> evt) { AMQShortString capabilities = evt.getMethod().capabilities; if (ClusterCapability.contains(capabilities)) { - ClusteredProtocolSession.setSessionPeer(session, ClusterCapability.getPeer(capabilities)); + ClusteredProtocolSession.setSessionPeer(stateMgr.getProtocolSession(), ClusterCapability.getPeer(capabilities)); } else { @@ -228,9 +216,9 @@ public class ClusterMethodHandlerFactory implements MethodHandlerFactory super(ConnectionCloseMethodHandler.getInstance()); } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<ConnectionCloseBody> evt) + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<ConnectionCloseBody> evt) { - if (!ClusteredProtocolSession.isPeerSession(session)) + if (!ClusteredProtocolSession.isPeerSession(stateMgr.getProtocolSession())) { _loadTable.decrementLocalLoad(); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java index 7eb3d7291c..a2f62f714b 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ExtendedHandler.java @@ -38,18 +38,18 @@ class ExtendedHandler<A extends AMQMethodBody> implements StateAwareMethodListen _base = base; } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - preHandle(stateMgr, session, evt); - _base.methodReceived(stateMgr, queues, exchanges, session, evt); - postHandle(stateMgr, session, evt); + preHandle(stateMgr, evt); + _base.methodReceived(stateMgr, evt); + postHandle(stateMgr, evt); } - void preHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void preHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { } - void postHandle(AMQStateManager stateMgr, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + void postHandle(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java index 6b876095a4..f01a8349f2 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/LocalQueueDeclareHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.PrivateQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.queue.RemoteQueueProxy; +import org.apache.qpid.server.virtualhost.VirtualHost; public class LocalQueueDeclareHandler extends QueueDeclareHandler { @@ -51,7 +52,7 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler return new AMQShortString(super.createName().toString() + "@" + _groupMgr.getLocal().getDetails()); } - protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session) throws AMQException + protected AMQQueue createQueue(QueueDeclareBody body, VirtualHost virtualHost, AMQProtocolSession session) throws AMQException { //is it private or shared: if (body.exclusive) @@ -61,18 +62,18 @@ public class LocalQueueDeclareHandler extends QueueDeclareHandler //need to get peer from the session... MemberHandle peer = ClusteredProtocolSession.getSessionPeer(session); _logger.debug(new LogMessage("Creating proxied queue {0} on behalf of {1}", body.queue, peer)); - return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, registry); + return new RemoteQueueProxy(peer, _groupMgr, body.queue, body.durable, new AMQShortString(peer.getDetails()), body.autoDelete, virtualHost); } else { _logger.debug(new LogMessage("Creating local private queue {0}", body.queue)); - return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, registry); + return new PrivateQueue(_groupMgr, body.queue, body.durable, session.getContextKey(), body.autoDelete, virtualHost); } } else { _logger.debug(new LogMessage("Creating local shared queue {0}", body.queue)); - return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, registry); + return new ClusteredQueue(_groupMgr, body.queue, body.durable, null, body.autoDelete, virtualHost); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java index 7f19569dbc..8b0bb4b127 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/NullListener.java @@ -31,7 +31,7 @@ import org.apache.qpid.server.state.StateAwareMethodListener; public class NullListener<T extends AMQMethodBody> implements StateAwareMethodListener<T> { - public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<T> evt) throws AMQException { } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java index 150b707071..447e51ccd9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/PeerHandler.java @@ -47,14 +47,14 @@ public class PeerHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _client = client; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _peer.methodReceived(stateMgr, queues, exchanges, session, evt); + _peer.methodReceived(stateMgr, evt); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _client.methodReceived(stateMgr, queues, exchanges, session, evt); + _client.methodReceived(stateMgr, evt); } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java index 6668faca65..a669171d3c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/QueueNameGenerator.java @@ -41,12 +41,11 @@ class QueueNameGenerator extends ClusterMethodHandler<QueueDeclareBody> _handler = handler; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException + protected void peer(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, - AMQProtocolSession session, AMQMethodEvent<QueueDeclareBody> evt) + protected void client(AMQStateManager stateMgr, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException { setName(evt.getMethod());//need to set the name before propagating this method diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java index 0699678c9f..f09763e1ad 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteCancelHandler.java @@ -32,15 +32,21 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class RemoteCancelHandler implements StateAwareMethodListener<BasicCancelBody> { private final Logger _logger = Logger.getLogger(RemoteCancelHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicCancelBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicCancelBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + //By convention, consumers setup between brokers use the queue name as the consumer tag: - AMQQueue queue = queues.getQueue(evt.getMethod().consumerTag); + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().consumerTag); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeRemoteSubscriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java index c58ae291dd..073b13688c 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.ClusteredQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; /** * Handles consume requests from other cluster members. @@ -42,9 +43,13 @@ public class RemoteConsumeHandler implements StateAwareMethodListener<BasicConsu { private final Logger _logger = Logger.getLogger(RemoteConsumeHandler.class); - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { - AMQQueue queue = queues.getQueue(evt.getMethod().queue); + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + AMQQueue queue = queueRegistry.getQueue(evt.getMethod().queue); if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).addRemoteSubcriber(ClusteredProtocolSession.getSessionPeer(session)); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java index 26a4967417..897f8e4fb7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingConsumeHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBody> { @@ -46,17 +47,22 @@ public class ReplicatingConsumeHandler extends ReplicatingHandler<BasicConsumeBo super(groupMgr, base(), policy); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException + protected void replicate(AMQStateManager stateManager, AMQMethodEvent<BasicConsumeBody> evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + //only replicate if the queue in question is a shared queue - if (isShared(queues.getQueue(evt.getMethod().queue))) + if (isShared(queueRegistry.getQueue(evt.getMethod().queue))) { - super.replicate(stateMgr, queues, exchanges, session, evt); + super.replicate(stateManager, evt); } else { _logger.info(new LogMessage("Handling consume for private queue ({0}) locally", evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(stateManager, evt); _logger.info(new LogMessage("Handled consume for private queue ({0}) locally", evt.getMethod())); } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java index 84f97e7f59..888fa4e426 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/ReplicatingHandler.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.List; @@ -60,52 +61,51 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A _policy = policy; } - protected void peer(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void peer(AMQStateManager stateManager, AMQMethodEvent<A> evt) throws AMQException { - local(stateMgr, queues, exchanges, session, evt); + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + local(stateManager, evt); _logger.debug(new LogMessage("Handled {0} locally", evt.getMethod())); } - protected void client(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void client(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - replicate(stateMgr, queues, exchanges, session, evt); + replicate(stateMgr, evt); } - protected void replicate(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void replicate(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { if (_policy == null) { //asynch delivery _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod())); - local(stateMgr, queues, exchanges, session, evt); + local(stateMgr, evt); } else { - Callback callback = new Callback(stateMgr, queues, exchanges, session, evt); + Callback callback = new Callback(stateMgr, evt); _groupMgr.broadcast(new SimpleBodySendable(evt.getMethod()), _policy, callback); } _logger.debug(new LogMessage("Replicated {0} to peers", evt.getMethod())); } - protected void local(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) throws AMQException + protected void local(AMQStateManager stateMgr, AMQMethodEvent<A> evt) throws AMQException { - _base.methodReceived(stateMgr, queues, exchanges, session, evt); + _base.methodReceived(stateMgr, evt); } private class Callback implements GroupResponseHandler { private final AMQStateManager _stateMgr; - private final QueueRegistry _queues; - private final ExchangeRegistry _exchanges; - private final AMQProtocolSession _session; private final AMQMethodEvent<A> _evt; - Callback(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<A> evt) + Callback(AMQStateManager stateMgr, AMQMethodEvent<A> evt) { _stateMgr = stateMgr; - _queues = queues; - _exchanges = exchanges; - _session = session; _evt = evt; } @@ -113,7 +113,7 @@ class ReplicatingHandler<A extends AMQMethodBody> extends ClusterMethodHandler<A { try { - local(_stateMgr, _queues, _exchanges, _session, _evt); + local(_stateMgr, _evt); _logger.debug(new LogMessage("Handled {0} locally, in response to completion of replication", _evt.getMethod())); } catch (AMQException e) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java index 2561da36a8..8b0c638d63 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/WrappedListener.java @@ -42,11 +42,11 @@ public class WrappedListener<T extends AMQMethodBody> implements StateAwareMetho _primary = check(primary); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<T> evt) throws AMQException + public void methodReceived(AMQStateManager stateMgr, AMQMethodEvent<T> evt) throws AMQException { - _pre.methodReceived(stateMgr, queues, exchanges, session, evt); - _primary.methodReceived(stateMgr, queues, exchanges, session, evt); - _post.methodReceived(stateMgr, queues, exchanges, session, evt); + _pre.methodReceived(stateMgr, evt); + _primary.methodReceived(stateMgr, evt); + _post.methodReceived(stateMgr, evt); } private static <T extends AMQMethodBody> StateAwareMethodListener<T> check(StateAwareMethodListener<T> in) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java index ad9de8d93f..8ac4b9b2c7 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.ArrayList; import java.util.HashMap; @@ -73,8 +74,11 @@ public class ReplayStore implements ReplayManager, StateAwareMethodListener _localRecorders.put(ExchangeDeleteBody.class, new ExchangeDeleteRecorder()); } - public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent evt) throws AMQException + public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + _logger.debug(new LogMessage("Replay store received {0}", evt.getMethod())); AMQMethodBody request = evt.getMethod(); diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java index 5cf6d5c3ff..19be638051 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.*; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -46,22 +47,14 @@ public class ClusteredQueue extends AMQQueue private final GroupManager _groupMgr; private final NestedSubscriptionManager _subscriptions; - public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry, new ClusteredSubscriptionManager()); + super(name, durable, owner, autoDelete, virtualHost, new ClusteredSubscriptionManager()); _groupMgr = groupMgr; _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); } - public ClusteredQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery, new ClusteredSubscriptionManager(), - new SubscriptionImpl.Factory()); - _groupMgr = groupMgr; - _subscriptions = ((ClusteredSubscriptionManager) getSubscribers()).getAllSubscribers(); - } public void process(StoreContext storeContext, AMQMessage msg) throws AMQException { diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java index 568de62d1b..95ab34ccf9 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.SimpleBodySendable; +import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.framing.QueueDeleteBody; import org.apache.qpid.framing.AMQShortString; @@ -37,21 +38,14 @@ public class PrivateQueue extends AMQQueue { private final GroupManager _groupMgr; - public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry); + super(name, durable, owner, autoDelete, virtualHost); _groupMgr = groupMgr; } - public PrivateQueue(GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); - _groupMgr = groupMgr; - } - protected void autodelete() throws AMQException { //delete locally: diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index a6cce05a03..d0a64c7d6f 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.cluster.GroupManager; import org.apache.qpid.server.cluster.MemberHandle; import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.concurrent.Executor; @@ -43,23 +44,15 @@ public class RemoteQueueProxy extends AMQQueue private final MemberHandle _target; private final GroupManager _groupMgr; - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry) + public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException { - super(name, durable, owner, autoDelete, queueRegistry); + super(name, durable, owner, autoDelete, virtualHost); _target = target; _groupMgr = groupMgr; _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); } - public RemoteQueueProxy(MemberHandle target, GroupManager groupMgr, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, QueueRegistry queueRegistry, Executor asyncDelivery) - throws AMQException - { - super(name, durable, owner, autoDelete, queueRegistry, asyncDelivery); - _target = target; - _groupMgr = groupMgr; - _groupMgr.addMemberhipChangeListener(new ProxiedQueueCleanup(target, this)); - } public void deliver(AMQMessage msg) throws NoConsumersException { |