summaryrefslogtreecommitdiff
path: root/java/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'java/cluster')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java6
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java18
7 files changed, 25 insertions, 15 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index be5f705ffa..1b4a3e8327 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler
public void handle(int channel, AMQMethodBody method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt, _session);
+ _stateMgr.methodReceived(evt);
}
private class SessionAdapter extends AMQProtocolSession
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index 3c1b50fb99..a371748b9e 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.*;
import java.util.HashMap;
@@ -43,9 +44,9 @@ public class ClientHandlerRegistry extends AMQStateManager
private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
private final MemberHandle _identity;
- protected ClientHandlerRegistry(MemberHandle local)
+ protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
_identity = local;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index 89f402c1b9..352928b121 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -46,7 +46,7 @@ class ClusterBuilder
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory());
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
}
private MethodHandlerFactory getHandlerFactory()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index 6e7efb3659..c1306b4c13 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.ClusterMembershipBody;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
}
void connect(String join) throws Exception
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
index 1763bcd03f..04c5f7b451 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
@@ -37,10 +37,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession
{
private MemberHandle _peer;
- public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager)
- throws AMQException
+ public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException
+// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry,
+// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException
{
super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager);
+// super(session, queueRegistry, exchangeRegistry, codecFactory);
}
public boolean isPeerSession()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 8557fc17c7..f447895013 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -63,7 +63,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
super(host, port);
_local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
+ _legacyHandler = new ClientHandlerRegistry(local, null);
}
private void init(IoSession session)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 71c53146a8..27d5629f27 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.HashMap;
import java.util.Map;
@@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry()
+ ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
+ AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s)
+ ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory)
+ ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
+ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
}