From bb79efff2408de5f6cd66089cde8b8a82cc80cc2 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 5 Apr 2007 19:16:09 +0000 Subject: * Exteneded use of shared pointers frame bodies across all send() commands. * tests/Makefile.am: added check-unit target to run just unit tests. * Introduced make_shared_ptr convenience function for wrapping plain pointers with shared_ptr. * cpp/src/client/ClientChannel.h,cpp (sendsendAndReceive,sendAndReceiveSync): Pass shared_ptr instead of raw ptr to fix memory problems. Updated the following files to use make_shared_ptr - src/client/BasicMessageChannel.cpp - src/client/ClientConnection.cpp * src/client/MessageMessageChannel.cpp: implemented 0-9 message.get. * src/framing/Correlator.h,cpp: Allow request sender to register actions to take when the correlated response arrives. * cpp/src/tests/FramingTest.cpp: Added Correlator tests. * src/framing/ChannelAdapter.h,cpp: use Correlator to dispatch response actions. * cpp/src/shared_ptr.h (make_shared_ptr): Convenience function to make a shared pointer from a raw pointer. * cpp/src/tests/ClientChannelTest.cpp: Added message.get test. * cpp/src/tests/Makefile.am (check-unit): Added test-unit target to run unit tests. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525932 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/framing/ChannelAdapter.cpp | 22 ++++++++---- cpp/src/framing/ChannelAdapter.h | 28 ++++++++++------ cpp/src/framing/Correlator.cpp | 42 +++++++++++++++++++++++ cpp/src/framing/Correlator.h | 68 ++++++++++++++++++++++++++++++++++++++ cpp/src/framing/Requester.h | 17 +++++----- 5 files changed, 153 insertions(+), 24 deletions(-) create mode 100644 cpp/src/framing/Correlator.cpp create mode 100644 cpp/src/framing/Correlator.h (limited to 'cpp/src/framing') diff --git a/cpp/src/framing/ChannelAdapter.cpp b/cpp/src/framing/ChannelAdapter.cpp index 99a14f08fb..d16934a857 100644 --- a/cpp/src/framing/ChannelAdapter.cpp +++ b/cpp/src/framing/ChannelAdapter.cpp @@ -35,15 +35,19 @@ void ChannelAdapter::init( version = v; } -RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { - RequestId result = 0; +RequestId ChannelAdapter::send( + shared_ptr body, Correlator::Action action) +{ + RequestId requestId = 0; assertChannelOpen(); switch (body->type()) { case REQUEST_BODY: { AMQRequestBody::shared_ptr request = boost::shared_polymorphic_downcast(body); requester.sending(request->getData()); - result = request->getData().requestId; + requestId = request->getData().requestId; + if (!action.empty()) + correlator.request(requestId, action); break; } case RESPONSE_BODY: { @@ -52,9 +56,10 @@ RequestId ChannelAdapter::send(AMQBody::shared_ptr body) { responder.sending(response->getData()); break; } + // No action required for other body types. } out->send(new AMQFrame(getVersion(), getId(), body)); - return result; + return requestId; } void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { @@ -66,10 +71,15 @@ void ChannelAdapter::handleRequest(AMQRequestBody::shared_ptr request) { void ChannelAdapter::handleResponse(AMQResponseBody::shared_ptr response) { assertMethodOk(*response); - // TODO aconway 2007-01-30: Consider a response handled on receipt. - // Review - any cases where this is not the case? AMQResponseBody::Data& responseData = response->getData(); + + // FIXME aconway 2007-04-05: processed should be last + // but causes problems with InProcessBroker tests because + // we execute client code in handleMethod. + // Need to introduce a queue & 2 threads for inprocess. requester.processed(responseData); + // FIXME aconway 2007-04-04: exception handling. + correlator.response(response); handleMethod(response); } diff --git a/cpp/src/framing/ChannelAdapter.h b/cpp/src/framing/ChannelAdapter.h index 493191d92b..1b325495ff 100644 --- a/cpp/src/framing/ChannelAdapter.h +++ b/cpp/src/framing/ChannelAdapter.h @@ -22,11 +22,11 @@ * */ -#include - +#include "../shared_ptr.h" #include "BodyHandler.h" #include "Requester.h" #include "Responder.h" +#include "Correlator.h" #include "amqp_types.h" namespace qpid { @@ -64,17 +64,24 @@ class ChannelAdapter : public BodyHandler { ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } - + /** - * Wrap body in a frame and send the frame. - * Takes ownership of body. + * Send a frame. + *@param body Body of the frame. + *@param action optional action to execute when we receive a + *response to this frame. Ignored if body is not a Request. + *@return If body is a request, the ID assigned else 0. */ - RequestId send(AMQBody::shared_ptr body); + RequestId send(shared_ptr body, + Correlator::Action action=Correlator::Action()); + + // TODO aconway 2007-04-05: remove and use make_shared_ptr at call sites. + /**@deprecated Use make_shared_ptr with the other send() override */ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - void handleMethod(boost::shared_ptr); - void handleRequest(boost::shared_ptr); - void handleResponse(boost::shared_ptr); + void handleMethod(shared_ptr); + void handleRequest(shared_ptr); + void handleResponse(shared_ptr); virtual bool isOpen() const = 0; @@ -84,7 +91,7 @@ class ChannelAdapter : public BodyHandler { void assertChannelNotOpen() const; virtual void handleMethodInContext( - boost::shared_ptr method, + shared_ptr method, const MethodContext& context) = 0; RequestId getFirstAckRequest() { return requester.getFirstAckRequest(); } @@ -97,6 +104,7 @@ class ChannelAdapter : public BodyHandler { ProtocolVersion version; Requester requester; Responder responder; + Correlator correlator; }; }} diff --git a/cpp/src/framing/Correlator.cpp b/cpp/src/framing/Correlator.cpp new file mode 100644 index 0000000000..1c18f6b414 --- /dev/null +++ b/cpp/src/framing/Correlator.cpp @@ -0,0 +1,42 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "Correlator.h" + +namespace qpid { +namespace framing { + +void Correlator::request(RequestId id, Action action) { + actions[id] = action; +} + +bool Correlator::response(shared_ptr r) { + Actions::iterator begin = actions.lower_bound(r->getRequestId()); + Actions::iterator end = + actions.upper_bound(r->getRequestId()+r->getBatchOffset()); + bool didAction = false; + for(Actions::iterator i=begin; i != end; ++i) { + // FIXME aconway 2007-04-04: Exception handling. + didAction = true; + i->second(r); + actions.erase(i); + } + return didAction; +} + +}} // namespace qpid::framing diff --git a/cpp/src/framing/Correlator.h b/cpp/src/framing/Correlator.h new file mode 100644 index 0000000000..b3eb998149 --- /dev/null +++ b/cpp/src/framing/Correlator.h @@ -0,0 +1,68 @@ +#ifndef _framing_Correlator_h +#define _framing_Correlator_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "../shared_ptr.h" +#include "../framing/AMQResponseBody.h" +#include +#include + +namespace qpid { +namespace framing { + +/** + * Correlate responses with actions established when sending the request. + * + * THREAD UNSAFE. + */ +class Correlator +{ + public: + typedef shared_ptr ResponsePtr; + typedef boost::function Action; + + /** + * Note that request with id was sent, record an action to call + * when a response arrives. + */ + void request(RequestId id, Action doOnResponse); + + /** + * Note response received, call action for associated request if any. + * Return true of some action(s) were executed. + */ + bool response(shared_ptr); + + /** + * Note the given execution mark was received, call actions + * for any requests that are impicitly responded to. + */ + void mark(RequestId mark); + + private: + typedef std::map Actions; + Actions actions; +}; + +}} // namespace qpid::framing + + + +#endif /*!_framing_Correlator_h*/ diff --git a/cpp/src/framing/Requester.h b/cpp/src/framing/Requester.h index dcc4460041..65bdc9a5a1 100644 --- a/cpp/src/framing/Requester.h +++ b/cpp/src/framing/Requester.h @@ -32,8 +32,7 @@ class AMQResponseBody; /** * Manage request IDs and the response mark for locally initiated requests. * - * THREAD UNSAFE: This class is called as frames are sent or received - * sequentially on a connection, so it does not need to be thread safe. + * THREAD UNSAFE: must be locked externally. */ class Requester { @@ -46,12 +45,14 @@ class Requester /** Called after processing a response. */ void processed(const AMQResponseBody::Data&); - /** Get the next request id to be used. */ - RequestId getNextId() { return lastId + 1; } - /** Get the first request acked by this response */ - RequestId getFirstAckRequest() { return firstAckRequest; } - /** Get the last request acked by this response */ - RequestId getLastAckRequest() { return lastAckRequest; } + /** Get the next request id to be used. */ + RequestId getNextId() { return lastId + 1; } + + /** Get the first request acked by last response */ + RequestId getFirstAckRequest() { return firstAckRequest; } + + /** Get the last request acked by last response */ + RequestId getLastAckRequest() { return lastAckRequest; } private: RequestId lastId; -- cgit v1.2.1