diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-10 21:47:30 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-10 21:47:30 +0000 |
| commit | b6b0463b260df363667c08131e5e5b16bb4daa5e (patch) | |
| tree | 774096746bbf939d92b3c4a8d2479eef3e0fd748 /java/common/src | |
| parent | ddf86268e7bea00710a49bc66fff1a398d41cf0b (diff) | |
| download | qpid-python-b6b0463b260df363667c08131e5e5b16bb4daa5e.tar.gz | |
Created new common interfaces to support the RequestResponseManager on both client and server
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@495000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
3 files changed, 83 insertions, 24 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java new file mode 100644 index 0000000000..ed0e692921 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.framing; + +public interface AMQResponseCallback +{ + /** + * Callback for response frames. An instance of this class must be + * passed to RequestResponseManager.sendRequest(). When a response + * is received, then this method will be invoked in the passed + * instance. + */ + public void responseFrameReceived(AMQResponseBody responseBody); +} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java index 334a13ffe9..3e86ed7194 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestResponseManager.java @@ -20,17 +20,16 @@ */ package org.apache.qpid.framing; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.TreeMap; import org.apache.qpid.AMQException; -//import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestResponseManager { private int channel; -// AMQProtocolSession protocolSession; + AMQProtocolWriter protocolSession; /** * Determines the batch behaviour of the manager. @@ -88,30 +87,29 @@ public class RequestResponseManager } } - private TreeMap<Long, Method> requestSentMap; + private TreeMap<Long, AMQResponseCallback> requestSentMap; private TreeMap<Long, ResponseStatus> responseMap; -// public RequestResponseManager(int channel, AMQProtocolSession protocolSession) - public RequestResponseManager(int channel) + public RequestResponseManager(int channel, AMQProtocolWriter protocolSession) { this.channel = channel; -// this.protocolSession = protocolSession; + this.protocolSession = protocolSession; requestIdCount = 1L; responseIdCount = 1L; lastReceivedRequestId = 0L; lastReceivedResponseId = 0L; - requestSentMap = new TreeMap<Long, Method>(); + requestSentMap = new TreeMap<Long, AMQResponseCallback>(); responseMap = new TreeMap<Long, ResponseStatus>(); } // *** Functions to originate a request *** - public long sendRequest(AMQMethodBody requestMethodBody, Method responseCallback) + public long sendRequest(AMQMethodBody requestMethodBody, AMQResponseCallback responseCallback) { long requestId = getRequestId(); // Get new request ID AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, lastReceivedResponseId, requestMethodBody); -// protocolSession.writeFrame(requestFrame); + protocolSession.writeFrame(requestFrame); requestSentMap.put(requestId, responseCallback); return requestId; } @@ -123,12 +121,11 @@ public class RequestResponseManager long requestIdStop = requestIdStart + responseBody.getBatchOffset(); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { - Method responseCallback = requestSentMap.get(requestId); + AMQResponseCallback responseCallback = requestSentMap.get(requestId); if (responseCallback == null) throw new AMQException("Failed to locate requestId " + requestId + " in requestSentMap."); - // TODO - // responseCallback.invoke(?); + responseCallback.responseFrameReceived(responseBody); requestSentMap.remove(requestId); } } @@ -138,11 +135,13 @@ public class RequestResponseManager public void requestReceived(AMQRequestBody requestBody) { long requestId = requestBody.getRequestId(); - long responseMark = requestBody.getResponseMark(); // TODO - what do we do with this?? + // TODO: responseMark is used in HA, but until then, ignore... + long responseMark = requestBody.getResponseMark(); lastReceivedRequestId = requestId; responseMap.put(requestId, new ResponseStatus(requestId)); - // TODO: Initiate some action based on the MethodBody + // TODO: Initiate some action based on the MethodBody - like send to handlers, + // but how to do this in a way that will work for both client and server? } public void sendResponse(long requestId, AMQMethodBody responseMethodBody) throws AMQException @@ -185,14 +184,6 @@ public class RequestResponseManager return responseIdCount++; } -/* private Method findRequest(long requestId) - throws AMQException - { - RequestStatus requestStatus = requestMap.get(requestId); - // TODO - return null; - } -*/ private void doBatches() { switch (batchResponseMode) @@ -210,6 +201,10 @@ public class RequestResponseManager } } break; + + // TODO: Add additional batch mode handlers here... + // case DELAY_FIXED: + // case MANUAL: } } @@ -219,6 +214,6 @@ public class RequestResponseManager long responseId = getResponseId(); // Get new request ID AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId, firstRequestId, numAdditionalRequests, responseMethodBody); -// protocolSession.writeFrame(responseFrame); + protocolSession.writeFrame(responseFrame); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java new file mode 100644 index 0000000000..41e4ad68c8 --- /dev/null +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.protocol; + +import org.apache.qpid.framing.AMQDataBlock; + +public interface AMQProtocolWriter +{ + /** + * Write a datablock, encoding where necessary (e.g. into a sequence of bytes) + * @param frame the frame to be encoded and written + */ + public void writeFrame(AMQDataBlock frame); +} |
