summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-22 14:53:43 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-22 14:53:43 +0000
commit210aacb8d90319e0a643836fc55a6717f54ddb05 (patch)
tree0aa0a64dbe7a44b18953273ab7fe6821b325959e /java
parent1afc480c103f3e9de3f468c0203cfa4bcfa67168 (diff)
downloadqpid-python-210aacb8d90319e0a643836fc55a6717f54ddb05.tar.gz
Improvements to debugging messages from Request/ResponseManager. Added timed wait for Channel.CloseOk massage in broker's closeChannelRequest method. Added checks for illegal frames that would open a closed channel
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@498631 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java42
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java16
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java15
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java17
6 files changed, 73 insertions, 25 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index e34104914f..47288884f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -137,8 +137,8 @@ public class AMQChannel
_prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
_messageStore = messageStore;
_exchanges = exchanges;
- _requestManager = new RequestManager(channelId, protocolWriter);
- _responseManager = new ResponseManager(channelId, methodListener, protocolWriter);
+ _requestManager = new RequestManager(channelId, protocolWriter, true);
+ _responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true);
_txnBuffer = new TxnBuffer(_messageStore);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
index 642b2ea03b..8eb914758b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
@@ -27,8 +27,10 @@ import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.AMQFrame;
@@ -224,7 +226,21 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
AMQFrame frame = (AMQFrame) message;
AMQChannel channel = getChannel(frame.channel);
- if (channel == null) {
+ if (channel == null)
+ {
+ // Perform a check on incoming frames that may result in a new channel
+ // being opened. The frame MUST be:
+ // a. A new request;
+ // b. Have a request id of 1 (i.e. the first request on a new channel);
+ // c. Must be a ConnectionOpenBody method.
+ // Throw an exception for all other incoming frames on an unopened channel
+ if(!(frame.bodyFrame instanceof AMQRequestBody))
+ throw new AMQException("Incoming frame on unopened channel not a request");
+ AMQRequestBody requestBody = (AMQRequestBody)frame.bodyFrame;
+ if (requestBody.getMethodPayload() instanceof ConnectionOpenBody)
+ throw new AMQException("Incoming frame on unopened channel not a Connection.Open method");
+ if (requestBody.getRequestId() != 1)
+ throw new AMQException("Incoming Connection.Open frame on unopened channel does not have a request id = 1");
channel = createChannel(frame.channel);
}
@@ -268,6 +284,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
throws AMQException
{
+ if (!checkMethodBodyVersion(methodBody))
+ throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
AMQChannel channel = getChannel(channelNum);
RequestManager requestManager = channel.getRequestManager();
return requestManager.sendRequest(methodBody, methodListener);
@@ -277,14 +295,14 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
public long writeRequest(int channelNum, AMQMethodBody methodBody)
throws AMQException
{
- AMQChannel channel = getChannel(channelNum);
- RequestManager requestManager = channel.getRequestManager();
- return requestManager.sendRequest(methodBody, _stateManager);
+ return writeRequest(channelNum, methodBody, _stateManager);
}
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
throws AMQException
{
+ if (!checkMethodBodyVersion(methodBody))
+ throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
AMQChannel channel = getChannel(channelNum);
ResponseManager responseManager = channel.getResponseManager();
responseManager.sendResponse(requestId, methodBody);
@@ -380,6 +398,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
* </ul>
*
* @param channelId id of the channel to close
+ * @param requestId RequestId of recieved Channel.Close reuqest, used to send Channel.CloseOk response
* @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
@@ -396,6 +415,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
try
{
channel.close(this);
+ // Send the Channel.CloseOk response
// Be aware of possible changes to parameter order as versions change.
writeResponse(channelId, requestId, ChannelCloseOkBody.createMethodBody(_major, _minor));
}
@@ -425,6 +445,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
replyCode, // replyCode
replyText); // replyText
writeRequest(channelId, cf);
+ // Wait a bit for the Channel.CloseOk to come in from the client, but don't
+ // rely on it. Attempt to remove the channel from the list if the ChannelCloseOk
+ // method handler has not already done so.
+ // TODO - Find a better way of doing this without holding up this thread...
+ try { Thread.currentThread().sleep(2000); } // 2 seconds
+ catch (InterruptedException e) {}
+ _channelMap.remove(channelId); // Returns null if already removed
}
}
@@ -577,8 +604,13 @@ public class AMQMinaProtocolSession implements AMQProtocolSession,
return _minor;
}
- public boolean amqpVersionEquals(byte major, byte minor)
+ public boolean versionEquals(byte major, byte minor)
{
return _major == major && _minor == minor;
}
+
+ public boolean checkMethodBodyVersion(AMQMethodBody methodBody)
+ {
+ return versionEquals(methodBody.getMajor(), methodBody.getMinor());
+ }
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
index 58ed3b6522..6c5af02dd3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -136,5 +137,6 @@ public interface AMQProtocolSession extends AMQProtocolWriter
AMQStateManager getStateManager();
byte getMajor();
byte getMinor();
- boolean amqpVersionEquals(byte major, byte minor);
+ boolean versionEquals(byte major, byte minor);
+ boolean checkMethodBodyVersion(AMQMethodBody methodBody);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
index 52cc70a37a..5ea2e66b35 100644
--- a/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
@@ -117,8 +117,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
@@ -130,8 +130,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager = new AMQStateManager(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection, AMQStateManager stateManager)
@@ -145,8 +145,8 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
_stateManager.setProtocolSession(this);
// Add channel 0 request and response managers, since they will not be added through the usual mechanism
- _channelId2RequestMgrMap.put(0, new RequestManager(0, this));
- _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this));
+ _channelId2RequestMgrMap.put(0, new RequestManager(0, this, false));
+ _channelId2ResponseMgrMap.put(0, new ResponseManager(0, _stateManager, this, false));
}
public void init()
@@ -377,12 +377,12 @@ public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionLis
// Add request and response handlers, one per channel, if they do not already exist
if (_channelId2RequestMgrMap.get(channelId) == null)
{
- _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this));
+ _channelId2RequestMgrMap.put(channelId, new RequestManager(channelId, this, false));
}
if (_channelId2ResponseMgrMap.get(channelId) == null)
{
- _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this));
+ _channelId2ResponseMgrMap.put(channelId, new ResponseManager(channelId, _stateManager, this, false));
}
}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index a3379484bd..1bad249bc2 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -29,7 +29,13 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class RequestManager
{
private int channel;
- AMQProtocolWriter protocolWriter;
+ private AMQProtocolWriter protocolWriter;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
/**
* Request and response frames must have a requestID and responseID which
@@ -45,10 +51,11 @@ public class RequestManager
private ConcurrentHashMap<Long, AMQMethodListener> requestSentMap;
- public RequestManager(int channel, AMQProtocolWriter protocolWriter)
+ public RequestManager(int channel, AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.protocolWriter = protocolWriter;
+ this.serverFlag = serverFlag;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
requestSentMap = new ConcurrentHashMap<Long, AMQMethodListener>();
@@ -64,7 +71,7 @@ public class RequestManager
lastProcessedResponseId, requestMethodBody);
requestSentMap.put(requestId, methodListener);
protocolWriter.writeFrame(requestFrame);
- // System.out.println("[" + channel + "] SEND REQUEST: requestId = " + requestId + " {" + this.toString().substring(this.toString().lastIndexOf("@")) + "} " + requestMethodBody);
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " TX REQ: ch=" + channel + " Req[" + requestId + " " + lastProcessedResponseId + "]; " + requestMethodBody);
return requestId;
}
@@ -73,7 +80,7 @@ public class RequestManager
{
long requestIdStart = responseBody.getRequestId();
long requestIdStop = requestIdStart + responseBody.getBatchOffset();
- // System.out.println("[" + channel + "] RECEIVE RESPONSE: " + responseBody + "; " + responseBody.getMethodPayload());
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " RX RES: ch=" + channel + " " + responseBody + "; " + responseBody.getMethodPayload());
for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
{
AMQMethodListener methodListener = requestSentMap.get(requestId);
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index 43c6de74c5..90f33a08c0 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -31,8 +31,14 @@ import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
private int channel;
- AMQMethodListener methodListener;
- AMQProtocolWriter protocolWriter;
+ private AMQMethodListener methodListener;
+ private AMQProtocolWriter protocolWriter;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
/**
* Determines the batch behaviour of the manager.
@@ -91,11 +97,12 @@ public class ResponseManager
private ConcurrentHashMap<Long, ResponseStatus> responseMap;
public ResponseManager(int channel, AMQMethodListener methodListener,
- AMQProtocolWriter protocolWriter)
+ AMQProtocolWriter protocolWriter, boolean serverFlag)
{
this.channel = channel;
this.methodListener = methodListener;
this.protocolWriter = protocolWriter;
+ this.serverFlag = serverFlag;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
@@ -106,7 +113,7 @@ public class ResponseManager
public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
- // System.out.println("[" + channel + "] RECEIVE REQUEST: " + requestBody + "; " + requestBody.getMethodPayload());
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
@@ -119,7 +126,7 @@ public class ResponseManager
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
- // System.out.println("[" + channel + "] SEND RESPONSE: requestId = " + requestId + "; " + responseMethodBody);
+ // System.out.println((serverFlag ? "SRV" : "CLI") + " TX RES: ch=" + channel + " Res[# " + requestId + "]; " + responseMethodBody);
ResponseStatus responseStatus = responseMap.get(requestId);
if (responseStatus == null)
throw new RequestResponseMappingException(requestId,