diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 14:20:37 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 14:20:37 +0000 |
| commit | 9ba2ca90c9127ea98372a9758e731dd9fe19c212 (patch) | |
| tree | d49fa569caeb1f37ae6b9ecd8d003da1252f1857 /java/cluster/src | |
| parent | 2fc9bb58d66e5490c1432ac4b8f8a96731f7a8f3 (diff) | |
| download | qpid-python-9ba2ca90c9127ea98372a9758e731dd9fe19c212.tar.gz | |
Merged the refactor to a common AMQMethodListener class on trunk, plus the race condition fix of Robert Godfrey. This opens the way for Request and Response managers to use a common event dispatch for both client and server.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496326 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/cluster/src')
6 files changed, 23 insertions, 13 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index 16dd5d8fa7..e95cf3406a 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler public void handle(int channel, AMQMethodBody method) throws AMQException { AMQMethodEvent evt = new AMQMethodEvent(channel, method); - _stateMgr.methodReceived(evt, _session); + _stateMgr.methodReceived(evt); } private class SessionAdapter extends AMQProtocolSession diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index c604709078..0c72dee984 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.ConnectionCloseBody; @@ -49,9 +50,9 @@ public class ClientHandlerRegistry extends AMQStateManager private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); private final MemberHandle _identity; - protected ClientHandlerRegistry(MemberHandle local) + protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); _identity = local; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 89f402c1b9..89dcfc080f 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,8 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory()); + // TODO - FIX THIS! + return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 6e7efb3659..c1306b4c13 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.ClusterMembershipBody; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers)); + new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 275ed39b5f..69fee079cf 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -63,7 +63,8 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { super(host, port); _local = local; - _legacyHandler = new ClientHandlerRegistry(local); + // TODO - FIX THIS + _legacyHandler = new ClientHandlerRegistry(local, null); } private void init(IoSession session) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 71c53146a8..cab020b448 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.IllegalStateTransitionException; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.HashMap; import java.util.Map; @@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry() + ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, ++ AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s) + ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, ++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory) + ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, ++ ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); init(factory); } |
