diff options
Diffstat (limited to 'java/broker/src/main')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java | 16 | ||||
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java | 87 |
2 files changed, 93 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index 8faf5eedde..9a8fce7129 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.AMQChannel; public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody> { @@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos ChannelCloseBody body = evt.getMethod(); _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + " and method " + body.methodId); - session.closeChannel(evt.getChannelId()); + int channelId = evt.getChannelId(); + + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel"); + } + + session.closeChannel(channelId); + // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. // Be aware of possible changes to parameter order as versions change. - AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0); + AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0); session.writeFrame(response); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 309fa4663a..2de32c2f0f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ProtocolVersionList; import org.apache.qpid.framing.VersionSpecificRegistry; +import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.pool.ReadWriteThreadModel; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; +import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Method frame received: " + frame); } + final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) frame.getBodyFrame()); + + //Check that this channel is not closing + if (channelAwaitingClosure(frame.getChannel())) + { + if ((evt.getMethod() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring"); + } + return; + } + } + + try { try { + boolean wasAnyoneInterested = _stateManager.methodReceived(evt); if (!_frameListeners.isEmpty()) @@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } catch (AMQChannelException e) { - _logger.error("Closing channel due to: " + e.getMessage()); - writeFrame(e.getCloseFrame(frame.getChannel())); - closeChannel(frame.getChannel()); + if (getChannel(frame.getChannel()) != null) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Closing channel due to: " + e.getMessage()); + } + writeFrame(e.getCloseFrame(frame.getChannel())); + closeChannel(frame.getChannel()); + } + else + { + if (_logger.isDebugEnabled()) + { + _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage()); + } + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } + closeSession(); + + AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR, + AMQConstant.CHANNEL_ERROR.getName().toString()); + + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + writeFrame(ce.getCloseFrame(frame.getChannel())); + } } catch (AMQConnectionException e) { - _logger.error("Closing connection due to: " + e.getMessage()); + if (_logger.isInfoEnabled()) + { + _logger.info("Closing connection due to: " + e.getMessage()); + } closeSession(); + _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(frame.getChannel())); } } @@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content header frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame()); + } } private void contentBodyReceived(AMQFrame frame) throws AMQException @@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, { _logger.debug("Content body frame received: " + frame); } - //fixme what happens if getChannel returns null - getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this); + AMQChannel channel = getChannel(frame.getChannel()); + + if (channel == null) + { + throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel()); + } + else + { + channel.publishContentBody((ContentBody) frame.getBodyFrame(), this); + } } /** |
