diff options
Diffstat (limited to 'java/broker/src')
3 files changed, 28 insertions, 72 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java deleted file mode 100644 index 6596da1f8f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMethodListener.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol; - -import org.apache.qpid.AMQException; -import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.framing.AMQMethodBody; - -/** - * Interface that allows classes to register for interest in protocol method frames. - * - */ -public interface AMQMethodListener -{ - /** - * Invoked when a method frame has been received - * @param evt the event that contains the method and channel - * @param protocolSession the protocol session associated with the event - * @return true if the handler has processed the method frame, false otherwise. Note - * that this does not prohibit the method event being delivered to subsequent listeners - * but can be used to determine if nobody has dealt with an incoming method frame. - * @throws AMQException if an error has occurred. This exception will be delivered - * to all registered listeners using the error() method (see below) allowing them to - * perform cleanup if necessary. - */ - <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException; - - /** - * Callback when an error has occurred. Allows listeners to clean up. - * @param e - */ - void error(AMQException e); -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 415b7a3c68..fa43b8809d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -29,6 +29,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.codec.AMQDecoder; @@ -99,10 +100,19 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQCodecFactory codecFactory) throws AMQException { - this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); + _stateManager = new AMQStateManager(queueRegistry, exchangeRegistry, this); + _minaProtocolSession = session; + session.setAttachment(this); + + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _codecFactory = codecFactory; + _managedObject = createMBean(); + _managedObject.register(); +// this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } - public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, + public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQCodecFactory codecFactory, AMQStateManager stateManager) throws AMQException { @@ -208,13 +218,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, (AMQMethodBody) frame.bodyFrame); try { - boolean wasAnyoneInterested = _stateManager.methodReceived(evt, this, _queueRegistry, _exchangeRegistry); + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if(!_frameListeners.isEmpty()) { for (AMQMethodListener listener : _frameListeners) { - wasAnyoneInterested = listener.methodReceived(evt, this, _queueRegistry, _exchangeRegistry) || + wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested; } } @@ -233,7 +243,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _logger.error("Closing connection due to: " + e.getMessage()); writeFrame(e.getCloseFrame(frame.channel)); } - catch (AMQException e) + catch (Exception e) { _stateManager.error(e); for (AMQMethodListener listener : _frameListeners) diff --git a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java index fb78b0d8b7..70e530699e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java @@ -25,7 +25,7 @@ import org.apache.qpid.framing.*; import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.handler.*; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.server.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueRegistry; import org.apache.log4j.Logger; @@ -43,7 +43,9 @@ import java.util.concurrent.CopyOnWriteArraySet; public class AMQStateManager implements AMQMethodListener { private static final Logger _logger = Logger.getLogger(AMQStateManager.class); - + private final QueueRegistry _queueRegistry; + private final ExchangeRegistry _exchangeRegistry; + private final AMQProtocolSession _protocolSession; /** * The current state */ @@ -58,13 +60,16 @@ public class AMQStateManager implements AMQMethodListener private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager() + public AMQStateManager(QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { - this(AMQState.CONNECTION_NOT_STARTED, true); + this(AMQState.CONNECTION_NOT_STARTED, true, queueRegistry, exchangeRegistry, protocolSession); } - protected AMQStateManager(AMQState initial, boolean register) + protected AMQStateManager(AMQState initial, boolean register, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession) { + _queueRegistry = queueRegistry; + _exchangeRegistry = exchangeRegistry; + _protocolSession = protocolSession; _currentState = initial; if (register) { @@ -149,7 +154,7 @@ public class AMQStateManager implements AMQMethodListener } } - public void error(AMQException e) + public void error(Exception e) { _logger.error("State manager received error notification: " + e, e); for (StateListener l : _stateListeners) @@ -158,15 +163,12 @@ public class AMQStateManager implements AMQMethodListener } } - public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt, - AMQProtocolSession protocolSession, - QueueRegistry queueRegistry, - ExchangeRegistry exchangeRegistry) throws AMQException + public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException { StateAwareMethodListener<B> handler = findStateTransitionHandler(_currentState, evt.getMethod()); if (handler != null) { - handler.methodReceived(this, queueRegistry, exchangeRegistry, protocolSession, evt); + handler.methodReceived(this, _queueRegistry, _exchangeRegistry, _protocolSession, evt); return true; } return false; |
