diff options
Diffstat (limited to 'cpp/src/framing')
| -rw-r--r-- | cpp/src/framing/ChannelAdapter.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/framing/ChannelAdapter.h | 28 | ||||
| -rw-r--r-- | cpp/src/framing/Correlator.cpp | 42 | ||||
| -rw-r--r-- | cpp/src/framing/Correlator.h | 68 | ||||
| -rw-r--r-- | cpp/src/framing/Requester.h | 17 |
5 files changed, 153 insertions, 24 deletions
diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp index 99a14f08fb..d16934a857 100644 --- a/cpp/src/framing/ChannelAdapter.cpp +++ b/cpp/src/framing/ChannelAdapter.cpp @@ -35,15 +35,19 @@ void ChannelAdapter::init( version = v; } -RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { - RequestId result = 0; +RequestId ChannelAdapter::send( + shared_ptr<AMQBody> body, Correlator::Action action) +{ + RequestId requestId = 0; assertChannelOpen(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = boost::shared_polymorphic_downcast<AMQRequestBody>(body); requester.sending(request->getData()); - result = request->getData().requestId; + requestId = request->getData().requestId; + if (!action.empty()) + correlator.request(requestId, action); break; } case RESPONSE_BODY: { @@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { responder.sending(response->getData()); break; } + // No action required for other body types. } out->send(new AMQFrame(getVersion(), getId(), body)); - return result; + return requestId; } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { @@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { assertMethodOk(*response); - // TODO aconway 2007-01-30: Consider a response handled on receipt. - // Review - any cases where this is not the case? AMQResponseBody::Data& responseData = response->getData(); + + // FIXME aconway 2007-04-05: processed should be last + // but causes problems with InProcessBroker tests because + // we execute client code in handleMethod. + // Need to introduce a queue & 2 threads for inprocess. requester.processed(responseData); + // FIXME aconway 2007-04-04: exception handling. + correlator.response(response); handleMethod(response); } diff --git a/cpp/src/framing/ChannelAdapter.h b/cpp/src/framing/ChannelAdapter.h index 493191d92b..1b325495ff 100644 --- a/cpp/src/framing/ChannelAdapter.h +++ b/cpp/src/framing/ChannelAdapter.h @@ -22,11 +22,11 @@ * */ -#include <boost/shared_ptr.hpp> - +#include "../shared_ptr.h" #include "BodyHandler.h" #include "Requester.h" #include "Responder.h" +#include "Correlator.h" #include "amqp_types.h" namespace qpid { @@ -64,17 +64,24 @@ class ChannelAdapter : public BodyHandler { ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } - + /** - * Wrap body in a frame and send the frame. - * Takes ownership of body. + * Send a frame. + *@param body Body of the frame. + *@param action optional action to execute when we receive a + *response to this frame. Ignored if body is not a Request. + *@return If body is a request, the ID assigned else 0. */ - RequestId send(AMQBody::shared_ptr body); + RequestId send(shared_ptr<AMQBody> body, + Correlator::Action action=Correlator::Action()); + + // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites. + /**@deprecated Use make_shared_ptr with the other send() override */ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - void handleMethod(boost::shared_ptr<qpid::framing::AMQMethodBody>); - void handleRequest(boost::shared_ptr<qpid::framing::AMQRequestBody>); - void handleResponse(boost::shared_ptr<qpid::framing::AMQResponseBody>); + void handleMethod(shared_ptr<AMQMethodBody>); + void handleRequest(shared_ptr<AMQRequestBody>); + void handleResponse(shared_ptr<AMQResponseBody>); virtual bool isOpen() const = 0; @@ -84,7 +91,7 @@ class ChannelAdapter : public BodyHandler { void assertChannelNotOpen() const; virtual void handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, + shared_ptr<AMQMethodBody> method, const MethodContext& context) = 0; RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } @@ -97,6 +104,7 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; + Correlator correlator; }; }} diff --git a/cpp/src/framing/Correlator.cpp b/cpp/src/framing/Correlator.cpp new file mode 100644 index 0000000000..1c18f6b414 --- /dev/null +++ b/cpp/src/framing/Correlator.cpp @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Correlator.h" + +namespace qpid { +namespace framing { + +void Correlator::request(RequestId id, Action action) { + actions[id] = action; +} + +bool Correlator::response(shared_ptr<AMQResponseBody> r) { + Actions::iterator begin = actions.lower_bound(r->getRequestId()); + Actions::iterator end = + actions.upper_bound(r->getRequestId()+r->getBatchOffset()); + bool didAction = false; + for(Actions::iterator i=begin; i != end; ++i) { + // FIXME aconway 2007-04-04: Exception handling. + didAction = true; + i->second(r); + actions.erase(i); + } + return didAction; +} + +}} // namespace qpid::framing diff --git a/cpp/src/framing/Correlator.h b/cpp/src/framing/Correlator.h new file mode 100644 index 0000000000..b3eb998149 --- /dev/null +++ b/cpp/src/framing/Correlator.h @@ -0,0 +1,68 @@ +#ifndef _framing_Correlator_h +#define _framing_Correlator_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../shared_ptr.h" +#include "../framing/AMQResponseBody.h" +#include <boost/function.hpp> +#include <map> + +namespace qpid { +namespace framing { + +/** + * Correlate responses with actions established when sending the request. + * + * THREAD UNSAFE. + */ +class Correlator +{ + public: + typedef shared_ptr<AMQResponseBody> ResponsePtr; + typedef boost::function<void (ResponsePtr)> Action; + + /** + * Note that request with id was sent, record an action to call + * when a response arrives. + */ + void request(RequestId id, Action doOnResponse); + + /** + * Note response received, call action for associated request if any. + * Return true of some action(s) were executed. + */ + bool response(shared_ptr<AMQResponseBody>); + + /** + * Note the given execution mark was received, call actions + * for any requests that are impicitly responded to. + */ + void mark(RequestId mark); + + private: + typedef std::map<RequestId, Action> Actions; + Actions actions; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Correlator_h*/ diff --git a/cpp/src/framing/Requester.h b/cpp/src/framing/Requester.h index dcc4460041..65bdc9a5a1 100644 --- a/cpp/src/framing/Requester.h +++ b/cpp/src/framing/Requester.h @@ -32,8 +32,7 @@ class AMQResponseBody; /** * Manage request IDs and the response mark for locally initiated requests. * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. + * THREAD UNSAFE: must be locked externally. */ class Requester { @@ -46,12 +45,14 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - /** Get the next request id to be used. */ - RequestId getNextId() { return lastId + 1; } - /** Get the first request acked by this response */ - RequestId getFirstAckRequest() { return firstAckRequest; } - /** Get the last request acked by this response */ - RequestId getLastAckRequest() { return lastAckRequest; } + /** Get the next request id to be used. */ + RequestId getNextId() { return lastId + 1; } + + /** Get the first request acked by last response */ + RequestId getFirstAckRequest() { return firstAckRequest; } + + /** Get the last request acked by last response */ + RequestId getLastAckRequest() { return lastAckRequest; } private: RequestId lastId; |
