summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-02-19 11:55:47 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-02-19 11:55:47 +0000
commite3ff96a57778ea2325551dff110e4bedf7e1d4d5 (patch)
treee77ab4b679d132a86bb206df8f730658a7c74b87 /java/broker
parent81fa6193269cf9f6cacd03966d21d7448126653a (diff)
downloadqpid-python-e3ff96a57778ea2325551dff110e4bedf7e1d4d5.tar.gz
QPID-372, QPID-376 Broker now ignores all frames for closing channels.
When a close-ok is received the channel can be reopened and used All uses of getChannel check the return type is not null and throw a NOT_FOUND AMQException. If the channel is not found during a method handler then the Channel will be closed. ChannelCloseHandler - Now throws a connection exception if trying to close a a non exisitant channel. AMQMinaProtocolSession - Added pre-check for closing channels to ignore all but Close-OK methods - Updated ChannelException method to close connection if the CE was a result of not having a valid channel. - Changed state to CLOSING when writing out a connection close frame. AMQConnection - Wrapped all _logging calls , Updated comment formatting AMQSession - called startDispatcherIfRequired when receiving a message as without it a producer will not get a returned message. This is because there is no consumer setup to consume. ConnectionCloseMethodHandler - Wrapped code in try finally so that the protocol session would always be closed correctly. AMQStateManager - Added state to the logging values Modified AMQTimeoutException to include a new constant value to identify the failure reason. AMQConstant - Added 408 REQUEST_TIMEOUT fixed error with NOT_ALLOWED value was 530 should be 507. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@509172 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java87
2 files changed, 93 insertions, 10 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index 8faf5eedde..9a8fce7129 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -26,9 +26,11 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
@@ -51,11 +53,21 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
- session.closeChannel(evt.getChannelId());
+ int channelId = evt.getChannelId();
+
+ AMQChannel channel = session.getChannel(channelId);
+
+ if (channel == null)
+ {
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ }
+
+ session.closeChannel(channelId);
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
session.writeFrame(response);
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 309fa4663a..2de32c2f0f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -56,9 +56,11 @@ import org.apache.qpid.framing.MainRegistry;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersionList;
import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -254,12 +256,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Method frame received: " + frame);
}
+
final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
(AMQMethodBody) frame.getBodyFrame());
+
+ //Check that this channel is not closing
+ if (channelAwaitingClosure(frame.getChannel()))
+ {
+ if ((evt.getMethod() instanceof ChannelCloseOkBody))
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure - processing close-ok");
+ }
+ }
+ else
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Channel[" + frame.getChannel() + "] awaiting closure ignoring");
+ }
+ return;
+ }
+ }
+
+
try
{
try
{
+
boolean wasAnyoneInterested = _stateManager.methodReceived(evt);
if (!_frameListeners.isEmpty())
@@ -277,14 +303,42 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
catch (AMQChannelException e)
{
- _logger.error("Closing channel due to: " + e.getMessage());
- writeFrame(e.getCloseFrame(frame.getChannel()));
- closeChannel(frame.getChannel());
+ if (getChannel(frame.getChannel()) != null)
+ {
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing channel due to: " + e.getMessage());
+ }
+ writeFrame(e.getCloseFrame(frame.getChannel()));
+ closeChannel(frame.getChannel());
+ }
+ else
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("ChannelException occured on non-existent channel:" + e.getMessage());
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
+ closeSession();
+
+ AMQConnectionException ce = evt.getMethod().getConnectionException(AMQConstant.CHANNEL_ERROR,
+ AMQConstant.CHANNEL_ERROR.getName().toString());
+
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(ce.getCloseFrame(frame.getChannel()));
+ }
}
catch (AMQConnectionException e)
{
- _logger.error("Closing connection due to: " + e.getMessage());
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing connection due to: " + e.getMessage());
+ }
closeSession();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
writeFrame(e.getCloseFrame(frame.getChannel()));
}
}
@@ -325,8 +379,17 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content header frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
+ }
}
private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -335,8 +398,16 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
{
_logger.debug("Content body frame received: " + frame);
}
- //fixme what happens if getChannel returns null
- getChannel(frame.getChannel()).publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ AMQChannel channel = getChannel(frame.getChannel());
+
+ if (channel == null)
+ {
+ throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + frame.getChannel());
+ }
+ else
+ {
+ channel.publishContentBody((ContentBody) frame.getBodyFrame(), this);
+ }
}
/**