diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-22 14:53:43 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-22 14:53:43 +0000 |
| commit | 210aacb8d90319e0a643836fc55a6717f54ddb05 (patch) | |
| tree | 0aa0a64dbe7a44b18953273ab7fe6821b325959e /java | |
| parent | 1afc480c103f3e9de3f468c0203cfa4bcfa67168 (diff) | |
| download | qpid-python-210aacb8d90319e0a643836fc55a6717f54ddb05.tar.gz | |
Improvements to debugging messages from Request/ResponseManager. Added timed wait for Channel.CloseOk massage in broker's closeChannelRequest method. Added checks for illegal frames that would open a closed channel
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@498631 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
6 files changed, 73 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index e34104914f..47288884f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -137,8 +137,8 @@ public class AMQChannel _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; _exchanges = exchanges; - _requestManager = new RequestManager(channelId, protocolWriter); - _responseManager = new ResponseManager(channelId, methodListener, protocolWriter); + _requestManager = new RequestManager(channelId, protocolWriter, true); + _responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true); _txnBuffer = new TxnBuffer(_messageStore); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 642b2ea03b..8eb914758b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -27,8 +27,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.ProtocolInitiation; import org.apache.qpid.framing.ConnectionStartBody; +import org.apache.qpid.framing.ConnectionOpenBody; import org.apache.qpid.framing.ChannelCloseBody; import org.apache.qpid.framing.ChannelCloseOkBody; import org.apache.qpid.framing.AMQFrame; @@ -224,7 +226,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, AMQFrame frame = (AMQFrame) message; AMQChannel channel = getChannel(frame.channel); - if (channel == null) { + if (channel == null) + { + // Perform a check on incoming frames that may result in a new channel + // being opened. The frame MUST be: + // a. A new request; + // b. Have a request id of 1 (i.e. the first request on a new channel); + // c. Must be a ConnectionOpenBody method. + // Throw an exception for all other incoming frames on an unopened channel + if(!(frame.bodyFrame instanceof AMQRequestBody)) + throw new AMQException("Incoming frame on unopened channel not a request"); + AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame; + if (requestBody.getMethodPayload() instanceof ConnectionOpenBody) + throw new AMQException("Incoming frame on unopened channel not a Connection.Open method"); + if (requestBody.getRequestId() != 1) + throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1"); channel = createChannel(frame.channel); } @@ -268,6 +284,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener) throws AMQException { + if (!checkMethodBodyVersion(methodBody)) + throw new AMQProtocolVersionException("MethodBody version did not match version of current session."); AMQChannel channel = getChannel(channelNum); RequestManager requestManager = channel.getRequestManager(); return requestManager.sendRequest(methodBody, methodListener); @@ -277,14 +295,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, public long writeRequest(int channelNum, AMQMethodBody methodBody) throws AMQException { - AMQChannel channel = getChannel(channelNum); - RequestManager requestManager = channel.getRequestManager(); - return requestManager.sendRequest(methodBody, _stateManager); + return writeRequest(channelNum, methodBody, _stateManager); } public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) throws AMQException { + if (!checkMethodBodyVersion(methodBody)) + throw new AMQProtocolVersionException("MethodBody version did not match version of current session."); AMQChannel channel = getChannel(channelNum); ResponseManager responseManager = channel.getResponseManager(); responseManager.sendResponse(requestId, methodBody); @@ -380,6 +398,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, * </ul> * * @param channelId id of the channel to close + * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ @@ -396,6 +415,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, try { channel.close(this); + // Send the Channel.CloseOk response // Be aware of possible changes to parameter order as versions change. writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor)); } @@ -425,6 +445,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, replyCode, // replyCode replyText); // replyText writeRequest(channelId, cf); + // Wait a bit for the Channel.CloseOk to come in from the client, but don't + // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk + // method handler has not already done so. + // TODO - Find a better way of doing this without holding up this thread... + try { Thread.currentThread().sleep(2000); } // 2 seconds + catch (InterruptedException e) {} + _channelMap.remove(channelId); // Returns null if already removed } } @@ -577,8 +604,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, return _minor; } - public boolean amqpVersionEquals(byte major, byte minor) + public boolean versionEquals(byte major, byte minor) { return _major == major && _minor == minor; } + + public boolean checkMethodBodyVersion(AMQMethodBody methodBody) + { + return versionEquals(methodBody.getMajor(), methodBody.getMinor()); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java index 58ed3b6522..6c5af02dd3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; @@ -136,5 +137,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter AMQStateManager getStateManager(); byte getMajor(); byte getMinor(); - boolean amqpVersionEquals(byte major, byte minor); + boolean versionEquals(byte major, byte minor); + boolean checkMethodBodyVersion(AMQMethodBody methodBody); } diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java index 52cc70a37a..5ea2e66b35 100644 --- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java @@ -117,8 +117,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager = new AMQStateManager(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this)); + _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection) @@ -130,8 +130,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager = new AMQStateManager(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this)); + _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); } public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager) @@ -145,8 +145,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis _stateManager.setProtocolSession(this); // Add channel 0 request and response managers, since they will not be added through the usual mechanism - _channelId2RequestMgrMap.put(0, new RequestManager(0, this)); - _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this)); + _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false)); + _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false)); } public void init() @@ -377,12 +377,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis // Add request and response handlers, one per channel, if they do not already exist if (_channelId2RequestMgrMap.get(channelId) == null) { - _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this)); + _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false)); } if (_channelId2ResponseMgrMap.get(channelId) == null) { - _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this)); + _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false)); } } diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index a3379484bd..1bad249bc2 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -29,7 +29,13 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager { private int channel; - AMQProtocolWriter protocolWriter; + private AMQProtocolWriter protocolWriter; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; /** * Request and response frames must have a requestID and responseID which @@ -45,10 +51,11 @@ public class RequestManager private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap; - public RequestManager(int channel, AMQProtocolWriter protocolWriter) + public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.protocolWriter = protocolWriter; + this.serverFlag = serverFlag; requestIdCount = 1L; lastProcessedResponseId = 0L; requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>(); @@ -64,7 +71,7 @@ public class RequestManager lastProcessedResponseId, requestMethodBody); requestSentMap.put(requestId, methodListener); protocolWriter.writeFrame(requestFrame); - // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody); + // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody); return requestId; } @@ -73,7 +80,7 @@ public class RequestManager { long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); - // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload()); + // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload()); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { AMQMethodListener methodListener = requestSentMap.get(requestId); diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 43c6de74c5..90f33a08c0 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -31,8 +31,14 @@ import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { private int channel; - AMQMethodListener methodListener; - AMQProtocolWriter protocolWriter; + private AMQMethodListener methodListener; + private AMQProtocolWriter protocolWriter; + + /** + * Used for logging and debugging only - allows the context of this instance + * to be known. + */ + private boolean serverFlag; /** * Determines the batch behaviour of the manager. @@ -91,11 +97,12 @@ public class ResponseManager private ConcurrentHashMap<Long, ResponseStatus> responseMap; public ResponseManager(int channel, AMQMethodListener methodListener, - AMQProtocolWriter protocolWriter) + AMQProtocolWriter protocolWriter, boolean serverFlag) { this.channel = channel; this.methodListener = methodListener; this.protocolWriter = protocolWriter; + this.serverFlag = serverFlag; responseIdCount = 1L; lastReceivedRequestId = 0L; responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); @@ -106,7 +113,7 @@ public class ResponseManager public void requestReceived(AMQRequestBody requestBody) throws Exception { long requestId = requestBody.getRequestId(); - // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload()); + // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload()); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; @@ -119,7 +126,7 @@ public class ResponseManager public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws RequestResponseMappingException { - // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody); + // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody); ResponseStatus responseStatus = responseMap.get(requestId); if (responseStatus == null) throw new RequestResponseMappingException(requestId, |
