diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-22 20:58:01 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-22 20:58:01 +0000 |
| commit | abfd652c09f7a267b175d7105ca0b9c1f618e368 (patch) | |
| tree | bc62f61a3a7fe28b0ad7f95b10a05035370e3f14 | |
| parent | 9c249dfc99dd8c58c387bc085f0d71ec9b78ad9e (diff) | |
| download | qpid-python-abfd652c09f7a267b175d7105ca0b9c1f618e368.tar.gz | |
Added session close convinience methods to broker ProtocolSession, modified handlers that need to close a session to use new methods. Added logger to RequestManager and ResponseManager.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@498797 13f79535-47bb-0310-9956-ffa450edef68
10 files changed, 153 insertions, 177 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index fc6a185aff..4bde15b4b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -51,8 +51,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener<ChannelClos AMQMethodEvent<ChannelCloseBody> evt) throws AMQException { ChannelCloseBody body = evt.getMethod(); - _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + - " and method " + body.methodId); + _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + + body.classId + " and method " + body.methodId); protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index c622309062..b47bb0ff34 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -51,19 +51,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener<C AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException { final ConnectionCloseBody body = evt.getMethod(); - _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + protocolSession); - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody( - protocolSession.getMajor(), // AMQP major version - protocolSession.getMinor())); // AMQP minor version - try - { - protocolSession.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + + "/" + body.replyText + " for " + protocolSession); + protocolSession.closeSessionResponse(evt.getRequestId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index c772b6d7c1..941c1b7e76 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -49,17 +49,7 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException { - //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); - - try - { - protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - protocolSession.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + protocolSession.closeSession(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 599c58f5af..d7c927110a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -76,16 +76,9 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // Can't do this as we violate protocol. Need to send Close // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName()); _logger.info("Authentication failed"); - stateManager.changeState(AMQState.CONNECTION_CLOSING); - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody close = ConnectionCloseBody.createMethodBody( - major, minor, // AMQP version (major, minor) - ConnectionCloseBody.getClazz(major, minor), // classId - ConnectionCloseBody.getMethod(major, minor), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName()); // replyText - protocolSession.writeResponse(evt, close); disposeSaslServer(protocolSession); + protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(), + AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod()); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index 4b61d4767f..61da80a2d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -23,20 +23,15 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody> @@ -77,14 +72,11 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo { session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(), "No such queue, '" + body.queue + "'"); -// channelClose(session, channelId, stateManager, -// "No such queue, '" + body.queue + "'", AMQConstant.NOT_FOUND); } else { - connectionClose(session, channelId, session.getStateManager(), - "No queue name provided, no default queue defined.", - AMQConstant.NOT_ALLOWED); + session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(), + "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod()); } } else @@ -103,54 +95,18 @@ public class MessageConsumeHandler implements StateAwareMethodListener<MessageCo } catch (AMQInvalidSelectorException ise) { - _log.info("Closing connection due to invalid selector"); + _log.info("Closing connection due to invalid selector: " + ise.getMessage()); session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(), ise.getMessage()); -// channelClose(session, channelId, stateManager, ise.getMessage(), AMQConstant.INVALID_SELECTOR); } catch (ConsumerTagNotUniqueException e) { - connectionClose(session, channelId, session.getStateManager(), - "Non-unique consumer tag, '" + body.destination + "'", - AMQConstant.NOT_ALLOWED); + _log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage()); + session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(), + "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod()); } } } } - -// private void channelClose(AMQProtocolSession session, int channelId, AMQMethodListener listener, -// String message, AMQConstant code) -// throws AMQException -// { -// /*AMQShort*/String msg = new /*AMQShort*/String(message); -// // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) -// // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. -// // Be aware of possible changes to parameter order as versions change. -// session.writeRequest(channelId, ChannelCloseBody.createMethodBody -// ((byte)0, (byte)9, // AMQP version (major, minor) -// MessageConsumeBody.getClazz((byte)0, (byte)9), // classId -// MessageConsumeBody.getMethod((byte)0, (byte)9), // methodId -// code.getCode(), // replyCode -// msg), // replyText -// listener); -// } - - private void connectionClose(AMQProtocolSession session, int channelId, AMQMethodListener listener, - String message, AMQConstant code) - throws AMQException - { - byte major = session.getMajor(); - byte minor = session.getMinor(); - /*AMQShort*/String msg = new /*AMQShort*/String(message); - // Be aware of possible changes to parameter order as versions change. - session.writeRequest(channelId, ConnectionCloseBody.createMethodBody( - major, minor, // AMQP version (major, minor) - MessageConsumeBody.getClazz(major, minor), // classId - MessageConsumeBody.getMethod(major, minor), // methodId - code.getCode(), // replyCode - msg), // replyText - listener); - } - } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 8eb914758b..2c14323df8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -33,6 +33,9 @@ import org.apache.qpid.framing.ConnectionStartBody; import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; +import org.apache.qpid.framing.ChannelOpenBody; +import org.apache.qpid.framing.ConnectionCloseBody; +import org.apache.qpid.framing.ConnectionCloseOkBody; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.Content; import org.apache.qpid.framing.FieldTable; @@ -59,6 +62,7 @@ import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.state.AMQState; import javax.management.JMException; import javax.security.sasl.SaslServer; @@ -100,6 +104,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, private Object _lastSent; + private boolean _closePending; private boolean _closed; // maximum number of channels this session should have private long _maxNoOfChannels = 1000; @@ -128,6 +133,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); + _closePending = false; + _closed = false; } public AMQMinaProtocolSession(IoSession session, QueueRegistry queueRegistry, ExchangeRegistry exchangeRegistry, @@ -143,6 +150,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _codecFactory = codecFactory; _managedObject = createMBean(); _managedObject.register(); + _closePending = false; + _closed = false; } private AMQProtocolSessionMBean createMBean() throws AMQException @@ -168,7 +177,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return (AMQProtocolSession) minaProtocolSession.getAttachment(); } - private AMQChannel createChannel(int id) throws AMQException { + private AMQChannel createChannel(int id) throws AMQException + { IApplicationRegistry registry = ApplicationRegistry.getInstance(); AMQChannel channel = new AMQChannel(id, registry.getMessageStore(), _exchangeRegistry, this, _stateManager); @@ -221,12 +231,22 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } - else + else if(!_closed) { AMQFrame frame = (AMQFrame) message; - AMQChannel channel = getChannel(frame.channel); - if (channel == null) + + if (_closePending) + { + // If a close is pending (ie ChannelClose has been sent, but no ChannelCloseOk received), then + // all methods except ChannelCloseOk must be rejected. (AMQP spec) + if((frame.bodyFrame instanceof AMQRequestBody)) + throw new AMQException("Incoming request frame on connection which is pending close."); + AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame; + if (!(requestBody.getMethodPayload() instanceof ConnectionCloseOkBody)) + throw new AMQException("Incoming frame on unopened channel is not a Connection.Open method."); + } + else if (channel == null) { // Perform a check on incoming frames that may result in a new channel // being opened. The frame MUST be: @@ -235,12 +255,12 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, // c. Must be a ConnectionOpenBody method. // Throw an exception for all other incoming frames on an unopened channel if(!(frame.bodyFrame instanceof AMQRequestBody)) - throw new AMQException("Incoming frame on unopened channel not a request"); + throw new AMQException("Incoming frame on unopened channel is not a request."); AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame; - if (requestBody.getMethodPayload() instanceof ConnectionOpenBody) - throw new AMQException("Incoming frame on unopened channel not a Connection.Open method"); + if (!(requestBody.getMethodPayload() instanceof ChannelOpenBody)) + throw new AMQException("Incoming frame on unopened channel is not a Channel.Open method."); if (requestBody.getRequestId() != 1) - throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1"); + throw new AMQException("Incoming Channel.Open frame on unopened channel does not have a request id = 1."); channel = createChannel(frame.channel); } @@ -391,17 +411,36 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, channel.rollback(); } } + + // Used to initiate a channel close from the server side and inform the client + public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException + { + final AMQChannel channel = _channelMap.get(channelId); + if (channel == null) + { + throw new IllegalArgumentException("Unknown channel id " + channelId); + } + else + { + channel.close(this); + // Be aware of possible changes to parameter order as versions change. + AMQMethodBody cf = ChannelCloseBody.createMethodBody + (_major, _minor, // AMQP version (major, minor) + MessageTransferBody.getClazz((byte)0, (byte)9), // classId + MessageTransferBody.getMethod((byte)0, (byte)9), // methodId + replyCode, // replyCode + replyText); // replyText + writeRequest(channelId, cf); + // Wait a bit for the Channel.CloseOk to come in from the client, but don't + // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk + // method handler has not already done so. + // TODO - Find a better way of doing this without holding up this thread... + try { Thread.currentThread().sleep(2000); } // 2 seconds + catch (InterruptedException e) {} + _channelMap.remove(channelId); // Returns null if already removed (by closeOk handler + } + } - /** - * Close a specific channel. This will remove any resources used by the channel, including: - * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> - * </ul> - * - * @param channelId id of the channel to close - * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response - * @throws AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ // Used to close a channel as a response to a client close request public void closeChannelResponse(int channelId, long requestId) throws AMQException { @@ -425,33 +464,52 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } } + + // Used to initiate a connection close from the server side and inform the client + public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException + { + _closePending = true; // This prevents all methods except Close-Ok from being accepted + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + AMQMethodBody close = ConnectionCloseBody.createMethodBody( + _major, _minor, // AMQP version (major, minor) + classId, // classId + methodId, // methodId + replyCode, // replyCode + replyText); // replyText + writeRequest(0, close); + // Wait a bit for the Connection.CloseOk to come in from the client, but don't + // rely on it. Attempt to close the connection if the ConnectionCloseOk + // method handler has not already done so. + // TODO - Find a better way of doing this without holding up this thread... + try { Thread.currentThread().sleep(2000); } // 2 seconds + catch (InterruptedException e) {} + closeSession(); + } - // Used to close a channel from the server side and inform the client - public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException + public void closeSessionRequest(int replyCode, String replyText) throws AMQException { - final AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Unknown channel id"); - } - else + closeSessionRequest(replyCode, replyText, 0, 0); + } + + // Used to close a connection as a response to a client close request + public void closeSessionResponse(long requestId) throws AMQException + { + // Be aware of possible changes to parameter order as versions change. + writeResponse(0, requestId, ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version + closeSession(); + } + + public void closeSession() throws AMQException + { + if (!_closed) { - channel.close(this); - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody cf = ChannelCloseBody.createMethodBody - (_major, _minor, // AMQP version (major, minor) - MessageTransferBody.getClazz((byte)0, (byte)9), // classId - MessageTransferBody.getMethod((byte)0, (byte)9), // methodId - replyCode, // replyCode - replyText); // replyText - writeRequest(channelId, cf); - // Wait a bit for the Channel.CloseOk to come in from the client, but don't - // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk - // method handler has not already done so. - // TODO - Find a better way of doing this without holding up this thread... - try { Thread.currentThread().sleep(2000); } // 2 seconds - catch (InterruptedException e) {} - _channelMap.remove(channelId); // Returns null if already removed + _closed = true; + closeAllChannels(); + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + if (_managedObject != null) + { + _managedObject.unregister(); + } } } @@ -494,23 +552,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _channelMap.clear(); } - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ - public void closeSession() throws AMQException - { - if (!_closed) - { - _closed = true; - closeAllChannels(); - if (_managedObject != null) - { - _managedObject.unregister(); - } - } - } - public String toString() { return "AMQProtocolSession(" + _minaProtocolSession.getRemoteAddress() + ")"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 0a89d70734..b3c97669c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -174,18 +174,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - 0, // classId - 0, // methodId - 200, // replyCode - throwable.getMessage()); // replyText - session.writeRequest(0, closeBody, methodListener); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); - protocolSession.close(); + // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right! + session.closeSessionRequest(200, throwable.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 6c5af02dd3..f9e5439890 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -72,19 +72,18 @@ public interface AMQProtocolSession extends AMQProtocolWriter */ void addChannel(AMQChannel channel) throws AMQException; - /** - * Close a specific channel. This will remove any resources used by the channel, including: - * <ul><li>any queue subscriptions (this may in turn remove queues if they are auto delete</li> - * </ul> - * @param channelId id of the channel to close - * @param requestId id of the request that initiated the close, used in response - * @throws org.apache.qpid.AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ + void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException; + void closeChannelResponse(int channelId, long requestId) throws AMQException; - void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException; + void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException; + void closeSessionRequest(int replyCode, String replyText) throws AMQException; + + void closeSessionResponse(long requestId) throws AMQException; + + void closeSession() throws AMQException; + /** * Remove a channel from the session but do not close it. * @param channelId @@ -98,12 +97,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter void initHeartbeats(int delay); /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ - void closeSession() throws AMQException; - - /** * @return a key that uniquely identifies this session */ Object getKey(); diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 1bad249bc2..ca8735cb62 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -22,12 +22,16 @@ package org.apache.qpid.framing; import java.util.concurrent.ConcurrentHashMap; +import org.apache.log4j.Logger; + import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { + private static final Logger logger = Logger.getLogger(RequestManager.class); + private int channel; private AMQProtocolWriter protocolWriter; @@ -71,7 +75,11 @@ public class RequestManager lastProcessedResponseId, requestMethodBody); requestSentMap.put(requestId, methodListener); protocolWriter.writeFrame(requestFrame); - // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + } return requestId; } @@ -80,7 +88,11 @@ public class RequestManager { long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + + " " + responseBody + "; " + responseBody.getMethodPayload()); + } for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { AMQMethodListener methodListener = requestSentMap.get(requestId); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 90f33a08c0..8bc526900a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -30,6 +32,8 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { + private static final Logger logger = Logger.getLogger(ResponseManager.class); + private int channel; private AMQMethodListener methodListener; private AMQProtocolWriter protocolWriter; @@ -113,12 +117,15 @@ public class ResponseManager public void requestReceived(AMQRequestBody requestBody) throws Exception { long requestId = requestBody.getRequestId(); - // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + + " " + requestBody + "; " + requestBody.getMethodPayload()); + } // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - // TODO: Update MethodEvent to use the RequestBody instead of MethodBody AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId); methodListener.methodReceived(methodEvent); } @@ -126,7 +133,11 @@ public class ResponseManager public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws RequestResponseMappingException { - // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + + " Res[# " + requestId + "]; " + responseMethodBody); + } ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) throw new RequestResponseMappingException(requestId, |
