diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 21:24:15 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-15 21:24:15 +0000 |
| commit | 5aaad510dc978dc09f92c774c81255b7af6b8b68 (patch) | |
| tree | 2e057027cafb429e848ca8738b2cdb0f2f95397f /java/common/src | |
| parent | 4c70be2495bd94eba1720c9603b3f0bddb99ffd9 (diff) | |
| download | qpid-python-5aaad510dc978dc09f92c774c81255b7af6b8b68.tar.gz | |
Changed the RequestManager to use AMQMethodListener instead of the old AMQResponseCallback.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496499 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common/src')
4 files changed, 27 insertions, 46 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java b/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java deleted file mode 100644 index ed0e692921..0000000000 --- a/java/common/src/main/java/org/apache/qpid/framing/AMQResponseCallback.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.framing; - -public interface AMQResponseCallback -{ - /** - * Callback for response frames. An instance of this class must be - * passed to RequestResponseManager.sendRequest(). When a response - * is received, then this method will be invoked in the passed - * instance. - */ - public void responseFrameReceived(AMQResponseBody responseBody); -} diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java index 900d068c13..b567aea37e 100644 --- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java +++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java @@ -22,6 +22,8 @@ package org.apache.qpid.framing; import java.util.Hashtable; +import org.apache.qpid.protocol.AMQMethodEvent; +import org.apache.qpid.protocol.AMQMethodListener; import org.apache.qpid.protocol.AMQProtocolWriter; public class RequestManager @@ -41,7 +43,7 @@ public class RequestManager */ private long lastProcessedResponseId; - private Hashtable<Long, AMQResponseCallback> requestSentMap; + private Hashtable<Long, AMQMethodListener> requestSentMap; public RequestManager(int channel, AMQProtocolWriter protocolWriter) { @@ -49,34 +51,36 @@ public class RequestManager this.protocolWriter = protocolWriter; requestIdCount = 1L; lastProcessedResponseId = 0L; - requestSentMap = new Hashtable<Long, AMQResponseCallback>(); + requestSentMap = new Hashtable<Long, AMQMethodListener>(); } // *** Functions to originate a request *** public long sendRequest(AMQMethodBody requestMethodBody, - AMQResponseCallback responseCallback) + AMQMethodListener methodListener) { long requestId = getNextRequestId(); // Get new request ID AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId, lastProcessedResponseId, requestMethodBody); protocolWriter.writeFrame(requestFrame); - requestSentMap.put(requestId, responseCallback); + requestSentMap.put(requestId, methodListener); return requestId; } public void responseReceived(AMQResponseBody responseBody) - throws RequestResponseMappingException + throws Exception { long requestIdStart = responseBody.getRequestId(); long requestIdStop = requestIdStart + responseBody.getBatchOffset(); for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++) { - AMQResponseCallback responseCallback = requestSentMap.get(requestId); - if (responseCallback == null) + AMQMethodListener methodListener = requestSentMap.get(requestId); + if (methodListener == null) throw new RequestResponseMappingException(requestId, "Failed to locate requestId " + requestId + " in requestSentMap."); - responseCallback.responseFrameReceived(responseBody); + AMQMethodEvent methodEvent = new AMQMethodEvent(channel, responseBody.getMethodPayload(), + requestId); + methodListener.methodReceived(methodEvent); requestSentMap.remove(requestId); } lastProcessedResponseId = responseBody.getResponseId(); diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java index c42e49d7c8..c624d2e364 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodEvent.java @@ -37,13 +37,13 @@ public class AMQMethodEvent<M extends AMQMethodBody> { private final M _method; private final int _channelId; - private final long _requestResponseId; + private final long _requestId; - public AMQMethodEvent(int channelId, M method, long requestResponseId) + public AMQMethodEvent(int channelId, M method, long requestId) { _channelId = channelId; _method = method; - _requestResponseId = requestResponseId; + _requestId = requestId; } public M getMethod() @@ -56,9 +56,9 @@ public class AMQMethodEvent<M extends AMQMethodBody> return _channelId; } - public long getRequestResponseId() + public long getRequestId() { - return _requestResponseId; + return _requestId; } public String toString() @@ -66,7 +66,7 @@ public class AMQMethodEvent<M extends AMQMethodBody> StringBuilder buf = new StringBuilder("Method event: \n"); buf.append("Channel id: \n").append(_channelId); buf.append("Method: \n").append(_method); - buf.append("Request/Response Id: ").append(_requestResponseId); + buf.append("Request Id: ").append(_requestId); return buf.toString(); } } diff --git a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java index 41e4ad68c8..5ec9b122af 100644 --- a/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java +++ b/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java @@ -21,6 +21,9 @@ package org.apache.qpid.protocol; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.RequestResponseMappingException; +import org.apache.qpid.protocol.AMQMethodListener; public interface AMQProtocolWriter { @@ -29,4 +32,10 @@ public interface AMQProtocolWriter * @param frame the frame to be encoded and written */ public void writeFrame(AMQDataBlock frame); + + public long writeRequest(int channelNum, AMQMethodBody methodBody, + AMQMethodListener methodListener) throws RequestResponseMappingException; + + public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody) + throws RequestResponseMappingException; } |
