summaryrefslogtreecommitdiff
path: root/java/cluster/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-15 14:20:37 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-15 14:20:37 +0000
commit9ba2ca90c9127ea98372a9758e731dd9fe19c212 (patch)
treed49fa569caeb1f37ae6b9ecd8d003da1252f1857 /java/cluster/src
parent2fc9bb58d66e5490c1432ac4b8f8a96731f7a8f3 (diff)
downloadqpid-python-9ba2ca90c9127ea98372a9758e731dd9fe19c212.tar.gz
Merged the refactor to a common AMQMethodListener class on trunk, plus the race condition fix of Robert Godfrey. This opens the way for Request and Response managers to use a common event dispatch for both client and server.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java2
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java3
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java5
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java3
-rw-r--r--java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java18
6 files changed, 23 insertions, 13 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
index 16dd5d8fa7..e95cf3406a 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java
@@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler
public void handle(int channel, AMQMethodBody method) throws AMQException
{
AMQMethodEvent evt = new AMQMethodEvent(channel, method);
- _stateMgr.methodReceived(evt, _session);
+ _stateMgr.methodReceived(evt);
}
private class SessionAdapter extends AMQProtocolSession
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
index c604709078..0c72dee984 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java
@@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.IllegalStateTransitionException;
import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionCloseBody;
@@ -49,9 +50,9 @@ public class ClientHandlerRegistry extends AMQStateManager
private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>();
private final MemberHandle _identity;
- protected ClientHandlerRegistry(MemberHandle local)
+ protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession);
_identity = local;
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
index 89f402c1b9..89dcfc080f 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java
@@ -46,7 +46,8 @@ class ClusterBuilder
ServerHandlerRegistry getHandlerRegistry()
{
- return new ServerHandlerRegistry(getHandlerFactory());
+ // TODO - FIX THIS!
+ return new ServerHandlerRegistry(getHandlerFactory(), null, null, null);
}
private MethodHandlerFactory getHandlerFactory()
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
index 6e7efb3659..c1306b4c13 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java
@@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.framing.ClusterMembershipBody;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQPFastProtocolHandler;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements
_handlers = handler._handlers;
}
- protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException
+ protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException
{
- new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers));
+ new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession));
}
void connect(String join) throws Exception
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
index 275ed39b5f..69fee079cf 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java
@@ -63,7 +63,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler
{
super(host, port);
_local = local;
- _legacyHandler = new ClientHandlerRegistry(local);
+ // TODO - FIX THIS
+ _legacyHandler = new ClientHandlerRegistry(local, null);
}
private void init(IoSession session)
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
index 71c53146a8..cab020b448 100644
--- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
+++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java
@@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.IllegalStateTransitionException;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.cluster.util.LogMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import java.util.HashMap;
import java.util.Map;
@@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager
private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class);
private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>();
- ServerHandlerRegistry()
+ ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
++ AMQProtocolSession protocolSession)
{
- super(AMQState.CONNECTION_NOT_STARTED, false);
+ super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession);
}
- ServerHandlerRegistry(ServerHandlerRegistry s)
+ ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry,
++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
_handlers.putAll(s._handlers);
}
- ServerHandlerRegistry(MethodHandlerFactory factory)
+ ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry,
++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession)
{
- this();
+ this(queueRegistry, exchangeRegistry, protocolSession);
init(factory);
}