summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-06-27 21:19:14 +0000
committerAlan Conway <aconway@apache.org>2007-06-27 21:19:14 +0000
commit0efcf2c5c91f4927ccc00ad1cf391c2f964cc2e1 (patch)
treea9318ac4787cf588dd1329c2e557d8f870be20cc /cpp/src/qpid/framing
parent548abd065f91bc1f238ac98c24edf410edf10356 (diff)
downloadqpid-python-0efcf2c5c91f4927ccc00ad1cf391c2f964cc2e1.tar.gz
* src/qpid/framing/ChannelAdapter.cpp: Use handler chains
for in and outbound frames. * src/qpid/framing/InputHandler.h, OutputHandler.h, FrameHandler.h: All handlers pass AMQFrame& and have consistent memory management. Terminal OutputHandlers used to take ownership and delete frame, now they make a shallow copy instead. * src/qpid/framing/Handler.h, FrameHandler.h: Simplified. * src/qpid/client/ClientConnection.cpp: * src/qpid/broker/Connection.cpp: * src/qpid/broker/BrokerChannel.cpp: Update for ChannelAdapter changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@551336 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp18
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h39
-rw-r--r--cpp/src/qpid/framing/FrameHandler.h4
-rw-r--r--cpp/src/qpid/framing/Handler.h70
-rw-r--r--cpp/src/qpid/framing/InputHandler.h18
-rw-r--r--cpp/src/qpid/framing/OutputHandler.h19
6 files changed, 83 insertions, 85 deletions
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 48a200c4e1..4ee834b561 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -19,6 +19,7 @@
#include "ChannelAdapter.h"
#include "AMQFrame.h"
+#include "FrameHandler.h"
#include "qpid/Exception.h"
using boost::format;
@@ -26,13 +27,21 @@ using boost::format;
namespace qpid {
namespace framing {
-void ChannelAdapter::init(
- ChannelId i, OutputHandler& o, ProtocolVersion v)
+/** Framehandler that feeds into the channel. */
+struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler {
+ ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {}
+ void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); }
+ ChannelAdapter& channel;
+};
+
+void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
{
assertChannelNotOpen();
id = i;
- out = &o;
version = v;
+
+ handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this));
+ handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
}
RequestId ChannelAdapter::send(
@@ -58,7 +67,8 @@ RequestId ChannelAdapter::send(
}
// No action required for other body types.
}
- out->send(new AMQFrame(getVersion(), getId(), body));
+ AMQFrame frame(getVersion(), getId(), body);
+ handlers.out->handle(frame);
return requestId;
}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 75e4d26ab3..0597e5e372 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -1,3 +1,6 @@
+
+
+
#ifndef _ChannelAdapter_
#define _ChannelAdapter_
@@ -28,40 +31,39 @@
#include "Responder.h"
#include "Correlator.h"
#include "amqp_types.h"
+#include "FrameHandler.h"
namespace qpid {
namespace framing {
class MethodContext;
-// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel.
-
/**
* Base class for client and broker channels.
*
- * - receives frame bodies from the network.
- * - Updates request/response data.
- * - Dispatches requests with a MethodContext for responses.
+ * Provides in/out handler chains containing channel handlers.
+ * Chains may be modified by ChannelUpdaters registered with the broker.
+ *
+ * The handlers provided by the ChannelAdapter update request/response data.
*
- * send()
- * - Updates request/resposne ID data.
- * - Forwards frame to the peer.
+ * send() constructs a frame, updates request/resposne ID and forwards it
+ * to the out() chain.
*
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : public BodyHandler {
+class ChannelAdapter : private BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
- ProtocolVersion ver=ProtocolVersion())
- : id(id_), out(out_), version(ver) {}
+ ChannelAdapter() : id(0) {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, ProtocolVersion);
+ FrameHandler::Chains& getHandlers() { return handlers; }
+
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
@@ -79,10 +81,6 @@ class ChannelAdapter : public BodyHandler {
/**@deprecated Use make_shared_ptr with the other send() override */
RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
- void handleMethod(shared_ptr<AMQMethodBody>);
- void handleRequest(shared_ptr<AMQRequestBody>);
- void handleResponse(shared_ptr<AMQResponseBody>);
-
virtual bool isOpen() const = 0;
protected:
@@ -99,12 +97,19 @@ class ChannelAdapter : public BodyHandler {
RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
+ class ChannelAdapterHandler;
+ friend class ChannelAdapterHandler;
+
+ void handleMethod(shared_ptr<AMQMethodBody>);
+ void handleRequest(shared_ptr<AMQRequestBody>);
+ void handleResponse(shared_ptr<AMQResponseBody>);
+
ChannelId id;
- OutputHandler* out;
ProtocolVersion version;
Requester requester;
Responder responder;
Correlator correlator;
+ FrameHandler::Chains handlers;
};
}}
diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h
index c552fbb9df..457968c35e 100644
--- a/cpp/src/qpid/framing/FrameHandler.h
+++ b/cpp/src/qpid/framing/FrameHandler.h
@@ -20,14 +20,14 @@
* under the License.
*
*/
-
#include "Handler.h"
namespace qpid {
namespace framing {
+
class AMQFrame;
typedef Handler<AMQFrame&> FrameHandler;
-}}
+}}
#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 05a02a30b1..56e150a66d 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -22,75 +22,29 @@
*
*/
#include "qpid/shared_ptr.h"
-#include <vector>
#include <assert.h>
namespace qpid {
namespace framing {
-/** Handler for objects of type T. */
+/** Interface for handler for values of type T.
+ * Handlers can be linked into chains via the next pointer.
+ */
template <class T> struct Handler {
- typedef T Type;
- typedef shared_ptr<Handler> Ptr;
+ typedef T ParamType;
+ typedef shared_ptr<Handler> Chain;
+
+ /** Handler chains for incoming and outgoing traffic. */
+ struct Chains {
+ Chain in;
+ Chain out;
+ };
virtual ~Handler() {}
virtual void handle(T) = 0;
- virtual void link(Ptr next_) { next=next_; }
- protected:
- void nextHandler(T data) { if (next) next->handle(data); }
- private:
- Ptr next;
-};
-
-
-/** Factory interface that takes a context of type C */
-template <class T, class C> struct HandlerFactory {
- virtual ~HandlerFactory() {}
- typedef typename Handler<T>::Ptr Ptr;
-
- /** Create a handler */
- virtual Ptr create(C context) = 0;
-
- /** Create a handler and link it to next */
- Ptr create(C context, Ptr next) {
- Ptr h=create(context);
- h->link(next);
- }
-};
-
-/** Factory implementation template */
-template <class FH, class C>
-struct HandlerFactoryImpl : public HandlerFactory<typename FH::Type, C> {
- shared_ptr<Handler<typename FH::Type> > create(C context) {
- return typename FH::Ptr(new FH(context));
- }
+ Chain next;
};
-/** A factory chain is a vector of handler factories used to create
- * handler chains. The chain does not own the factories.
- */
-template <class T, class C>
-struct HandlerFactoryChain : public std::vector<HandlerFactory<T,C>* > {
- typedef typename Handler<T>::Ptr Ptr;
-
- /** Create a handler chain, return the first handler.
- *@param context - passed to each factory.
- */
- Ptr create(C context) {
- return this->create(context, this->begin());
- }
-
- private:
- typedef typename std::vector<HandlerFactory<T,C>*>::iterator iterator;
- Ptr create(C context, iterator i) {
- if (i != this->end()) {
- Ptr h=(*i)->create(context);
- h->link(create(context, i+1));
- return h;
- }
- return Ptr();
- }
-};
}}
diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h
index dc6814b849..48a96803da 100644
--- a/cpp/src/qpid/framing/InputHandler.h
+++ b/cpp/src/qpid/framing/InputHandler.h
@@ -21,7 +21,7 @@
*
*/
-#include "AMQFrame.h"
+#include "FrameHandler.h"
#include <boost/noncopyable.hpp>
namespace qpid {
@@ -30,7 +30,21 @@ namespace framing {
class InputHandler : private boost::noncopyable {
public:
virtual ~InputHandler() {}
- virtual void received(AMQFrame* frame) = 0;
+ virtual void received(AMQFrame&) = 0;
+};
+
+/** FrameHandler that delegates to an InputHandler */
+struct InputHandlerFrameHandler : public FrameHandler {
+ InputHandlerFrameHandler(InputHandler& in_) : in(in_) {}
+ void handle(ParamType frame) { in.received(frame); }
+ InputHandler& in;
+};
+
+/** InputHandler that delegates to a FrameHandler */
+struct FrameHandlerInputHandler : public InputHandler {
+ FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
+ void received(AMQFrame& frame) { handler->handle(frame); }
+ FrameHandler::Chain handler;
};
}}
diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h
index 9ffd4227d8..89917ac3df 100644
--- a/cpp/src/qpid/framing/OutputHandler.h
+++ b/cpp/src/qpid/framing/OutputHandler.h
@@ -22,17 +22,32 @@
*
*/
#include <boost/noncopyable.hpp>
+#include "FrameHandler.h"
namespace qpid {
namespace framing {
-class AMQFrame;
class OutputHandler : private boost::noncopyable {
public:
virtual ~OutputHandler() {}
- virtual void send(AMQFrame* frame) = 0;
+ virtual void send(AMQFrame&) = 0;
};
+/** OutputHandler that delegates to a FrameHandler */
+struct FrameHandlerOutputHandler : public OutputHandler {
+ FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
+ void received(AMQFrame& frame) { handler->handle(frame); }
+ FrameHandler::Chain handler;
+};
+
+/** FrameHandler that delegates to an OutputHandler */
+struct OutputHandlerFrameHandler : public FrameHandler {
+ OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {}
+ void handle(ParamType frame) { out.send(frame); }
+ OutputHandler& out;
+};
+
+
}}