summaryrefslogtreecommitdiff
path: root/java/broker/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java87
2 files changed, 93 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 8faf5eedde..9a8fce7129 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
@@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- session.closeChannel(evt.getChannelId());
+ int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ }
+
+ session.closeChannel(channelId);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 309fa4663a..2de32c2f0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Method frame received: " + frame);
}
+
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
(AMQMethodBody) frame.getBodyFrame());
+
+ //Check that this channel is not closing
+ if (channelAwaitingClosure(frame.getChannel()))
+ {
+ if ((evt.getMethod() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+ }
+ return;
+ }
+ }
+
+
try
{
try
{
+
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if (!_frameListeners.isEmpty())
@@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
catch (AMQChannelException e)
{
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.getChannel()));
- closeChannel(frame.getChannel());
+ if (getChannel(frame.getChannel()) != null)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing channel due to: " + e.getMessage());
+ }
+ writeFrame(e.getCloseFrame(frame.getChannel()));
+ closeChannel(frame.getChannel());
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
+ closeSession();
+
+ AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
+
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(ce.getCloseFrame(frame.getChannel()));
+ }
}
catch (AMQConnectionException e)
{
- _logger.error("Closing connection due to: " + e.getMessage());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(frame.getChannel()));
}
}
@@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+ }
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ }
}
/**