summaryrefslogtreecommitdiff
path: root/java/common
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-15 16:26:00 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-15 16:26:00 +0000
commitfa15e6d52022cc1576b19e3caaecf66260c1923e (patch)
treea00bdc846c8b772faf199d228a60db87d75c7939 /java/common
parent9ba2ca90c9127ea98372a9758e731dd9fe19c212 (diff)
downloadqpid-python-fa15e6d52022cc1576b19e3caaecf66260c1923e.tar.gz
Request and Respone managers now use the new common AMQMethodListener class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496389 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/common')
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java26
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/RequestManager.java22
-rw-r--r--java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java62
3 files changed, 44 insertions, 66 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java b/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
deleted file mode 100644
index f779258e00..0000000000
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestHandler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.framing;
-
-public interface RequestHandler
-{
- public boolean requestReceived(AMQRequestBody requestBody);
-}
diff --git a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
index 55c25151da..da7479af8c 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/RequestManager.java
@@ -28,21 +28,21 @@ public class RequestManager
{
private int channel;
AMQProtocolWriter protocolSession;
-
+
/**
* Request and response frames must have a requestID and responseID which
* indepenedently increment from 0 on a per-channel basis. These are the
* counters, and contain the value of the next (not yet used) frame.
*/
private long requestIdCount;
-
+
/**
* These keep track of the last requestId and responseId to be received.
*/
private long lastProcessedResponseId;
-
+
private Hashtable<Long, AMQResponseCallback> requestSentMap;
-
+
public RequestManager(int channel, AMQProtocolWriter protocolSession)
{
this.channel = channel;
@@ -51,9 +51,9 @@ public class RequestManager
lastProcessedResponseId = 0L;
requestSentMap = new Hashtable<Long, AMQResponseCallback>();
}
-
+
// *** Functions to originate a request ***
-
+
public long sendRequest(AMQMethodBody requestMethodBody,
AMQResponseCallback responseCallback)
{
@@ -64,7 +64,7 @@ public class RequestManager
requestSentMap.put(requestId, responseCallback);
return requestId;
}
-
+
public void responseReceived(AMQResponseBody responseBody)
throws RequestResponseMappingException
{
@@ -81,16 +81,16 @@ public class RequestManager
}
lastProcessedResponseId = responseBody.getResponseId();
}
-
+
// *** Management functions ***
-
+
public int requestsMapSize()
{
return requestSentMap.size();
}
-
+
// *** Private helper functions ***
-
+
private long getNextRequestId()
{
return requestIdCount++;
diff --git a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
index 280d8d562a..9174675ded 100644
--- a/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
+++ b/java/common/src/main/java/org/apache/qpid/framing/ResponseManager.java
@@ -24,12 +24,14 @@ import java.util.Iterator;
import java.util.Hashtable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
public class ResponseManager
{
private int channel;
- RequestHandler requestHandler;
+ AMQMethodListener methodListener;
AMQProtocolWriter protocolSession;
/**
@@ -51,66 +53,68 @@ public class ResponseManager
*/
public enum batchResponseModeEnum { NONE }
private batchResponseModeEnum batchResponseMode;
-
+
/**
* Request and response frames must have a requestID and responseID which
* indepenedently increment from 0 on a per-channel basis. These are the
* counters, and contain the value of the next (not yet used) frame.
*/
private long responseIdCount;
-
+
/**
* These keep track of the last requestId and responseId to be received.
*/
private long lastReceivedRequestId;
-
+
/**
* Last requestID sent in a response (for batching)
*/
private long lastSentRequestId;
-
+
private class ResponseStatus implements Comparable<ResponseStatus>
{
- public long requestId;
- public AMQMethodBody responseMethodBody;
-
+ private long requestId;
+ private AMQMethodBody responseMethodBody;
+
public ResponseStatus(long requestId)
{
- this.requestId = requestId;
+ this.requestId = requestId;
responseMethodBody = null;
}
-
+
public int compareTo(ResponseStatus o)
{
return (int)(requestId - o.requestId);
}
}
-
+
private Hashtable<Long, ResponseStatus> responseMap;
-
- public ResponseManager(int channel, RequestHandler requestHandler,
+
+ public ResponseManager(int channel, AMQMethodListener methodListener,
AMQProtocolWriter protocolSession)
{
this.channel = channel;
- this.requestHandler = requestHandler;
+ this.methodListener = methodListener;
this.protocolSession = protocolSession;
responseIdCount = 1L;
lastReceivedRequestId = 0L;
responseMap = new Hashtable<Long, ResponseStatus>();
}
-
+
// *** Functions to handle an incoming request ***
-
- public void requestReceived(AMQRequestBody requestBody)
+
+ public void requestReceived(AMQRequestBody requestBody) throws Exception
{
long requestId = requestBody.getRequestId();
// TODO: responseMark is used in HA, but until then, ignore...
long responseMark = requestBody.getResponseMark();
lastReceivedRequestId = requestId;
responseMap.put(requestId, new ResponseStatus(requestId));
- requestHandler.requestReceived(requestBody);
+ // TODO: Update MethodEvent to use the RequestBody instead of MethodBody
+ AMQMethodEvent methodEvent = new AMQMethodEvent(channel, requestBody.getMethodPayload());
+ methodListener.methodReceived(methodEvent);
}
-
+
public void sendResponse(long requestId, AMQMethodBody responseMethodBody)
throws RequestResponseMappingException
{
@@ -124,14 +128,14 @@ public class ResponseManager
responseStatus.responseMethodBody = responseMethodBody;
doBatches();
}
-
+
// *** Management functions ***
public batchResponseModeEnum getBatchResponseMode()
{
return batchResponseMode;
}
-
+
public void setBatchResponseMode(batchResponseModeEnum batchResponseMode)
{
if (this.batchResponseMode != batchResponseMode)
@@ -140,12 +144,12 @@ public class ResponseManager
doBatches();
}
}
-
+
public int responsesMapSize()
{
return responseMap.size();
}
-
+
/**
* As the responseMap may contain both outstanding responses (those with
* ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -162,7 +166,7 @@ public class ResponseManager
}
return cnt;
}
-
+
/**
* As the responseMap may contain both outstanding responses (those with
* ResponseStatus.responseMethodBody still null) and responses waiting to
@@ -179,14 +183,14 @@ public class ResponseManager
}
return cnt;
}
-
+
// *** Private helper functions ***
-
+
private long getNextResponseId()
{
return responseIdCount++;
}
-
+
private void doBatches()
{
switch (batchResponseMode)
@@ -204,13 +208,13 @@ public class ResponseManager
}
}
break;
-
+
// TODO: Add additional batch mode handlers here...
// case DELAY_FIXED:
// case MANUAL:
}
}
-
+
private void sendResponseBatch(long firstRequestId, int numAdditionalRequests,
AMQMethodBody responseMethodBody)
{