diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2007-02-19 11:55:47 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2007-02-19 11:55:47 +0000 |
| commit | e3ff96a57778ea2325551dff110e4bedf7e1d4d5 (patch) | |
| tree | e77ab4b679d132a86bb206df8f730658a7c74b87 /java/broker/src | |
| parent | 81fa6193269cf9f6cacd03966d21d7448126653a (diff) | |
| download | qpid-python-e3ff96a57778ea2325551dff110e4bedf7e1d4d5.tar.gz | |
QPID-372, QPID-376 Broker now ignores all frames for closing channels.
When a close-ok is received the channel can be reopened and used
All uses of getChannel check the return type is not null and throw a NOT_FOUND AMQException. If the channel is not found during a method handler then the Channel will be closed.
ChannelCloseHandler - Now throws a connection exception if trying to close a a non exisitant channel.
AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods
- Updated ChannelException method to close connection if the CE was a result of not having a valid channel.
- Changed state to CLOSING when writing out a connection close frame.
AMQConnection - Wrapped all _logging calls , Updated comment formatting
AMQSession - called startDispatcherIfRequired when receiving a message as without it a producer will not get a returned message. This is because there is no consumer setup to consume.
ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly.
AMQStateManager - Added state to the logging values
Modified AMQTimeoutException to include a new constant value to identify the failure reason.
AMQConstant - Added 408 REQUEST_TIMEOUT fixed error with NOT_ALLOWED value was 530 should be 507.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java | 16 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java | 87 |
2 files changed, 93 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 8faf5eedde..9a8fce7129 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> { @@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos ChannelCloseBody body = evt.getMethod(); _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); - session.closeChannel(evt.getChannelId()); + int channelId = evt.getChannelId(); + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); + } + + session.closeChannel(channelId); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 309fa4663a..2de32c2f0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Method frame received: " + frame); } + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) frame.getBodyFrame()); + + //Check that this channel is not closing + if (channelAwaitingClosure(frame.getChannel())) + { + if ((evt.getMethod() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + } + return; + } + } + + try { try { + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if (!_frameListeners.isEmpty()) @@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } catch (AMQChannelException e) { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + if (getChannel(frame.getChannel()) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + writeFrame(e.getCloseFrame(frame.getChannel())); + closeChannel(frame.getChannel()); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + closeSession(); + + AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(ce.getCloseFrame(frame.getChannel())); + } } catch (AMQConnectionException e) { - _logger.error("Closing connection due to: " + e.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(frame.getChannel())); } } @@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + } } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); + } } /** |
