diff options
| author | Alan Conway <aconway@apache.org> | 2007-06-27 21:19:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-06-27 21:19:14 +0000 |
| commit | 0efcf2c5c91f4927ccc00ad1cf391c2f964cc2e1 (patch) | |
| tree | a9318ac4787cf588dd1329c2e557d8f870be20cc /cpp/src/qpid/framing | |
| parent | 548abd065f91bc1f238ac98c24edf410edf10356 (diff) | |
| download | qpid-python-0efcf2c5c91f4927ccc00ad1cf391c2f964cc2e1.tar.gz | |
* src/qpid/framing/ChannelAdapter.cpp: Use handler chains
for in and outbound frames.
* src/qpid/framing/InputHandler.h, OutputHandler.h, FrameHandler.h:
All handlers pass AMQFrame& and have consistent memory management.
Terminal OutputHandlers used to take ownership and delete frame, now
they make a shallow copy instead.
* src/qpid/framing/Handler.h, FrameHandler.h: Simplified.
* src/qpid/client/ClientConnection.cpp:
* src/qpid/broker/Connection.cpp:
* src/qpid/broker/BrokerChannel.cpp:
Update for ChannelAdapter changes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameHandler.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/Handler.h | 70 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/InputHandler.h | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/OutputHandler.h | 19 |
6 files changed, 83 insertions, 85 deletions
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 48a200c4e1..4ee834b561 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -19,6 +19,7 @@ #include "ChannelAdapter.h" #include "AMQFrame.h" +#include "FrameHandler.h" #include "qpid/Exception.h" using boost::format; @@ -26,13 +27,21 @@ using boost::format; namespace qpid { namespace framing { -void ChannelAdapter::init( - ChannelId i, OutputHandler& o, ProtocolVersion v) +/** Framehandler that feeds into the channel. */ +struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler { + ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {} + void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); } + ChannelAdapter& channel; +}; + +void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) { assertChannelNotOpen(); id = i; - out = &o; version = v; + + handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this)); + handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); } RequestId ChannelAdapter::send( @@ -58,7 +67,8 @@ RequestId ChannelAdapter::send( } // No action required for other body types. } - out->send(new AMQFrame(getVersion(), getId(), body)); + AMQFrame frame(getVersion(), getId(), body); + handlers.out->handle(frame); return requestId; } diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 75e4d26ab3..0597e5e372 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -1,3 +1,6 @@ + + + #ifndef _ChannelAdapter_ #define _ChannelAdapter_ @@ -28,40 +31,39 @@ #include "Responder.h" #include "Correlator.h" #include "amqp_types.h" +#include "FrameHandler.h" namespace qpid { namespace framing { class MethodContext; -// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel. - /** * Base class for client and broker channels. * - * - receives frame bodies from the network. - * - Updates request/response data. - * - Dispatches requests with a MethodContext for responses. + * Provides in/out handler chains containing channel handlers. + * Chains may be modified by ChannelUpdaters registered with the broker. + * + * The handlers provided by the ChannelAdapter update request/response data. * - * send() - * - Updates request/resposne ID data. - * - Forwards frame to the peer. + * send() constructs a frame, updates request/resposne ID and forwards it + * to the out() chain. * * Thread safety: OBJECT UNSAFE. Instances must not be called * concurrently. AMQP defines channels to be serialized. */ -class ChannelAdapter : public BodyHandler { +class ChannelAdapter : private BodyHandler { public: /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, - ProtocolVersion ver=ProtocolVersion()) - : id(id_), out(out_), version(ver) {} + ChannelAdapter() : id(0) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, ProtocolVersion); + FrameHandler::Chains& getHandlers() { return handlers; } + ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } @@ -79,10 +81,6 @@ class ChannelAdapter : public BodyHandler { /**@deprecated Use make_shared_ptr with the other send() override */ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - void handleMethod(shared_ptr<AMQMethodBody>); - void handleRequest(shared_ptr<AMQRequestBody>); - void handleResponse(shared_ptr<AMQResponseBody>); - virtual bool isOpen() const = 0; protected: @@ -99,12 +97,19 @@ class ChannelAdapter : public BodyHandler { RequestId getNextSendRequestId() { return requester.getNextId(); } private: + class ChannelAdapterHandler; + friend class ChannelAdapterHandler; + + void handleMethod(shared_ptr<AMQMethodBody>); + void handleRequest(shared_ptr<AMQRequestBody>); + void handleResponse(shared_ptr<AMQResponseBody>); + ChannelId id; - OutputHandler* out; ProtocolVersion version; Requester requester; Responder responder; Correlator correlator; + FrameHandler::Chains handlers; }; }} diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h index c552fbb9df..457968c35e 100644 --- a/cpp/src/qpid/framing/FrameHandler.h +++ b/cpp/src/qpid/framing/FrameHandler.h @@ -20,14 +20,14 @@ * under the License. * */ - #include "Handler.h" namespace qpid { namespace framing { + class AMQFrame; typedef Handler<AMQFrame&> FrameHandler; -}} +}} #endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 05a02a30b1..56e150a66d 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -22,75 +22,29 @@ * */ #include "qpid/shared_ptr.h" -#include <vector> #include <assert.h> namespace qpid { namespace framing { -/** Handler for objects of type T. */ +/** Interface for handler for values of type T. + * Handlers can be linked into chains via the next pointer. + */ template <class T> struct Handler { - typedef T Type; - typedef shared_ptr<Handler> Ptr; + typedef T ParamType; + typedef shared_ptr<Handler> Chain; + + /** Handler chains for incoming and outgoing traffic. */ + struct Chains { + Chain in; + Chain out; + }; virtual ~Handler() {} virtual void handle(T) = 0; - virtual void link(Ptr next_) { next=next_; } - protected: - void nextHandler(T data) { if (next) next->handle(data); } - private: - Ptr next; -}; - - -/** Factory interface that takes a context of type C */ -template <class T, class C> struct HandlerFactory { - virtual ~HandlerFactory() {} - typedef typename Handler<T>::Ptr Ptr; - - /** Create a handler */ - virtual Ptr create(C context) = 0; - - /** Create a handler and link it to next */ - Ptr create(C context, Ptr next) { - Ptr h=create(context); - h->link(next); - } -}; - -/** Factory implementation template */ -template <class FH, class C> -struct HandlerFactoryImpl : public HandlerFactory<typename FH::Type, C> { - shared_ptr<Handler<typename FH::Type> > create(C context) { - return typename FH::Ptr(new FH(context)); - } + Chain next; }; -/** A factory chain is a vector of handler factories used to create - * handler chains. The chain does not own the factories. - */ -template <class T, class C> -struct HandlerFactoryChain : public std::vector<HandlerFactory<T,C>* > { - typedef typename Handler<T>::Ptr Ptr; - - /** Create a handler chain, return the first handler. - *@param context - passed to each factory. - */ - Ptr create(C context) { - return this->create(context, this->begin()); - } - - private: - typedef typename std::vector<HandlerFactory<T,C>*>::iterator iterator; - Ptr create(C context, iterator i) { - if (i != this->end()) { - Ptr h=(*i)->create(context); - h->link(create(context, i+1)); - return h; - } - return Ptr(); - } -}; }} diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h index dc6814b849..48a96803da 100644 --- a/cpp/src/qpid/framing/InputHandler.h +++ b/cpp/src/qpid/framing/InputHandler.h @@ -21,7 +21,7 @@ * */ -#include "AMQFrame.h" +#include "FrameHandler.h" #include <boost/noncopyable.hpp> namespace qpid { @@ -30,7 +30,21 @@ namespace framing { class InputHandler : private boost::noncopyable { public: virtual ~InputHandler() {} - virtual void received(AMQFrame* frame) = 0; + virtual void received(AMQFrame&) = 0; +}; + +/** FrameHandler that delegates to an InputHandler */ +struct InputHandlerFrameHandler : public FrameHandler { + InputHandlerFrameHandler(InputHandler& in_) : in(in_) {} + void handle(ParamType frame) { in.received(frame); } + InputHandler& in; +}; + +/** InputHandler that delegates to a FrameHandler */ +struct FrameHandlerInputHandler : public InputHandler { + FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {} + void received(AMQFrame& frame) { handler->handle(frame); } + FrameHandler::Chain handler; }; }} diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h index 9ffd4227d8..89917ac3df 100644 --- a/cpp/src/qpid/framing/OutputHandler.h +++ b/cpp/src/qpid/framing/OutputHandler.h @@ -22,17 +22,32 @@ * */ #include <boost/noncopyable.hpp> +#include "FrameHandler.h" namespace qpid { namespace framing { -class AMQFrame; class OutputHandler : private boost::noncopyable { public: virtual ~OutputHandler() {} - virtual void send(AMQFrame* frame) = 0; + virtual void send(AMQFrame&) = 0; }; +/** OutputHandler that delegates to a FrameHandler */ +struct FrameHandlerOutputHandler : public OutputHandler { + FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {} + void received(AMQFrame& frame) { handler->handle(frame); } + FrameHandler::Chain handler; +}; + +/** FrameHandler that delegates to an OutputHandler */ +struct OutputHandlerFrameHandler : public FrameHandler { + OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {} + void handle(ParamType frame) { out.send(frame); } + OutputHandler& out; +}; + + }} |
