From abfd652c09f7a267b175d7105ca0b9c1f618e368 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Mon, 22 Jan 2007 20:58:01 +0000 Subject: Added session close convinience methods to broker ProtocolSession, modified handlers that need to close a session to use new methods. Added logger to RequestManager and ResponseManager. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@498797 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/handler/ChannelCloseHandler.java | 4 +- .../handler/ConnectionCloseMethodHandler.java | 17 +-- .../handler/ConnectionCloseOkMethodHandler.java | 12 +- .../handler/ConnectionSecureOkMethodHandler.java | 11 +- .../qpid/server/handler/MessageConsumeHandler.java | 56 +------- .../server/protocol/AMQMinaProtocolSession.java | 159 +++++++++++++-------- .../server/protocol/AMQPFastProtocolHandler.java | 13 +- .../qpid/server/protocol/AMQProtocolSession.java | 25 ++-- .../org/apache/qpid/framing/RequestManager.java | 16 ++- .../org/apache/qpid/framing/ResponseManager.java | 17 ++- 10 files changed, 153 insertions(+), 177 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java index fc6a185aff..4bde15b4b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java @@ -51,8 +51,8 @@ public class ChannelCloseHandler implements StateAwareMethodListener evt) throws AMQException { ChannelCloseBody body = evt.getMethod(); - _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId + - " and method " + body.methodId); + _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + + body.classId + " and method " + body.methodId); protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java index c622309062..b47bb0ff34 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java @@ -51,19 +51,8 @@ public class ConnectionCloseMethodHandler implements StateAwareMethodListener evt) throws AMQException { final ConnectionCloseBody body = evt.getMethod(); - _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" + - body.replyText + " for " + protocolSession); - // Be aware of possible changes to parameter order as versions change. - protocolSession.writeResponse(evt, ConnectionCloseOkBody.createMethodBody( - protocolSession.getMajor(), // AMQP major version - protocolSession.getMinor())); // AMQP minor version - try - { - protocolSession.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + + "/" + body.replyText + " for " + protocolSession); + protocolSession.closeSessionResponse(evt.getRequestId()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java index c772b6d7c1..941c1b7e76 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java @@ -49,17 +49,7 @@ public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener< public void methodReceived(AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException { - //todo should this not do more than just log the method? _logger.info("Received Connection-close-ok"); - - try - { - protocolSession.getStateManager().changeState(AMQState.CONNECTION_CLOSED); - protocolSession.closeSession(); - } - catch (Exception e) - { - _logger.error("Error closing protocol session: " + e, e); - } + protocolSession.closeSession(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java index 599c58f5af..d7c927110a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java @@ -76,16 +76,9 @@ public class ConnectionSecureOkMethodHandler implements StateAwareMethodListener // Can't do this as we violate protocol. Need to send Close // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName()); _logger.info("Authentication failed"); - stateManager.changeState(AMQState.CONNECTION_CLOSING); - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody close = ConnectionCloseBody.createMethodBody( - major, minor, // AMQP version (major, minor) - ConnectionCloseBody.getClazz(major, minor), // classId - ConnectionCloseBody.getMethod(major, minor), // methodId - AMQConstant.NOT_ALLOWED.getCode(), // replyCode - AMQConstant.NOT_ALLOWED.getName()); // replyText - protocolSession.writeResponse(evt, close); disposeSaslServer(protocolSession); + protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(), + AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod()); break; case SUCCESS: _logger.info("Connected as: " + ss.getAuthorizationID()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java index 4b61d4767f..61da80a2d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java @@ -23,20 +23,15 @@ package org.apache.qpid.server.handler; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQInvalidSelectorException; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ConnectionCloseBody; import org.apache.qpid.framing.MessageConsumeBody; import org.apache.qpid.framing.MessageOkBody; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; -import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; -import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; public class MessageConsumeHandler implements StateAwareMethodListener @@ -77,14 +72,11 @@ public class MessageConsumeHandler implements StateAwareMethodListener
  • any queue subscriptions (this may in turn remove queues if they are auto delete
  • - * - * - * @param channelId id of the channel to close - * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response - * @throws AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ // Used to close a channel as a response to a client close request public void closeChannelResponse(int channelId, long requestId) throws AMQException { @@ -425,33 +464,52 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, } } } + + // Used to initiate a connection close from the server side and inform the client + public void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException + { + _closePending = true; // This prevents all methods except Close-Ok from being accepted + _stateManager.changeState(AMQState.CONNECTION_CLOSING); + AMQMethodBody close = ConnectionCloseBody.createMethodBody( + _major, _minor, // AMQP version (major, minor) + classId, // classId + methodId, // methodId + replyCode, // replyCode + replyText); // replyText + writeRequest(0, close); + // Wait a bit for the Connection.CloseOk to come in from the client, but don't + // rely on it. Attempt to close the connection if the ConnectionCloseOk + // method handler has not already done so. + // TODO - Find a better way of doing this without holding up this thread... + try { Thread.currentThread().sleep(2000); } // 2 seconds + catch (InterruptedException e) {} + closeSession(); + } - // Used to close a channel from the server side and inform the client - public void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException + public void closeSessionRequest(int replyCode, String replyText) throws AMQException { - final AMQChannel channel = _channelMap.get(channelId); - if (channel == null) - { - throw new IllegalArgumentException("Unknown channel id"); - } - else + closeSessionRequest(replyCode, replyText, 0, 0); + } + + // Used to close a connection as a response to a client close request + public void closeSessionResponse(long requestId) throws AMQException + { + // Be aware of possible changes to parameter order as versions change. + writeResponse(0, requestId, ConnectionCloseOkBody.createMethodBody(_major, _minor)); // AMQP version + closeSession(); + } + + public void closeSession() throws AMQException + { + if (!_closed) { - channel.close(this); - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody cf = ChannelCloseBody.createMethodBody - (_major, _minor, // AMQP version (major, minor) - MessageTransferBody.getClazz((byte)0, (byte)9), // classId - MessageTransferBody.getMethod((byte)0, (byte)9), // methodId - replyCode, // replyCode - replyText); // replyText - writeRequest(channelId, cf); - // Wait a bit for the Channel.CloseOk to come in from the client, but don't - // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk - // method handler has not already done so. - // TODO - Find a better way of doing this without holding up this thread... - try { Thread.currentThread().sleep(2000); } // 2 seconds - catch (InterruptedException e) {} - _channelMap.remove(channelId); // Returns null if already removed + _closed = true; + closeAllChannels(); + _stateManager.changeState(AMQState.CONNECTION_CLOSED); + if (_managedObject != null) + { + _managedObject.unregister(); + } } } @@ -494,23 +552,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, _channelMap.clear(); } - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ - public void closeSession() throws AMQException - { - if (!_closed) - { - _closed = true; - closeAllChannels(); - if (_managedObject != null) - { - _managedObject.unregister(); - } - } - } - public String toString() { return "AMQProtocolSession(" + _minaProtocolSession.getRemoteAddress() + ")"; diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 0a89d70734..b3c97669c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -174,18 +174,9 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter implements Protoco } else { - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQMethodBody closeBody = ConnectionCloseBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - 0, // classId - 0, // methodId - 200, // replyCode - throwable.getMessage()); // replyText - session.writeRequest(0, closeBody, methodListener); _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable); - protocolSession.close(); + // TODO: Closing with code 200 ("reply-sucess") ??? This cannot be right! + session.closeSessionRequest(200, throwable.getMessage()); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 6c5af02dd3..f9e5439890 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -72,19 +72,18 @@ public interface AMQProtocolSession extends AMQProtocolWriter */ void addChannel(AMQChannel channel) throws AMQException; - /** - * Close a specific channel. This will remove any resources used by the channel, including: - *
    • any queue subscriptions (this may in turn remove queues if they are auto delete
    • - *
    - * @param channelId id of the channel to close - * @param requestId id of the request that initiated the close, used in response - * @throws org.apache.qpid.AMQException if an error occurs closing the channel - * @throws IllegalArgumentException if the channel id is not valid - */ + void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException; + void closeChannelResponse(int channelId, long requestId) throws AMQException; - void closeChannelRequest(int channelId, int replyCode, String replyText) throws AMQException; + void closeSessionRequest(int replyCode, String replyText, int classId, int methodId) throws AMQException; + void closeSessionRequest(int replyCode, String replyText) throws AMQException; + + void closeSessionResponse(long requestId) throws AMQException; + + void closeSession() throws AMQException; + /** * Remove a channel from the session but do not close it. * @param channelId @@ -97,12 +96,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter */ void initHeartbeats(int delay); - /** - * This must be called when the session is _closed in order to free up any resources - * managed by the session. - */ - void closeSession() throws AMQException; - /** * @return a key that uniquely identifies this session */ diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 1bad249bc2..ca8735cb62 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -22,12 +22,16 @@ package org.apache.qpid.framing; import java.util.concurrent.ConcurrentHashMap; +import org.apache.log4j.Logger; + import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { + private static final Logger logger = Logger.getLogger(RequestManager.class); + private int channel; private AMQProtocolWriter protocolWriter; @@ -71,7 +75,11 @@ public class RequestManager lastProcessedResponseId, requestMethodBody); requestSentMap.put(requestId, methodListener); protocolWriter.writeFrame(requestFrame); - // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); + } return requestId; } @@ -80,7 +88,11 @@ public class RequestManager { long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + + " " + responseBody + "; " + responseBody.getMethodPayload()); + } for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { AMQMethodListener methodListener = requestSentMap.get(requestId); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 90f33a08c0..8bc526900a 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -23,6 +23,8 @@ package org.apache.qpid.framing; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import org.apache.log4j.Logger; + import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQMethodListener; @@ -30,6 +32,8 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { + private static final Logger logger = Logger.getLogger(ResponseManager.class); + private int channel; private AMQMethodListener methodListener; private AMQProtocolWriter protocolWriter; @@ -113,12 +117,15 @@ public class ResponseManager public void requestReceived(AMQRequestBody requestBody) throws Exception { long requestId = requestBody.getRequestId(); - // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload()); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + + " " + requestBody + "; " + requestBody.getMethodPayload()); + } // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - // TODO: Update MethodEvent to use the RequestBody instead of MethodBody AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload(), requestId); methodListener.methodReceived(methodEvent); } @@ -126,7 +133,11 @@ public class ResponseManager public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws RequestResponseMappingException { - // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody); + if (logger.isDebugEnabled()) + { + logger.debug((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + + " Res[# " + requestId + "]; " + responseMethodBody); + } ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) throw new RequestResponseMappingException(requestId, -- cgit v1.2.1