summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-22 20:58:01 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-22 20:58:01 +0000
commitabfd652c09f7a267b175d7105ca0b9c1f618e368 (patch)
treebc62f61a3a7fe28b0ad7f95b10a05035370e3f14
parent9c249dfc99dd8c58c387bc085f0d71ec9b78ad9e (diff)
downloadqpid-python-abfd652c09f7a267b175d7105ca0b9c1f618e368.tar.gz
Added session close convinience methods to broker ProtocolSession, modified handlers that need to close a session to use new methods. Added logger to RequestManager and ResponseManager.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@498797 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java17
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java56
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java159
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java13
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java25
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java17
10 files changed, 153 insertions, 177 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
index fc6a185aff..4bde15b4b0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
@@ -51,8 +51,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos
AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
ChannelCloseBody body = evt.getMethod();
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " +
+ body.classId + " and method " + body.methodId);
protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
index c622309062..b47bb0ff34 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
@@ -51,19 +51,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C
AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
final ConnectionCloseBody body = evt.getMethod();
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + protocolSession);
- // Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
- try
- {
- protocolSession.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode +
+ "/" + body.replyText + " for " + protocolSession);
+ protocolSession.closeSessionResponse(evt.getRequestId());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
index c772b6d7c1..941c1b7e76 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
@@ -49,17 +49,7 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<
public void methodReceived(AMQProtocolSession protocolSession,
AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
- //todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
-
- try
- {
- protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
- protocolSession.closeSession();
- }
- catch (Exception e)
- {
- _logger.error("Error closing protocol session: " + e, e);
- }
+ protocolSession.closeSession();
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
index 599c58f5af..d7c927110a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
@@ -76,16 +76,9 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener
// Can't do this as we violate protocol. Need to send Close
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody close = ConnectionCloseBody.createMethodBody(
- major, minor, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz(major, minor), // classId
- ConnectionCloseBody.getMethod(major, minor), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- protocolSession.writeResponse(evt, close);
disposeSaslServer(protocolSession);
+ protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod());
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
index 4b61d4767f..61da80a2d1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
@@ -23,20 +23,15 @@ package org.apache.qpid.server.handler;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
@@ -77,14 +72,11 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo
{
session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
"No such queue, '" + body.queue + "'");
-// channelClose(session, channelId, stateManager,
-// "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND);
}
else
{
- connectionClose(session, channelId, session.getStateManager(),
- "No queue name provided, no default queue defined.",
- AMQConstant.NOT_ALLOWED);
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod());
}
}
else
@@ -103,54 +95,18 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo
}
catch (AMQInvalidSelectorException ise)
{
- _log.info("Closing connection due to invalid selector");
+ _log.info("Closing connection due to invalid selector: " + ise.getMessage());
session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
ise.getMessage());
-// channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR);
}
catch (ConsumerTagNotUniqueException e)
{
- connectionClose(session, channelId, session.getStateManager(),
- "Non-unique consumer tag, '" + body.destination + "'",
- AMQConstant.NOT_ALLOWED);
+ _log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
}
}
}
}
-
-// private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
-// String message, AMQConstant code)
-// throws AMQException
-// {
-// /*AMQShort*/String msg = new /*AMQShort*/String(message);
-// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-// // Be aware of possible changes to parameter order as versions change.
-// session.writeRequest(channelId, ChannelCloseBody.createMethodBody
-// ((byte)0, (byte)9, // AMQP version (major, minor)
-// MessageConsumeBody.getClazz((byte)0, (byte)9), // classId
-// MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId
-// code.getCode(), // replyCode
-// msg), // replyText
-// listener);
-// }
-
- private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener,
- String message, AMQConstant code)
- throws AMQException
- {
- byte major = session.getMajor();
- byte minor = session.getMinor();
- /*AMQShort*/String msg = new /*AMQShort*/String(message);
- // Be aware of possible changes to parameter order as versions change.
- session.writeRequest(channelId, ConnectionCloseBody.createMethodBody(
- major, minor, // AMQP version (major, minor)
- MessageConsumeBody.getClazz(major, minor), // classId
- MessageConsumeBody.getMethod(major, minor), // methodId
- code.getCode(), // replyCode
- msg), // replyText
- listener);
- }
-
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 8eb914758b..2c14323df8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -33,6 +33,9 @@ import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.Content;
import org.apache.qpid.framing.FieldTable;
@@ -59,6 +62,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.AMQState;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -100,6 +104,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
private Object _lastSent;
+ private boolean _closePending;
private boolean _closed;
// maximum number of channels this session should have
private long _maxNoOfChannels = 1000;
@@ -128,6 +133,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
+ _closePending = false;
+ _closed = false;
}
public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry,
@@ -143,6 +150,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_codecFactory = codecFactory;
_managedObject = createMBean();
_managedObject.register();
+ _closePending = false;
+ _closed = false;
}
private AMQProtocolSessionMBean createMBean() throws AMQException
@@ -168,7 +177,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return (AMQProtocolSession) minaProtocolSession.getAttachment();
}
- private AMQChannel createChannel(int id) throws AMQException {
+ private AMQChannel createChannel(int id) throws AMQException
+ {
IApplicationRegistry registry = ApplicationRegistry.getInstance();
AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
_exchangeRegistry, this, _stateManager);
@@ -221,12 +231,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
- else
+ else if(!_closed)
{
AMQFrame frame = (AMQFrame) message;
-
AMQChannel channel = getChannel(frame.channel);
- if (channel == null)
+
+ if (_closePending)
+ {
+ // If a close is pending (ie ChannelClose has been sent, but no ChannelCloseOk received), then
+ // all methods except ChannelCloseOk must be rejected. (AMQP spec)
+ if((frame.bodyFrame instanceof AMQRequestBody))
+ throw new AMQException("Incoming request frame on connection which is pending close.");
+ AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody))
+ throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method.");
+ }
+ else if (channel == null)
{
// Perform a check on incoming frames that may result in a new channel
// being opened. The frame MUST be:
@@ -235,12 +255,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
// c. Must be a ConnectionOpenBody method.
// Throw an exception for all other incoming frames on an unopened channel
if(!(frame.bodyFrame instanceof AMQRequestBody))
- throw new AMQException("Incoming frame on unopened channel not a request");
+ throw new AMQException("Incoming frame on unopened channel is not a request.");
AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
- if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
- throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+ if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody))
+ throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method.");
if (requestBody.getRequestId() != 1)
- throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
+ throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1.");
channel = createChannel(frame.channel);
}
@@ -391,17 +411,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
channel.rollback();
}
}
+
+ // Used to initiate a channel close from the server side and inform the client
+ public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ {
+ final AMQChannel channel = _channelMap.get(channelId);
+ if (channel == null)
+ {
+ throw new IllegalArgumentException("Unknown channel id " + channelId);
+ }
+ else
+ {
+ channel.close(this);
+ // Be aware of possible changes to parameter order as versions change.
+ AMQMethodBody cf = ChannelCloseBody.createMethodBody
+ (_major, _minor, // AMQP version (major, minor)
+ MessageTransferBody.getClazz((byte)0, (byte)9), // classId
+ MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
+ replyCode, // replyCode
+ replyText); // replyText
+ writeRequest(channelId, cf);
+ // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ _channelMap.remove(channelId); // Returns null if already removed (by closeOk handler
+ }
+ }
- /**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
- *
- * @param channelId id of the channel to close
- * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response
- * @throws AMQException if an error occurs closing the channel
- * @throws IllegalArgumentException if the channel id is not valid
- */
// Used to close a channel as a response to a client close request
public void closeChannelResponse(int channelId, long requestId) throws AMQException
{
@@ -425,33 +464,52 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
}
}
}
+
+ // Used to initiate a connection close from the server side and inform the client
+ public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException
+ {
+ _closePending = true; // This prevents all methods except Close-Ok from being accepted
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ AMQMethodBody close = ConnectionCloseBody.createMethodBody(
+ _major, _minor, // AMQP version (major, minor)
+ classId, // classId
+ methodId, // methodId
+ replyCode, // replyCode
+ replyText); // replyText
+ writeRequest(0, close);
+ // Wait a bit for the Connection.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to close the connection if the ConnectionCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ closeSession();
+ }
- // Used to close a channel from the server side and inform the client
- public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException
+ public void closeSessionRequest(int replyCode, String replyText) throws AMQException
{
- final AMQChannel channel = _channelMap.get(channelId);
- if (channel == null)
- {
- throw new IllegalArgumentException("Unknown channel id");
- }
- else
+ closeSessionRequest(replyCode, replyText, 0, 0);
+ }
+
+ // Used to close a connection as a response to a client close request
+ public void closeSessionResponse(long requestId) throws AMQException
+ {
+ // Be aware of possible changes to parameter order as versions change.
+ writeResponse(0, requestId, ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version
+ closeSession();
+ }
+
+ public void closeSession() throws AMQException
+ {
+ if (!_closed)
{
- channel.close(this);
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody cf = ChannelCloseBody.createMethodBody
- (_major, _minor, // AMQP version (major, minor)
- MessageTransferBody.getClazz((byte)0, (byte)9), // classId
- MessageTransferBody.getMethod((byte)0, (byte)9), // methodId
- replyCode, // replyCode
- replyText); // replyText
- writeRequest(channelId, cf);
- // Wait a bit for the Channel.CloseOk to come in from the client, but don't
- // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
- // method handler has not already done so.
- // TODO - Find a better way of doing this without holding up this thread...
- try { Thread.currentThread().sleep(2000); } // 2 seconds
- catch (InterruptedException e) {}
- _channelMap.remove(channelId); // Returns null if already removed
+ _closed = true;
+ closeAllChannels();
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ if (_managedObject != null)
+ {
+ _managedObject.unregister();
+ }
}
}
@@ -494,23 +552,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
_channelMap.clear();
}
- /**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
- public void closeSession() throws AMQException
- {
- if (!_closed)
- {
- _closed = true;
- closeAllChannels();
- if (_managedObject != null)
- {
- _managedObject.unregister();
- }
- }
- }
-
public String toString()
{
return "AMQProtocolSession(" + _minaProtocolSession.getRemoteAddress() + ")";
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
index 0a89d70734..b3c97669c5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
@@ -174,18 +174,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco
}
else
{
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- 200, // replyCode
- throwable.getMessage()); // replyText
- session.writeRequest(0, closeBody, methodListener);
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
- protocolSession.close();
+ // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right!
+ session.closeSessionRequest(200, throwable.getMessage());
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 6c5af02dd3..f9e5439890 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -72,19 +72,18 @@ public interface AMQProtocolSession extends AMQProtocolWriter
*/
void addChannel(AMQChannel channel) throws AMQException;
- /**
- * Close a specific channel. This will remove any resources used by the channel, including:
- * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li>
- * </ul>
- * @param channelId id of the channel to close
- * @param requestId id of the request that initiated the close, used in response
- * @throws org.apache.qpid.AMQException if an error occurs closing the channel
- * @throws IllegalArgumentException if the channel id is not valid
- */
+ void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+
void closeChannelResponse(int channelId, long requestId) throws AMQException;
- void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException;
+ void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException;
+ void closeSessionRequest(int replyCode, String replyText) throws AMQException;
+
+ void closeSessionResponse(long requestId) throws AMQException;
+
+ void closeSession() throws AMQException;
+
/**
* Remove a channel from the session but do not close it.
* @param channelId
@@ -98,12 +97,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter
void initHeartbeats(int delay);
/**
- * This must be called when the session is _closed in order to free up any resources
- * managed by the session.
- */
- void closeSession() throws AMQException;
-
- /**
* @return a key that uniquely identifies this session
*/
Object getKey();
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index 1bad249bc2..ca8735cb62 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -22,12 +22,16 @@ package org.apache.qpid.framing;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
+ private static final Logger logger = Logger.getLogger(RequestManager.class);
+
private int channel;
private AMQProtocolWriter protocolWriter;
@@ -71,7 +75,11 @@ public class RequestManager
lastProcessedResponseId, requestMethodBody);
requestSentMap.put(requestId, methodListener);
protocolWriter.writeFrame(requestFrame);
- // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel +
+ " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
+ }
return requestId;
}
@@ -80,7 +88,11 @@ public class RequestManager
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
- // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel +
+ " " + responseBody + "; " + responseBody.getMethodPayload());
+ }
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index 90f33a08c0..8bc526900a 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -23,6 +23,8 @@ package org.apache.qpid.framing;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -30,6 +32,8 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
+ private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
private int channel;
private AMQMethodListener methodListener;
private AMQProtocolWriter protocolWriter;
@@ -113,12 +117,15 @@ public class ResponseManager
public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
- // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+ " " + requestBody + "; " + requestBody.getMethodPayload());
+ }
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
- // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId);
methodListener.methodReceived(methodEvent);
}
@@ -126,7 +133,11 @@ public class ResponseManager
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
- // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel +
+ " Res[# " + requestId + "]; " + responseMethodBody);
+ }
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,