From ddf86268e7bea00710a49bc66fff1a398d41cf0b Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Wed, 10 Jan 2007 20:12:23 +0000 Subject: First version of RequestResponseHandler for an idea of how the RequestResponse frames will be managed... Not quite complete, still need to find a means of writing the frames that is common to both client and server. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@494962 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/framing/AMQRequestBody.java | 32 +-- .../org/apache/qpid/framing/AMQResponseBody.java | 38 ++-- .../qpid/framing/RequestResponseManager.java | 224 +++++++++++++++++++++ 3 files changed, 267 insertions(+), 27 deletions(-) create mode 100644 java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java (limited to 'java/common/src') diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java index 137ca1d0e4..af43ab6474 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java @@ -24,16 +24,22 @@ import org.apache.mina.common.ByteBuffer; public class AMQRequestBody extends AMQBody { - public static final byte TYPE = (byte)AmqpConstants.frameRequestAsInt(); - // Fields declared in specification - public long requestId; - public long responseMark; - public AMQMethodBody methodPayload; + protected long requestId; + protected long responseMark; + protected AMQMethodBody methodPayload; // Constructor public AMQRequestBody() {} + public AMQRequestBody(long requestId, long responseMark, + AMQMethodBody methodPayload) + { + this.requestId = requestId; + this.responseMark = responseMark; + this.methodPayload = methodPayload; + } + // Field methods public long getRequestId() { return requestId; } @@ -43,7 +49,7 @@ public class AMQRequestBody extends AMQBody protected byte getFrameType() { - return TYPE; + return (byte)AmqpConstants.frameRequestAsInt(); } protected int getSize() @@ -68,15 +74,17 @@ public class AMQRequestBody extends AMQBody methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } + public String toString() + { + return "Req[" + requestId + " " + responseMark + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); + } + public static AMQFrame createAMQFrame(int channelId, long requestId, long responseMark, AMQMethodBody methodPayload) { - AMQRequestBody requestFrame = new AMQRequestBody(); - requestFrame.requestId = requestId; - requestFrame.responseMark = responseMark; - requestFrame.methodPayload = methodPayload; - - + AMQRequestBody requestFrame = new AMQRequestBody(requestId, responseMark, + methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = requestFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java index 84174b4020..67fc485f48 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java @@ -24,16 +24,22 @@ import org.apache.mina.common.ByteBuffer; public class AMQResponseBody extends AMQBody { - public static final byte TYPE = (byte)AmqpConstants.frameResponseAsInt(); - // Fields declared in specification - public long responseId; - public long requestId; - public int batchOffset; - public AMQMethodBody methodPayload; + protected long responseId; + protected long requestId; + protected int batchOffset; + protected AMQMethodBody methodPayload; // Constructor public AMQResponseBody() {} + public AMQResponseBody(long getResponseId, long getRequestId, + int batchOffset, AMQMethodBody methodPayload) + { + this.responseId = responseId; + this.requestId = requestId; + this.batchOffset = batchOffset; + this.methodPayload = methodPayload; + } // Field methods public long getResponseId() { return responseId; } @@ -43,7 +49,7 @@ public class AMQResponseBody extends AMQBody protected byte getFrameType() { - return TYPE; + return (byte)AmqpConstants.frameResponseAsInt(); } protected int getSize() @@ -68,15 +74,17 @@ public class AMQResponseBody extends AMQBody methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4); } - public static AMQFrame createAMQFrame(int channelId, long requestId, - long responseId, int batchOffset, AMQMethodBody methodPayload) + public String toString() { - AMQResponseBody responseFrame = new AMQResponseBody(); - responseFrame.responseId = responseId; - responseFrame.requestId = requestId; - responseFrame.batchOffset = batchOffset; - responseFrame.methodPayload = methodPayload; - + return "Res[" + responseId + " " + requestId + "-" + requestId + batchOffset + "] C" + + methodPayload.getClazz() + " M" + methodPayload.getMethod(); + } + + public static AMQFrame createAMQFrame(int channelId, long responseId, + long requestId, int batchOffset, AMQMethodBody methodPayload) + { + AMQResponseBody responseFrame = new AMQResponseBody(responseId, + requestId, batchOffset, methodPayload); AMQFrame frame = new AMQFrame(); frame.channel = channelId; frame.bodyFrame = responseFrame; diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java new file mode 100644 index 0000000000..334a13ffe9 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java @@ -0,0 +1,224 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.TreeMap; + +import org.apache.qpid.AMQException; +//import org.apache.qpid.server.protocol.AMQProtocolSession; + +public class RequestResponseManager +{ + private int channel; +// AMQProtocolSession protocolSession; + + /** + * Determines the batch behaviour of the manager. + * + * Responses are sent to the RequestResponseManager through sendResponse(). + * These may be internally stored/accumulated for batching purposes, depending + * on the batching strategy/mode of the RequestResponseManager. + * + * The following modes are possibe: + * + * NONE: Each request results in an immediate single response, no batching + * takes place. + * DELAY_FIXED: Waits until a fixed period has passed to batch + * accumulated responses. An optional fixed threshold may be set, which + * if reached or exceeded within the delay period will trigger the batch. (TODO) + * MANUAL: No response is sent until it is explicitly released by calling + * function xxxx(). (TODO) + */ + public enum batchResponseModeEnum { NONE } + private batchResponseModeEnum batchResponseMode; + + /** + * Request and response frames must have a requestID and responseID which + * indepenedently increment from 0 on a per-channel basis. These are the + * counters, and contain the value of the next (not yet used) frame. + */ + private long requestIdCount; + private long responseIdCount; + + /** + * These keep track of the last requestId and responseId to be received. + */ + private long lastReceivedRequestId; + private long lastReceivedResponseId; + + /** + * Last requestID sent in a response (for batching) + */ + private long lastSentRequestId; + + private class ResponseStatus implements Comparable + { + public long requestId; + public AMQMethodBody responseMethodBody; + + public ResponseStatus(long requestId) + { + this.requestId = requestId; + responseMethodBody = null; + } + + public int compareTo(ResponseStatus o) + { + return (int)(requestId - o.requestId); + } + } + + private TreeMap requestSentMap; + private TreeMap responseMap; + +// public RequestResponseManager(int channel, AMQProtocolSession protocolSession) + public RequestResponseManager(int channel) + { + this.channel = channel; +// this.protocolSession = protocolSession; + requestIdCount = 1L; + responseIdCount = 1L; + lastReceivedRequestId = 0L; + lastReceivedResponseId = 0L; + requestSentMap = new TreeMap(); + responseMap = new TreeMap(); + } + + // *** Functions to originate a request *** + + public long sendRequest(AMQMethodBody requestMethodBody, Method responseCallback) + { + long requestId = getRequestId(); // Get new request ID + AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, + lastReceivedResponseId, requestMethodBody); +// protocolSession.writeFrame(requestFrame); + requestSentMap.put(requestId, responseCallback); + return requestId; + } + + public void responseReceived(AMQResponseBody responseBody) throws AMQException + { + lastReceivedResponseId = responseBody.getResponseId(); + long requestIdStart = responseBody.getRequestId(); + long requestIdStop = requestIdStart + responseBody.getBatchOffset(); + for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) + { + Method responseCallback = requestSentMap.get(requestId); + if (responseCallback == null) + throw new AMQException("Failed to locate requestId " + requestId + + " in requestSentMap."); + // TODO + // responseCallback.invoke(?); + requestSentMap.remove(requestId); + } + } + + // *** Functions to handle an incoming request *** + + public void requestReceived(AMQRequestBody requestBody) + { + long requestId = requestBody.getRequestId(); + long responseMark = requestBody.getResponseMark(); // TODO - what do we do with this?? + lastReceivedRequestId = requestId; + responseMap.put(requestId, new ResponseStatus(requestId)); + + // TODO: Initiate some action based on the MethodBody + } + + public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws AMQException + { + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus == null) + throw new AMQException("Failed to locate requestId " + requestId + + " in responseMap."); + if (responseStatus.responseMethodBody != null) + throw new AMQException("RequestId " + requestId + " already has a response."); + responseStatus.responseMethodBody = responseMethodBody; + doBatches(); + } + + // *** Management functions *** + + public batchResponseModeEnum getBatchResponseMode() + { + return batchResponseMode; + } + + public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) + { + if (this.batchResponseMode != batchResponseMode) + { + this.batchResponseMode = batchResponseMode; + doBatches(); + } + } + + // *** Private helper functions *** + + private long getRequestId() + { + return requestIdCount++; + } + + private long getResponseId() + { + return responseIdCount++; + } + +/* private Method findRequest(long requestId) + throws AMQException + { + RequestStatus requestStatus = requestMap.get(requestId); + // TODO + return null; + } +*/ + private void doBatches() + { + switch (batchResponseMode) + { + case NONE: + Iterator lItr = responseMap.keySet().iterator(); + while (lItr.hasNext()) + { + long requestId = lItr.next(); + ResponseStatus responseStatus = responseMap.get(requestId); + if (responseStatus.responseMethodBody != null) + { + sendResponseBatch(requestId, 0, responseStatus.responseMethodBody); + lItr.remove(); + } + } + break; + } + } + + private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, + AMQMethodBody responseMethodBody) + { + long responseId = getResponseId(); // Get new request ID + AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, + firstRequestId, numAdditionalRequests, responseMethodBody); +// protocolSession.writeFrame(responseFrame); + } +} -- cgit v1.2.1