diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 16:26:00 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 16:26:00 +0000 |
| commit | fa15e6d52022cc1576b19e3caaecf66260c1923e (patch) | |
| tree | a00bdc846c8b772faf199d228a60db87d75c7939 /java/common | |
| parent | 9ba2ca90c9127ea98372a9758e731dd9fe19c212 (diff) | |
| download | qpid-python-fa15e6d52022cc1576b19e3caaecf66260c1923e.tar.gz | |
Request and Respone managers now use the new common AMQMethodListener class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
3 files changed, 44 insertions, 66 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java deleted file mode 100644 index f779258e00..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -public interface RequestHandler -{ - public boolean requestReceived(AMQRequestBody requestBody); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 55c25151da..da7479af8c 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -28,21 +28,21 @@ public class RequestManager { private int channel; AMQProtocolWriter protocolSession; - + /** * Request and response frames must have a requestID and responseID which * indepenedently increment from 0 on a per-channel basis. These are the * counters, and contain the value of the next (not yet used) frame. */ private long requestIdCount; - + /** * These keep track of the last requestId and responseId to be received. */ private long lastProcessedResponseId; - + private Hashtable<Long, AMQResponseCallback> requestSentMap; - + public RequestManager(int channel, AMQProtocolWriter protocolSession) { this.channel = channel; @@ -51,9 +51,9 @@ public class RequestManager lastProcessedResponseId = 0L; requestSentMap = new Hashtable<Long, AMQResponseCallback>(); } - + // *** Functions to originate a request *** - + public long sendRequest(AMQMethodBody requestMethodBody, AMQResponseCallback responseCallback) { @@ -64,7 +64,7 @@ public class RequestManager requestSentMap.put(requestId, responseCallback); return requestId; } - + public void responseReceived(AMQResponseBody responseBody) throws RequestResponseMappingException { @@ -81,16 +81,16 @@ public class RequestManager } lastProcessedResponseId = responseBody.getResponseId(); } - + // *** Management functions *** - + public int requestsMapSize() { return requestSentMap.size(); } - + // *** Private helper functions *** - + private long getNextRequestId() { return requestIdCount++; diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java index 280d8d562a..9174675ded 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java @@ -24,12 +24,14 @@ import java.util.Iterator; import java.util.Hashtable; import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; public class ResponseManager { private int channel; - RequestHandler requestHandler; + AMQMethodListener methodListener; AMQProtocolWriter protocolSession; /** @@ -51,66 +53,68 @@ public class ResponseManager */ public enum batchResponseModeEnum { NONE } private batchResponseModeEnum batchResponseMode; - + /** * Request and response frames must have a requestID and responseID which * indepenedently increment from 0 on a per-channel basis. These are the * counters, and contain the value of the next (not yet used) frame. */ private long responseIdCount; - + /** * These keep track of the last requestId and responseId to be received. */ private long lastReceivedRequestId; - + /** * Last requestID sent in a response (for batching) */ private long lastSentRequestId; - + private class ResponseStatus implements Comparable<ResponseStatus> { - public long requestId; - public AMQMethodBody responseMethodBody; - + private long requestId; + private AMQMethodBody responseMethodBody; + public ResponseStatus(long requestId) { - this.requestId = requestId; + this.requestId = requestId; responseMethodBody = null; } - + public int compareTo(ResponseStatus o) { return (int)(requestId - o.requestId); } } - + private Hashtable<Long, ResponseStatus> responseMap; - - public ResponseManager(int channel, RequestHandler requestHandler, + + public ResponseManager(int channel, AMQMethodListener methodListener, AMQProtocolWriter protocolSession) { this.channel = channel; - this.requestHandler = requestHandler; + this.methodListener = methodListener; this.protocolSession = protocolSession; responseIdCount = 1L; lastReceivedRequestId = 0L; responseMap = new Hashtable<Long, ResponseStatus>(); } - + // *** Functions to handle an incoming request *** - - public void requestReceived(AMQRequestBody requestBody) + + public void requestReceived(AMQRequestBody requestBody) throws Exception { long requestId = requestBody.getRequestId(); // TODO: responseMark is used in HA, but until then, ignore... long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - requestHandler.requestReceived(requestBody); + // TODO: Update MethodEvent to use the RequestBody instead of MethodBody + AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload()); + methodListener.methodReceived(methodEvent); } - + public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws RequestResponseMappingException { @@ -124,14 +128,14 @@ public class ResponseManager responseStatus.responseMethodBody = responseMethodBody; doBatches(); } - + // *** Management functions *** public batchResponseModeEnum getBatchResponseMode() { return batchResponseMode; } - + public void setBatchResponseMode(batchResponseModeEnum batchResponseMode) { if (this.batchResponseMode != batchResponseMode) @@ -140,12 +144,12 @@ public class ResponseManager doBatches(); } } - + public int responsesMapSize() { return responseMap.size(); } - + /** * As the responseMap may contain both outstanding responses (those with * ResponseStatus.responseMethodBody still null) and responses waiting to @@ -162,7 +166,7 @@ public class ResponseManager } return cnt; } - + /** * As the responseMap may contain both outstanding responses (those with * ResponseStatus.responseMethodBody still null) and responses waiting to @@ -179,14 +183,14 @@ public class ResponseManager } return cnt; } - + // *** Private helper functions *** - + private long getNextResponseId() { return responseIdCount++; } - + private void doBatches() { switch (batchResponseMode) @@ -204,13 +208,13 @@ public class ResponseManager } } break; - + // TODO: Add additional batch mode handlers here... // case DELAY_FIXED: // case MANUAL: } } - + private void sendResponseBatch(long firstRequestId, int numAdditionalRequests, AMQMethodBody responseMethodBody) { |
