summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-24 18:42:37 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-24 18:42:37 +0000
commit2d84ebceb5c55a7dca0f8523704a8ef1b7190a42 (patch)
tree198b4b6c18cdc3e86126a9103578e2bbea96a503 /java/common
parent0471974bddcb1bd24517f7c012429702a0394f32 (diff)
downloadqpid-python-2d84ebceb5c55a7dca0f8523704a8ef1b7190a42.tar.gz
Fixed problem with missing type field in Message.transport. Used JMSHeaders instead.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499525 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/Content.java6
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java121
2 files changed, 70 insertions, 57 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java
index 4f31ed6c8a..b5b0e19dbb 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/Content.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java
@@ -42,8 +42,8 @@ public class Content
}
}
- public TypeEnum contentType;
- public ByteBuffer content;
+ private TypeEnum contentType;
+ private ByteBuffer content;
// Constructors
@@ -126,7 +126,7 @@ public class Content
{
EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
- buffer.put(content);
+ buffer.put(content.duplicate());
}
public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index f7dea180b8..dc7a0a2b75 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -44,25 +44,8 @@ public class ResponseManager
*/
private boolean serverFlag;
- /**
- * Determines the batch behaviour of the manager.
- *
- * Responses are sent to the RequestResponseManager through sendResponse().
- * These may be internally stored/accumulated for batching purposes, depending
- * on the batching strategy/mode of the RequestResponseManager.
- *
- * The following modes are possibe:
- *
- * NONE: Each request results in an immediate single response, no batching
- * takes place.
- * DELAY_FIXED: Waits until a fixed period has passed to batch
- * accumulated responses. An optional fixed threshold may be set, which
- * if reached or exceeded within the delay period will trigger the batch. (TODO)
- * MANUAL: No response is sent until it is explicitly released by calling
- * function xxxx(). (TODO)
- */
- public enum batchResponseModeEnum { NONE }
- private batchResponseModeEnum batchResponseMode = batchResponseModeEnum.NONE;
+ private int maxAccumulatedResponses = 20; // Default
+// private Class currentResponseMethodBodyClass;
/**
* Request and response frames must have a requestID and responseID which
@@ -109,6 +92,7 @@ public class ResponseManager
this.serverFlag = serverFlag;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
+// currentResponseMethodBodyClass = null;
responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
}
@@ -122,7 +106,7 @@ public class ResponseManager
logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
" " + requestBody + "; " + requestBody.getMethodPayload());
}
- //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
+ //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel +
// " " + requestBody + "; " + requestBody.getMethodPayload());
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
@@ -150,26 +134,79 @@ public class ResponseManager
if (responseStatus.responseMethodBody != null)
throw new RequestResponseMappingException(requestId, "RequestId " +
requestId + " already has a response in responseMap.");
+
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
+
+// if (currentResponseMethodBodyClass == null)
+// {
+// currentResponseMethodBodyClass = responseMethodBody.getClass();
+// responseStatus.responseMethodBody = responseMethodBody;
+// }
+// else if (currentResponseMethodBodyClass.equals(responseMethodBody.getClass()))
+// {
+// doBatches();
+// currentResponseMethodBodyClass = responseMethodBody.getClass();
+// responseStatus.responseMethodBody = responseMethodBody;
+// }
+// else
+// {
+// responseStatus.responseMethodBody = responseMethodBody;
+// if (batchedResponses() >= maxAccumulatedResponses)
+// doBatches();
+// }
}
// *** Management functions ***
- public batchResponseModeEnum getBatchResponseMode()
- {
- return batchResponseMode;
- }
-
- public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
+ /**
+ * Sends batched responses - i.e. all those members of responseMap that have
+ * received a response.
+ */
+ public synchronized void doBatches()
{
- if (this.batchResponseMode != batchResponseMode)
+ long startRequestId = 0;
+ int numAdditionalRequestIds = 0;
+ Class responseMethodBodyClass = null;
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
{
- this.batchResponseMode = batchResponseMode;
- doBatches();
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
+ {
+// if (startRequestId == 0 || responseMethodBodyClass == null)
+// {
+// startRequestId = requestId;
+// responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+// lItr.remove();
+// }
+// else if (responseMethodBodyClass.equals(responseStatus.responseMethodBody.getClass()))
+// {
+// numAdditionalRequestIds++;
+// lItr.remove();
+// }
+// else
+// {
+// sendResponseBatchFrame(startRequestId, numAdditionalRequestIds,
+// responseStatus.responseMethodBody);
+// numAdditionalRequestIds = 0;
+// startRequestId = requestId;
+// responseMethodBodyClass = responseStatus.responseMethodBody.getClass();
+// lItr.remove();
+// }
+ sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody);
+ lItr.remove();
+ }
}
}
+ /**
+ * Total number of entries in the responseMap - including both those that
+ * are outstanding (i.e. no response has been received) and those that are
+ * batched (those for which responses have been received but have not yet
+ * been collected together and sent).
+ */
public int responsesMapSize()
{
return responseMap.size();
@@ -216,31 +253,7 @@ public class ResponseManager
return responseIdCount++;
}
- private synchronized void doBatches()
- {
- switch (batchResponseMode)
- {
- case NONE:
- Iterator<Long> lItr = responseMap.keySet().iterator();
- while (lItr.hasNext())
- {
- long requestId = lItr.next();
- ResponseStatus responseStatus = responseMap.get(requestId);
- if (responseStatus.responseMethodBody != null)
- {
- sendResponseBatch(requestId, 0, responseStatus.responseMethodBody);
- lItr.remove();
- }
- }
- break;
-
- // TODO: Add additional batch mode handlers here...
- // case DELAY_FIXED:
- // case MANUAL:
- }
- }
-
- private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
+ private void sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
AMQMethodBody responseMethodBody)
{
long responseId = getNextResponseId(); // Get new response ID