diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 22:02:11 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-12 22:02:11 +0000 |
| commit | bd98b98b0dc2f15563a280967f9cd907bc7aa7c4 (patch) | |
| tree | e2708db8fcd1757accbc59565f36349dea00793e /java | |
| parent | d6de073a34a7781977e71f89dc9f9b66921993cb (diff) | |
| download | qpid-python-bd98b98b0dc2f15563a280967f9cd907bc7aa7c4.tar.gz | |
Created common AMQMethodListener class, allowing the Request and Response managers to use a common interface to dispatch events to both the client and servers. Refactoring of bothe the client and broker AMQStateManagers and AMQProtocolSession classes was performed. The refactoring has run aground in the clustering, however, and this still needs to be resolved. As the cluster tests are currently disabled (by whom, I'm not sure), this does not disrupt the overall test result. JIRAs will be opened for this issue.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@495754 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
16 files changed, 97 insertions, 104 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 415b7a3c68..fa43b8809d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -99,10 +100,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQCodecFactory codecFactory) throws AMQException { - this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); + _minaProtocolSession = session; + session.setAttachment(this); + + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _codecFactory = codecFactory; + _managedObject = createMBean(); + _managedObject.register(); +// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -208,13 +218,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, (AMQMethodBody) frame.bodyFrame); try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry); + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if(!_frameListeners.isEmpty()) { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } @@ -233,7 +243,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.error("Closing connection due to: " + e.getMessage()); writeFrame(e.getCloseFrame(frame.channel)); } - catch (AMQException e) + catch (Exception e) { _stateManager.error(e); for (AMQMethodListener listener : _frameListeners) diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index fb78b0d8b7..70e530699e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.log4j.Logger; @@ -43,7 +43,9 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - + private final QueueRegistry _queueRegistry; + private final ExchangeRegistry _exchangeRegistry; + private final AMQProtocolSession _protocolSession; /** * The current state */ @@ -58,13 +60,16 @@ public class AMQStateManager implements AMQMethodListener private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager() + public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register) + protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _protocolSession = protocolSession; _currentState = initial; if (register) { @@ -149,7 +154,7 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(AMQException e) + public void error(Exception e) { _logger.error("State manager received error notification: " + e, e); for (StateListener l : _stateListeners) @@ -158,15 +163,12 @@ public class AMQStateManager implements AMQMethodListener } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException + public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt); + handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); return true; } return false; diff --git a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java index 53291a3fd0..50596d4bfc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java @@ -89,7 +89,7 @@ public class FailoverHandler implements Runnable // have a state waiter waiting until the connection is closed for some reason. Or in future we may have // a slightly more complex state model therefore I felt it was worthwhile doing this. AMQStateManager existingStateManager = _amqProtocolHandler.getStateManager(); - _amqProtocolHandler.setStateManager(new AMQStateManager()); + _amqProtocolHandler.setStateManager(new AMQStateManager(_amqProtocolHandler.getProtocolSession())); if (!_amqProtocolHandler.getConnection().firePreFailover(_host != null)) { _amqProtocolHandler.setStateManager(existingStateManager); diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java deleted file mode 100644 index 2cbd8f0e32..0000000000 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQMethodListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.client.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.client.protocol.AMQProtocolSession; - -public interface AMQMethodListener -{ - /** - * Invoked when a method frame has been received - * @param evt the event - * @return true if the handler has processed the method frame, false otherwise. Note - * that this does not prohibit the method event being delivered to subsequent listeners - * but can be used to determine if nobody has dealt with an incoming method frame. - * @throws AMQException if an error has occurred. This exception will be delivered - * to all registered listeners using the error() method (see below) allowing them to - * perform cleanup if necessary. - */ - boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException; - - /** - * Callback when an error has occurred. Allows listeners to clean up. - * @param e - */ - void error(Exception e); -} diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index 220f3c3b69..a0aa1d544b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -40,6 +40,7 @@ import org.apache.qpid.client.state.listener.SpecificMethodFrameListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.ssl.BogusSSLContextFactory; import java.util.Iterator; @@ -68,7 +69,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ private volatile AMQProtocolSession _protocolSession; - private AMQStateManager _stateManager = new AMQStateManager(); +// private AMQStateManager _stateManager = new AMQStateManager(); private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet(); @@ -277,7 +278,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void propagateExceptionToWaiters(Exception e) { - _stateManager.error(e); + _protocolSession.getStateManager().error(e); if(!_frameListeners.isEmpty()) { final Iterator it = _frameListeners.iterator(); @@ -316,14 +317,14 @@ public class AMQProtocolHandler extends IoHandlerAdapter try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt, _protocolSession); + boolean wasAnyoneInterested = _protocolSession.getStateManager().methodReceived(evt); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); while (it.hasNext()) { final AMQMethodListener listener = (AMQMethodListener) it.next(); - wasAnyoneInterested = listener.methodReceived(evt, _protocolSession) || wasAnyoneInterested; + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } if (!wasAnyoneInterested) @@ -333,7 +334,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter } catch (AMQException e) { - _stateManager.error(e); + _protocolSession.getStateManager().error(e); if(!_frameListeners.isEmpty()) { Iterator it = _frameListeners.iterator(); @@ -394,7 +395,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter */ public void attainState(AMQState s) throws AMQException { - _stateManager.attainState(s); + _protocolSession.getStateManager().attainState(s); } /** @@ -486,7 +487,7 @@ public class AMQProtocolHandler extends IoHandlerAdapter public void closeConnection() throws AMQException { - _stateManager.changeState(AMQState.CONNECTION_CLOSING); + _protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSING); // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. @@ -556,12 +557,17 @@ public class AMQProtocolHandler extends IoHandlerAdapter public AMQStateManager getStateManager() { - return _stateManager; + return _protocolSession.getStateManager(); } public void setStateManager(AMQStateManager stateManager) { - _stateManager = stateManager; + _protocolSession.setStateManager(stateManager); + } + + public AMQProtocolSession getProtocolSession() + { + return _protocolSession; } FailoverState getFailoverState() diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 718d256a52..1af7ca55e6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -33,6 +33,7 @@ import org.apache.qpid.client.message.UnexpectedBodyReceivedException; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQProtocolWriter; +import org.apache.qpid.client.state.AMQStateManager; import org.apache.commons.lang.StringUtils; import javax.jms.JMSException; @@ -63,6 +64,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis protected final IoSession _minaProtocolSession; + private AMQStateManager _stateManager; + protected WriteFuture _lastWriteFuture; /** @@ -98,6 +101,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { _protocolHandler = null; _minaProtocolSession = null; + _stateManager = new AMQStateManager(this); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -106,6 +110,7 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _minaProtocolSession = protocolSession; // properties of the connection are made available to the event handlers _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection); + _stateManager = new AMQStateManager(this); } public void init() @@ -136,6 +141,16 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis { getAMQConnection().setClientID(clientID); } + + public AMQStateManager getStateManager() + { + return _stateManager; + } + + public void setStateManager(AMQStateManager stateManager) + { + _stateManager = stateManager; + } public String getVirtualHost() { diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java index 4fff4fab00..c05e6faf53 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java @@ -23,6 +23,7 @@ package org.apache.qpid.client.protocol; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; public abstract class BlockingMethodFrameListener implements AMQMethodListener @@ -55,7 +56,7 @@ public abstract class BlockingMethodFrameListener implements AMQMethodListener * @return true if the listener has dealt with this frame * @throws AMQException */ - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public boolean methodReceived(AMQMethodEvent evt) throws AMQException { AMQMethodBody method = evt.getMethod(); diff --git a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java index c9b7593796..ea5cfce2ea 100644 --- a/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java +++ b/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java @@ -23,8 +23,8 @@ package org.apache.qpid.client.state; import org.apache.qpid.AMQException; import org.apache.qpid.client.handler.*; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.client.protocol.AMQProtocolSession; -import org.apache.qpid.client.protocol.AMQMethodListener; import org.apache.qpid.framing.*; import org.apache.log4j.Logger; @@ -41,6 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); + private final AMQProtocolSession _protocolSession; /** * The current state @@ -55,13 +56,14 @@ public class AMQStateManager implements AMQMethodListener private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet(); - public AMQStateManager() + public AMQStateManager(AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession); } - protected AMQStateManager(AMQState state, boolean register) + protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession) { + _protocolSession = protocolSession; _currentState = state; if(register) { @@ -147,12 +149,12 @@ public class AMQStateManager implements AMQMethodListener } } - public boolean methodReceived(AMQMethodEvent evt, AMQProtocolSession protocolSession) throws AMQException + public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, protocolSession, evt); + handler.methodReceived(this, _protocolSession, evt); return true; } return false; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java index be5f705ffa..1b4a3e8327 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientAdapter.java @@ -52,7 +52,7 @@ class ClientAdapter implements MethodHandler public void handle(int channel, AMQMethodBody method) throws AMQException { AMQMethodEvent evt = new AMQMethodEvent(channel, method); - _stateMgr.methodReceived(evt, _session); + _stateMgr.methodReceived(evt); } private class SessionAdapter extends AMQProtocolSession diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java index 3c1b50fb99..a371748b9e 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClientHandlerRegistry.java @@ -29,6 +29,7 @@ import org.apache.qpid.client.state.AMQState; import org.apache.qpid.client.state.AMQStateManager; import org.apache.qpid.client.state.IllegalStateTransitionException; import org.apache.qpid.client.state.StateAwareMethodListener; +import org.apache.qpid.client.protocol.AMQProtocolSession; import org.apache.qpid.framing.*; import java.util.HashMap; @@ -43,9 +44,9 @@ public class ClientHandlerRegistry extends AMQStateManager private final Map<AMQState, ClientRegistry> _handlers = new HashMap<AMQState, ClientRegistry>(); private final MemberHandle _identity; - protected ClientHandlerRegistry(MemberHandle local) + protected ClientHandlerRegistry(MemberHandle local, AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, protocolSession); _identity = local; diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java index 89f402c1b9..352928b121 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusterBuilder.java @@ -46,7 +46,7 @@ class ClusterBuilder ServerHandlerRegistry getHandlerRegistry() { - return new ServerHandlerRegistry(getHandlerFactory()); + return new ServerHandlerRegistry(getHandlerFactory(), null, null, null); } private MethodHandlerFactory getHandlerFactory() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java index 6e7efb3659..c1306b4c13 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolHandler.java @@ -33,6 +33,7 @@ import org.apache.qpid.framing.ConnectionTuneOkBody; import org.apache.qpid.framing.ClusterMembershipBody; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQPFastProtocolHandler; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -73,9 +74,9 @@ public class ClusteredProtocolHandler extends AMQPFastProtocolHandler implements _handlers = handler._handlers; } - protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQCodecFactory codec) throws AMQException + protected void createSession(IoSession session, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession protocolSession, AMQCodecFactory codec) throws AMQException { - new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers)); + new ClusteredProtocolSession(session, queues, exchanges, codec, new ServerHandlerRegistry(_handlers, queues, exchanges, protocolSession)); } void connect(String join) throws Exception diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java index 1763bcd03f..04c5f7b451 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java @@ -37,10 +37,12 @@ public class ClusteredProtocolSession extends AMQMinaProtocolSession { private MemberHandle _peer; - public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) - throws AMQException + public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException +// public ClusteredProtocolSession(IoSession session, QueueRegistry queueRegistry, +// ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory) throws AMQException { super(session, queueRegistry, exchangeRegistry, codecFactory, stateManager); +// super(session, queueRegistry, exchangeRegistry, codecFactory); } public boolean isPeerSession() diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java index 8557fc17c7..f447895013 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/MinaBrokerProxy.java @@ -63,7 +63,7 @@ public class MinaBrokerProxy extends Broker implements MethodHandler { super(host, port); _local = local; - _legacyHandler = new ClientHandlerRegistry(local); + _legacyHandler = new ClientHandlerRegistry(local, null); } private void init(IoSession session) diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java index 71c53146a8..27d5629f27 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/ServerHandlerRegistry.java @@ -27,6 +27,9 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.IllegalStateTransitionException; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.cluster.util.LogMessage; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.protocol.AMQProtocolSession; import java.util.HashMap; import java.util.Map; @@ -40,20 +43,23 @@ class ServerHandlerRegistry extends AMQStateManager private final Logger _logger = Logger.getLogger(ServerHandlerRegistry.class); private final Map<AMQState, MethodHandlerRegistry> _handlers = new HashMap<AMQState, MethodHandlerRegistry>(); - ServerHandlerRegistry() + ServerHandlerRegistry(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + AMQProtocolSession protocolSession) { - super(AMQState.CONNECTION_NOT_STARTED, false); + super(AMQState.CONNECTION_NOT_STARTED, false, queueRegistry, exchangeRegistry, protocolSession); } - ServerHandlerRegistry(ServerHandlerRegistry s) + ServerHandlerRegistry(ServerHandlerRegistry s, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); _handlers.putAll(s._handlers); } - ServerHandlerRegistry(MethodHandlerFactory factory) + ServerHandlerRegistry(MethodHandlerFactory factory, QueueRegistry queueRegistry, + ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(); + this(queueRegistry, exchangeRegistry, protocolSession); init(factory); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java index 6596da1f8f..f77b5084f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java @@ -18,12 +18,8 @@ * under the License. * */ -package org.apache.qpid.server.protocol; +package org.apache.qpid.protocol; -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.framing.AMQMethodBody; /** @@ -43,14 +39,11 @@ public interface AMQMethodListener * to all registered listeners using the error() method (see below) allowing them to * perform cleanup if necessary. */ - <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException; + <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws Exception; /** * Callback when an error has occurred. Allows listeners to clean up. * @param e */ - void error(AMQException e); + void error(Exception e); } |
