diff options
Diffstat (limited to 'java/cluster')
7 files changed, 25 insertions, 15 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index be5f705ffa..1b4a3e8327 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler public void handle(int channel, AMQMethodBody method) throws AMQException { AMQMethodEvent evt = new AMQMethodEvent(channel, method); - _stateMgr.methodReceived(evt, _session); + _stateMgr.methodReceived(evt); } private class SessionAdapter extends AMQProtocolSession diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index 3c1b50fb99..a371748b9e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.*; import java.util.HashMap; @@ -43,9 +44,9 @@ public class ClientHandlerRegistry extends AMQStateManager private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); private final MemberHandle _identity; - protected ClientHandlerRegistry(MemberHandle local) + protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); _identity = local; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 89f402c1b9..352928b121 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,7 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory()); + return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 6e7efb3659..c1306b4c13 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.ClusterMembershipBody; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers)); + new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 1763bcd03f..04c5f7b451 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -37,10 +37,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession { private MemberHandle _peer; - public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException +// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, +// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException { super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); +// super(session, queueRegistry, exchangeRegistry, codecFactory); } public boolean isPeerSession() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 8557fc17c7..f447895013 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -63,7 +63,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { super(host, port); _local = local; - _legacyHandler = new ClientHandlerRegistry(local); + _legacyHandler = new ClientHandlerRegistry(local, null); } private void init(IoSession session) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 71c53146a8..27d5629f27 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.IllegalStateTransitionException; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.HashMap; import java.util.Map; @@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry() + ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s) + ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory) + ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); init(factory); } |
