From e3ff96a57778ea2325551dff110e4bedf7e1d4d5 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 19 Feb 2007 11:55:47 +0000 Subject: QPID-372, QPID-376 Broker now ignores all frames for closing channels. When a close-ok is received the channel can be reopened and used All uses of getChannel check the return type is not null and throw a NOT_FOUND AMQException. If the channel is not found during a method handler then the Channel will be closed. ChannelCloseHandler - Now throws a connection exception if trying to close a a non exisitant channel. AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods - Updated ChannelException method to close connection if the CE was a result of not having a valid channel. - Changed state to CLOSING when writing out a connection close frame. AMQConnection - Wrapped all _logging calls , Updated comment formatting AMQSession - called startDispatcherIfRequired when receiving a message as without it a producer will not get a returned message. This is because there is no consumer setup to consume. ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly. AMQStateManager - Added state to the logging values Modified AMQTimeoutException to include a new constant value to identify the failure reason. AMQConstant - Added 408 REQUEST_TIMEOUT fixed error with NOT_ALLOWED value was 530 should be 507. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509172 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/handler/ChannelCloseHandler.java | 16 +++- .../server/protocol/AMQMinaProtocolSession.java | 87 ++++++++++++++++++++-- 2 files changed, 93 insertions(+), 10 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 8faf5eedde..9a8fce7129 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class ChannelCloseHandler implements StateAwareMethodListener { @@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener evt = new AMQMethodEvent(frame.getChannel(), (AMQMethodBody) frame.getBodyFrame()); + + //Check that this channel is not closing + if (channelAwaitingClosure(frame.getChannel())) + { + if ((evt.getMethod() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + } + return; + } + } + + try { try { + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if (!_frameListeners.isEmpty()) @@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } catch (AMQChannelException e) { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + if (getChannel(frame.getChannel()) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + writeFrame(e.getCloseFrame(frame.getChannel())); + closeChannel(frame.getChannel()); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + closeSession(); + + AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(ce.getCloseFrame(frame.getChannel())); + } } catch (AMQConnectionException e) { - _logger.error("Closing connection due to: " + e.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(frame.getChannel())); } } @@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + } } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); + } } /** -- cgit v1.2.1