summaryrefslogtreecommitdiff
path: root/cpp/src/framing
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/framing')
-rw-r--r--cpp/src/framing/ChannelAdapter.cpp22
-rw-r--r--cpp/src/framing/ChannelAdapter.h28
-rw-r--r--cpp/src/framing/Correlator.cpp42
-rw-r--r--cpp/src/framing/Correlator.h68
-rw-r--r--cpp/src/framing/Requester.h17
5 files changed, 153 insertions, 24 deletions
diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp
index 99a14f08fb..d16934a857 100644
--- a/cpp/src/framing/ChannelAdapter.cpp
+++ b/cpp/src/framing/ChannelAdapter.cpp
@@ -35,15 +35,19 @@ void ChannelAdapter::init(
version = v;
}
-RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
- RequestId result = 0;
+RequestId ChannelAdapter::send(
+ shared_ptr<AMQBody> body, Correlator::Action action)
+{
+ RequestId requestId = 0;
assertChannelOpen();
switch (body->type()) {
case REQUEST_BODY: {
AMQRequestBody::shared_ptr request =
boost::shared_polymorphic_downcast<AMQRequestBody>(body);
requester.sending(request->getData());
- result = request->getData().requestId;
+ requestId = request->getData().requestId;
+ if (!action.empty())
+ correlator.request(requestId, action);
break;
}
case RESPONSE_BODY: {
@@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) {
responder.sending(response->getData());
break;
}
+ // No action required for other body types.
}
out->send(new AMQFrame(getVersion(), getId(), body));
- return result;
+ return requestId;
}
void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
@@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) {
void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) {
assertMethodOk(*response);
- // TODO aconway 2007-01-30: Consider a response handled on receipt.
- // Review - any cases where this is not the case?
AMQResponseBody::Data& responseData = response->getData();
+
+ // FIXME aconway 2007-04-05: processed should be last
+ // but causes problems with InProcessBroker tests because
+ // we execute client code in handleMethod.
+ // Need to introduce a queue & 2 threads for inprocess.
requester.processed(responseData);
+ // FIXME aconway 2007-04-04: exception handling.
+ correlator.response(response);
handleMethod(response);
}
diff --git a/cpp/src/framing/ChannelAdapter.h b/cpp/src/framing/ChannelAdapter.h
index 493191d92b..1b325495ff 100644
--- a/cpp/src/framing/ChannelAdapter.h
+++ b/cpp/src/framing/ChannelAdapter.h
@@ -22,11 +22,11 @@
*
*/
-#include <boost/shared_ptr.hpp>
-
+#include "../shared_ptr.h"
#include "BodyHandler.h"
#include "Requester.h"
#include "Responder.h"
+#include "Correlator.h"
#include "amqp_types.h"
namespace qpid {
@@ -64,17 +64,24 @@ class ChannelAdapter : public BodyHandler {
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
-
+
/**
- * Wrap body in a frame and send the frame.
- * Takes ownership of body.
+ * Send a frame.
+ *@param body Body of the frame.
+ *@param action optional action to execute when we receive a
+ *response to this frame. Ignored if body is not a Request.
+ *@return If body is a request, the ID assigned else 0.
*/
- RequestId send(AMQBody::shared_ptr body);
+ RequestId send(shared_ptr<AMQBody> body,
+ Correlator::Action action=Correlator::Action());
+
+ // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites.
+ /**@deprecated Use make_shared_ptr with the other send() override */
RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
- void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>);
- void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>);
- void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>);
+ void handleMethod(shared_ptr<AMQMethodBody>);
+ void handleRequest(shared_ptr<AMQRequestBody>);
+ void handleResponse(shared_ptr<AMQResponseBody>);
virtual bool isOpen() const = 0;
@@ -84,7 +91,7 @@ class ChannelAdapter : public BodyHandler {
void assertChannelNotOpen() const;
virtual void handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
+ shared_ptr<AMQMethodBody> method,
const MethodContext& context) = 0;
RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); }
@@ -97,6 +104,7 @@ class ChannelAdapter : public BodyHandler {
ProtocolVersion version;
Requester requester;
Responder responder;
+ Correlator correlator;
};
}}
diff --git a/cpp/src/framing/Correlator.cpp b/cpp/src/framing/Correlator.cpp
new file mode 100644
index 0000000000..1c18f6b414
--- /dev/null
+++ b/cpp/src/framing/Correlator.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "Correlator.h"
+
+namespace qpid {
+namespace framing {
+
+void Correlator::request(RequestId id, Action action) {
+ actions[id] = action;
+}
+
+bool Correlator::response(shared_ptr<AMQResponseBody> r) {
+ Actions::iterator begin = actions.lower_bound(r->getRequestId());
+ Actions::iterator end =
+ actions.upper_bound(r->getRequestId()+r->getBatchOffset());
+ bool didAction = false;
+ for(Actions::iterator i=begin; i != end; ++i) {
+ // FIXME aconway 2007-04-04: Exception handling.
+ didAction = true;
+ i->second(r);
+ actions.erase(i);
+ }
+ return didAction;
+}
+
+}} // namespace qpid::framing
diff --git a/cpp/src/framing/Correlator.h b/cpp/src/framing/Correlator.h
new file mode 100644
index 0000000000..b3eb998149
--- /dev/null
+++ b/cpp/src/framing/Correlator.h
@@ -0,0 +1,68 @@
+#ifndef _framing_Correlator_h
+#define _framing_Correlator_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "../shared_ptr.h"
+#include "../framing/AMQResponseBody.h"
+#include <boost/function.hpp>
+#include <map>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Correlate responses with actions established when sending the request.
+ *
+ * THREAD UNSAFE.
+ */
+class Correlator
+{
+ public:
+ typedef shared_ptr<AMQResponseBody> ResponsePtr;
+ typedef boost::function<void (ResponsePtr)> Action;
+
+ /**
+ * Note that request with id was sent, record an action to call
+ * when a response arrives.
+ */
+ void request(RequestId id, Action doOnResponse);
+
+ /**
+ * Note response received, call action for associated request if any.
+ * Return true of some action(s) were executed.
+ */
+ bool response(shared_ptr<AMQResponseBody>);
+
+ /**
+ * Note the given execution mark was received, call actions
+ * for any requests that are impicitly responded to.
+ */
+ void mark(RequestId mark);
+
+ private:
+ typedef std::map<RequestId, Action> Actions;
+ Actions actions;
+};
+
+}} // namespace qpid::framing
+
+
+
+#endif /*!_framing_Correlator_h*/
diff --git a/cpp/src/framing/Requester.h b/cpp/src/framing/Requester.h
index dcc4460041..65bdc9a5a1 100644
--- a/cpp/src/framing/Requester.h
+++ b/cpp/src/framing/Requester.h
@@ -32,8 +32,7 @@ class AMQResponseBody;
/**
* Manage request IDs and the response mark for locally initiated requests.
*
- * THREAD UNSAFE: This class is called as frames are sent or received
- * sequentially on a connection, so it does not need to be thread safe.
+ * THREAD UNSAFE: must be locked externally.
*/
class Requester
{
@@ -46,12 +45,14 @@ class Requester
/** Called after processing a response. */
void processed(const AMQResponseBody::Data&);
- /** Get the next request id to be used. */
- RequestId getNextId() { return lastId + 1; }
- /** Get the first request acked by this response */
- RequestId getFirstAckRequest() { return firstAckRequest; }
- /** Get the last request acked by this response */
- RequestId getLastAckRequest() { return lastAckRequest; }
+ /** Get the next request id to be used. */
+ RequestId getNextId() { return lastId + 1; }
+
+ /** Get the first request acked by last response */
+ RequestId getFirstAckRequest() { return firstAckRequest; }
+
+ /** Get the last request acked by last response */
+ RequestId getLastAckRequest() { return lastAckRequest; }
private:
RequestId lastId;