diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-24 18:42:37 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-24 18:42:37 +0000 |
| commit | 2d84ebceb5c55a7dca0f8523704a8ef1b7190a42 (patch) | |
| tree | 198b4b6c18cdc3e86126a9103578e2bbea96a503 /java/common | |
| parent | 0471974bddcb1bd24517f7c012429702a0394f32 (diff) | |
| download | qpid-python-2d84ebceb5c55a7dca0f8523704a8ef1b7190a42.tar.gz | |
Fixed problem with missing type field in Message.transport. Used JMSHeaders instead.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@499525 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/Content.java | 6 | ||||
| -rw-r--r-- | java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java | 121 |
2 files changed, 70 insertions, 57 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/Content.java b/java/common/src/main/java/org/apache/qpid/framing/Content.java index 4f31ed6c8a..b5b0e19dbb 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/Content.java +++ b/java/common/src/main/java/org/apache/qpid/framing/Content.java @@ -42,8 +42,8 @@ public class Content } } - public TypeEnum contentType; - public ByteBuffer content; + private TypeEnum contentType; + private ByteBuffer content; // Constructors @@ -126,7 +126,7 @@ public class Content { EncodingUtils.writeUnsignedByte(buffer, contentType.toByte()); EncodingUtils.writeUnsignedInteger(buffer, content.remaining()); - buffer.put(content); + buffer.put(content.duplicate()); } public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index f7dea180b8..dc7a0a2b75 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -44,25 +44,8 @@ public class ResponseManager */ private boolean serverFlag; - /** - * Determines the batch behaviour of the manager. - * - * Responses are sent to the RequestResponseManager through sendResponse(). - * These may be internally stored/accumulated for batching purposes, depending - * on the batching strategy/mode of the RequestResponseManager. - * - * The following modes are possibe: - * - * NONE: Each request results in an immediate single response, no batching - * takes place. - * DELAY_FIXED: Waits until a fixed period has passed to batch - * accumulated responses. An optional fixed threshold may be set, which - * if reached or exceeded within the delay period will trigger the batch. (TODO) - * MANUAL: No response is sent until it is explicitly released by calling - * function xxxx(). (TODO) - */ - public enum batchResponseModeEnum { NONE } - private batchResponseModeEnum batchResponseMode = batchResponseModeEnum.NONE; + private int maxAccumulatedResponses = 20; // Default +// private Class currentResponseMethodBodyClass; /** * Request and response frames must have a requestID and responseID which @@ -109,6 +92,7 @@ public class ResponseManager this.serverFlag = serverFlag; responseIdCount = 1L; lastReceivedRequestId = 0L; +// currentResponseMethodBodyClass = null; responseMap = new ConcurrentHashMap<Long, ResponseStatus>(); } @@ -122,7 +106,7 @@ public class ResponseManager logger.debug((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + " " + requestBody + "; " + requestBody.getMethodPayload()); } - //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + + //System.out.println((serverFlag ? "SRV" : "CLI") + " RX REQ: ch=" + channel + // " " + requestBody + "; " + requestBody.getMethodPayload()); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); @@ -150,26 +134,79 @@ public class ResponseManager if (responseStatus.responseMethodBody != null) throw new RequestResponseMappingException(requestId, "RequestId " + requestId + " already has a response in responseMap."); + responseStatus.responseMethodBody = responseMethodBody; doBatches(); + +// if (currentResponseMethodBodyClass == null) +// { +// currentResponseMethodBodyClass = responseMethodBody.getClass(); +// responseStatus.responseMethodBody = responseMethodBody; +// } +// else if (currentResponseMethodBodyClass.equals(responseMethodBody.getClass())) +// { +// doBatches(); +// currentResponseMethodBodyClass = responseMethodBody.getClass(); +// responseStatus.responseMethodBody = responseMethodBody; +// } +// else +// { +// responseStatus.responseMethodBody = responseMethodBody; +// if (batchedResponses() >= maxAccumulatedResponses) +// doBatches(); +// } } // *** Management functions *** - public batchResponseModeEnum getBatchResponseMode() - { - return batchResponseMode; - } - - public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) + /** + * Sends batched responses - i.e. all those members of responseMap that have + * received a response. + */ + public synchronized void doBatches() { - if (this.batchResponseMode != batchResponseMode) + long startRequestId = 0; + int numAdditionalRequestIds = 0; + Class responseMethodBodyClass = null; + Iterator<Long> lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) { - this.batchResponseMode = batchResponseMode; - doBatches(); + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { +// if (startRequestId == 0 || responseMethodBodyClass == null) +// { +// startRequestId = requestId; +// responseMethodBodyClass = responseStatus.responseMethodBody.getClass(); +// lItr.remove(); +// } +// else if (responseMethodBodyClass.equals(responseStatus.responseMethodBody.getClass())) +// { +// numAdditionalRequestIds++; +// lItr.remove(); +// } +// else +// { +// sendResponseBatchFrame(startRequestId, numAdditionalRequestIds, +// responseStatus.responseMethodBody); +// numAdditionalRequestIds = 0; +// startRequestId = requestId; +// responseMethodBodyClass = responseStatus.responseMethodBody.getClass(); +// lItr.remove(); +// } + sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody); + lItr.remove(); + } } } + /** + * Total number of entries in the responseMap - including both those that + * are outstanding (i.e. no response has been received) and those that are + * batched (those for which responses have been received but have not yet + * been collected together and sent). + */ public int responsesMapSize() { return responseMap.size(); @@ -216,31 +253,7 @@ public class ResponseManager return responseIdCount++; } - private synchronized void doBatches() - { - switch (batchResponseMode) - { - case NONE: - Iterator<Long> lItr = responseMap.keySet().iterator(); - while (lItr.hasNext()) - { - long requestId = lItr.next(); - ResponseStatus responseStatus = responseMap.get(requestId); - if (responseStatus.responseMethodBody != null) - { - sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); - lItr.remove(); - } - } - break; - - // TODO: Add additional batch mode handlers here... - // case DELAY_FIXED: - // case MANUAL: - } - } - - private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, + private void sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests, AMQMethodBody responseMethodBody) { long responseId = getNextResponseId(); // Get new response ID |
