summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp3
-rw-r--r--cpp/src/qpid/broker/Connection.cpp4
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/client/ClientConnection.cpp16
-rw-r--r--cpp/src/qpid/client/Connection.h4
-rw-r--r--cpp/src/qpid/client/Connector.cpp11
-rw-r--r--cpp/src/qpid/client/Connector.h2
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp18
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.h39
-rw-r--r--cpp/src/qpid/framing/FrameHandler.h4
-rw-r--r--cpp/src/qpid/framing/Handler.h70
-rw-r--r--cpp/src/qpid/framing/InputHandler.h18
-rw-r--r--cpp/src/qpid/framing/OutputHandler.h19
-rw-r--r--cpp/src/qpid/sys/apr/LFSessionContext.cpp14
-rw-r--r--cpp/src/qpid/sys/apr/LFSessionContext.h5
15 files changed, 112 insertions, 117 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index c1f0b44ed4..26e590f87e 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -55,7 +55,7 @@ Channel::Channel(
uint32_t _framesize, MessageStore* const _store,
uint64_t _stagingThreshold
) :
- ChannelAdapter(id, &con.getOutput(), con.getVersion()),
+ ChannelAdapter(),
connection(con),
currentDeliveryTag(1),
prefetchSize(0),
@@ -70,6 +70,7 @@ Channel::Channel(
flowActive(true),
adapter(new BrokerAdapter(*this, con, con.broker))
{
+ init(id, con.getOutput(), con.getVersion());
outstanding.reset();
}
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index d809ef14d9..2bd835e753 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -66,8 +66,8 @@ Exchange::shared_ptr Connection::findExchange(const string& name){
}
-void Connection::received(framing::AMQFrame* frame){
- getChannel(frame->getChannel()).handleBody(frame->getBody());
+void Connection::received(framing::AMQFrame& frame){
+ getChannel((frame.getChannel())).getHandlers().in->handle(frame);
}
void Connection::close(
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 3fc575280a..fcfc1d3334 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -80,7 +80,7 @@ class Connection : public sys::ConnectionInputHandler,
std::vector<Queue::shared_ptr> exclusiveQueues;
// ConnectionInputHandler methods
- void received(framing::AMQFrame* frame);
+ void received(framing::AMQFrame& frame);
void initiated(const framing::ProtocolInitiation& header);
void idleOut();
void idleIn();
diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp
index d6d03680c1..bddadb0800 100644
--- a/cpp/src/qpid/client/ClientConnection.cpp
+++ b/cpp/src/qpid/client/ClientConnection.cpp
@@ -133,25 +133,22 @@ void Connection::erase(ChannelId id) {
channels.erase(id);
}
-void Connection::received(AMQFrame* frame){
- // FIXME aconway 2007-01-25: Mutex
- ChannelId id = frame->getChannel();
+void Connection::received(AMQFrame& frame){
+ ChannelId id = frame.getChannel();
Channel* channel = channels[id];
- // FIXME aconway 2007-01-26: Exception thrown here is hanging the
- // client. Need to review use of exceptions.
if (channel == 0)
THROW_QPID_ERROR(
PROTOCOL_ERROR+504,
(boost::format("Invalid channel number %g") % id).str());
try{
- channel->handleBody(frame->getBody());
+ channel->getHandlers().in->handle(frame);
}catch(const qpid::QpidError& e){
channelException(
- *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e);
+ *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e);
}
}
-void Connection::send(AMQFrame* frame) {
+void Connection::send(AMQFrame& frame) {
out->send(frame);
}
@@ -172,7 +169,8 @@ void Connection::idleIn(){
}
void Connection::idleOut(){
- out->send(new AMQFrame(version, 0, new AMQHeartbeatBody()));
+ AMQFrame frame(version, 0, new AMQHeartbeatBody());
+ out->send(frame);
}
}} // namespace qpid::client
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index c3b9aa33d0..51434fcefd 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -159,8 +159,8 @@ class Connection : public ConnectionForChannel
// TODO aconway 2007-01-26: can these be private?
- void send(framing::AMQFrame*);
- void received(framing::AMQFrame*);
+ void send(framing::AMQFrame&);
+ void received(framing::AMQFrame&);
void idleOut();
void idleIn();
void shutdown();
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 17b68c1f6a..257e2b577a 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -22,6 +22,7 @@
#include "qpid/log/Statement.h"
#include "qpid/QpidError.h"
#include "qpid/sys/Time.h"
+#include "qpid/framing/AMQFrame.h"
#include "Connector.h"
namespace qpid {
@@ -91,11 +92,9 @@ OutputHandler* Connector::getOutputHandler(){
return this;
}
-void Connector::send(AMQFrame* f){
- std::auto_ptr<AMQFrame> frame(f);
- AMQBody::shared_ptr body = frame->getBody();
- writeBlock(frame.get());
- QPID_LOG(trace, "SENT: " << *frame);
+void Connector::send(AMQFrame& frame){
+ writeBlock(&frame);
+ QPID_LOG(trace, "SENT: " << frame);
}
void Connector::writeBlock(AMQDataBlock* data){
@@ -185,7 +184,7 @@ void Connector::run(){
AMQFrame frame(version);
while(frame.decode(inbuf)){
QPID_LOG(trace, "RECV: " << frame);
- input->received(&frame);
+ input->received(frame);
}
//need to compact buffer to preserve any 'extra' data
inbuf.compact();
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index 9447a05a07..1577564d57 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -91,7 +91,7 @@ class Connector : public framing::OutputHandler,
virtual void setTimeoutHandler(sys::TimeoutHandler* handler);
virtual void setShutdownHandler(sys::ShutdownHandler* handler);
virtual framing::OutputHandler* getOutputHandler();
- virtual void send(framing::AMQFrame* frame);
+ virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
virtual void setWriteTimeout(uint16_t timeout);
};
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 48a200c4e1..4ee834b561 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -19,6 +19,7 @@
#include "ChannelAdapter.h"
#include "AMQFrame.h"
+#include "FrameHandler.h"
#include "qpid/Exception.h"
using boost::format;
@@ -26,13 +27,21 @@ using boost::format;
namespace qpid {
namespace framing {
-void ChannelAdapter::init(
- ChannelId i, OutputHandler& o, ProtocolVersion v)
+/** Framehandler that feeds into the channel. */
+struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler {
+ ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {}
+ void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); }
+ ChannelAdapter& channel;
+};
+
+void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
{
assertChannelNotOpen();
id = i;
- out = &o;
version = v;
+
+ handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this));
+ handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out));
}
RequestId ChannelAdapter::send(
@@ -58,7 +67,8 @@ RequestId ChannelAdapter::send(
}
// No action required for other body types.
}
- out->send(new AMQFrame(getVersion(), getId(), body));
+ AMQFrame frame(getVersion(), getId(), body);
+ handlers.out->handle(frame);
return requestId;
}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h
index 75e4d26ab3..0597e5e372 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.h
+++ b/cpp/src/qpid/framing/ChannelAdapter.h
@@ -1,3 +1,6 @@
+
+
+
#ifndef _ChannelAdapter_
#define _ChannelAdapter_
@@ -28,40 +31,39 @@
#include "Responder.h"
#include "Correlator.h"
#include "amqp_types.h"
+#include "FrameHandler.h"
namespace qpid {
namespace framing {
class MethodContext;
-// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel.
-
/**
* Base class for client and broker channels.
*
- * - receives frame bodies from the network.
- * - Updates request/response data.
- * - Dispatches requests with a MethodContext for responses.
+ * Provides in/out handler chains containing channel handlers.
+ * Chains may be modified by ChannelUpdaters registered with the broker.
+ *
+ * The handlers provided by the ChannelAdapter update request/response data.
*
- * send()
- * - Updates request/resposne ID data.
- * - Forwards frame to the peer.
+ * send() constructs a frame, updates request/resposne ID and forwards it
+ * to the out() chain.
*
* Thread safety: OBJECT UNSAFE. Instances must not be called
* concurrently. AMQP defines channels to be serialized.
*/
-class ChannelAdapter : public BodyHandler {
+class ChannelAdapter : private BodyHandler {
public:
/**
*@param output Processed frames are forwarded to this handler.
*/
- ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0,
- ProtocolVersion ver=ProtocolVersion())
- : id(id_), out(out_), version(ver) {}
+ ChannelAdapter() : id(0) {}
/** Initialize the channel adapter. */
void init(ChannelId, OutputHandler&, ProtocolVersion);
+ FrameHandler::Chains& getHandlers() { return handlers; }
+
ChannelId getId() const { return id; }
ProtocolVersion getVersion() const { return version; }
@@ -79,10 +81,6 @@ class ChannelAdapter : public BodyHandler {
/**@deprecated Use make_shared_ptr with the other send() override */
RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); }
- void handleMethod(shared_ptr<AMQMethodBody>);
- void handleRequest(shared_ptr<AMQRequestBody>);
- void handleResponse(shared_ptr<AMQResponseBody>);
-
virtual bool isOpen() const = 0;
protected:
@@ -99,12 +97,19 @@ class ChannelAdapter : public BodyHandler {
RequestId getNextSendRequestId() { return requester.getNextId(); }
private:
+ class ChannelAdapterHandler;
+ friend class ChannelAdapterHandler;
+
+ void handleMethod(shared_ptr<AMQMethodBody>);
+ void handleRequest(shared_ptr<AMQRequestBody>);
+ void handleResponse(shared_ptr<AMQResponseBody>);
+
ChannelId id;
- OutputHandler* out;
ProtocolVersion version;
Requester requester;
Responder responder;
Correlator correlator;
+ FrameHandler::Chains handlers;
};
}}
diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h
index c552fbb9df..457968c35e 100644
--- a/cpp/src/qpid/framing/FrameHandler.h
+++ b/cpp/src/qpid/framing/FrameHandler.h
@@ -20,14 +20,14 @@
* under the License.
*
*/
-
#include "Handler.h"
namespace qpid {
namespace framing {
+
class AMQFrame;
typedef Handler<AMQFrame&> FrameHandler;
-}}
+}}
#endif /*!QPID_FRAMING_FRAMEHANDLER_H*/
diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h
index 05a02a30b1..56e150a66d 100644
--- a/cpp/src/qpid/framing/Handler.h
+++ b/cpp/src/qpid/framing/Handler.h
@@ -22,75 +22,29 @@
*
*/
#include "qpid/shared_ptr.h"
-#include <vector>
#include <assert.h>
namespace qpid {
namespace framing {
-/** Handler for objects of type T. */
+/** Interface for handler for values of type T.
+ * Handlers can be linked into chains via the next pointer.
+ */
template <class T> struct Handler {
- typedef T Type;
- typedef shared_ptr<Handler> Ptr;
+ typedef T ParamType;
+ typedef shared_ptr<Handler> Chain;
+
+ /** Handler chains for incoming and outgoing traffic. */
+ struct Chains {
+ Chain in;
+ Chain out;
+ };
virtual ~Handler() {}
virtual void handle(T) = 0;
- virtual void link(Ptr next_) { next=next_; }
- protected:
- void nextHandler(T data) { if (next) next->handle(data); }
- private:
- Ptr next;
-};
-
-
-/** Factory interface that takes a context of type C */
-template <class T, class C> struct HandlerFactory {
- virtual ~HandlerFactory() {}
- typedef typename Handler<T>::Ptr Ptr;
-
- /** Create a handler */
- virtual Ptr create(C context) = 0;
-
- /** Create a handler and link it to next */
- Ptr create(C context, Ptr next) {
- Ptr h=create(context);
- h->link(next);
- }
-};
-
-/** Factory implementation template */
-template <class FH, class C>
-struct HandlerFactoryImpl : public HandlerFactory<typename FH::Type, C> {
- shared_ptr<Handler<typename FH::Type> > create(C context) {
- return typename FH::Ptr(new FH(context));
- }
+ Chain next;
};
-/** A factory chain is a vector of handler factories used to create
- * handler chains. The chain does not own the factories.
- */
-template <class T, class C>
-struct HandlerFactoryChain : public std::vector<HandlerFactory<T,C>* > {
- typedef typename Handler<T>::Ptr Ptr;
-
- /** Create a handler chain, return the first handler.
- *@param context - passed to each factory.
- */
- Ptr create(C context) {
- return this->create(context, this->begin());
- }
-
- private:
- typedef typename std::vector<HandlerFactory<T,C>*>::iterator iterator;
- Ptr create(C context, iterator i) {
- if (i != this->end()) {
- Ptr h=(*i)->create(context);
- h->link(create(context, i+1));
- return h;
- }
- return Ptr();
- }
-};
}}
diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h
index dc6814b849..48a96803da 100644
--- a/cpp/src/qpid/framing/InputHandler.h
+++ b/cpp/src/qpid/framing/InputHandler.h
@@ -21,7 +21,7 @@
*
*/
-#include "AMQFrame.h"
+#include "FrameHandler.h"
#include <boost/noncopyable.hpp>
namespace qpid {
@@ -30,7 +30,21 @@ namespace framing {
class InputHandler : private boost::noncopyable {
public:
virtual ~InputHandler() {}
- virtual void received(AMQFrame* frame) = 0;
+ virtual void received(AMQFrame&) = 0;
+};
+
+/** FrameHandler that delegates to an InputHandler */
+struct InputHandlerFrameHandler : public FrameHandler {
+ InputHandlerFrameHandler(InputHandler& in_) : in(in_) {}
+ void handle(ParamType frame) { in.received(frame); }
+ InputHandler& in;
+};
+
+/** InputHandler that delegates to a FrameHandler */
+struct FrameHandlerInputHandler : public InputHandler {
+ FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
+ void received(AMQFrame& frame) { handler->handle(frame); }
+ FrameHandler::Chain handler;
};
}}
diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h
index 9ffd4227d8..89917ac3df 100644
--- a/cpp/src/qpid/framing/OutputHandler.h
+++ b/cpp/src/qpid/framing/OutputHandler.h
@@ -22,17 +22,32 @@
*
*/
#include <boost/noncopyable.hpp>
+#include "FrameHandler.h"
namespace qpid {
namespace framing {
-class AMQFrame;
class OutputHandler : private boost::noncopyable {
public:
virtual ~OutputHandler() {}
- virtual void send(AMQFrame* frame) = 0;
+ virtual void send(AMQFrame&) = 0;
};
+/** OutputHandler that delegates to a FrameHandler */
+struct FrameHandlerOutputHandler : public OutputHandler {
+ FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {}
+ void received(AMQFrame& frame) { handler->handle(frame); }
+ FrameHandler::Chain handler;
+};
+
+/** FrameHandler that delegates to an OutputHandler */
+struct OutputHandlerFrameHandler : public FrameHandler {
+ OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {}
+ void handle(ParamType frame) { out.send(frame); }
+ OutputHandler& out;
+};
+
+
}}
diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.cpp b/cpp/src/qpid/sys/apr/LFSessionContext.cpp
index 0717dcc9ae..4e708fd747 100644
--- a/cpp/src/qpid/sys/apr/LFSessionContext.cpp
+++ b/cpp/src/qpid/sys/apr/LFSessionContext.cpp
@@ -62,7 +62,7 @@ void LFSessionContext::read(){
try{
while(frame.decode(in)){
QPID_LOG(debug, "RECV: " << frame);
- handler->received(&frame);
+ handler->received(frame);
}
}catch(const std::exception& e){
QPID_LOG(error, e.what());
@@ -94,14 +94,12 @@ void LFSessionContext::write(){
if(!framesToWrite.empty()){
out.clear();
bool encoded(false);
- AMQFrame* frame = framesToWrite.front();
- while(frame && out.available() >= frame->size()){
+ while(!framesToWrite.empty() && out.available() >= framesToWrite.front().size()){
+ AMQFrame& frame = framesToWrite.front();
encoded = true;
- frame->encode(out);
- QPID_LOG(debug, "SENT: " << *frame);
- delete frame;
+ frame.encode(out);
+ QPID_LOG(debug, "SENT: " << frame);
framesToWrite.pop();
- frame = framesToWrite.empty() ? 0 : framesToWrite.front();
}
if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
out.flip();
@@ -118,7 +116,7 @@ void LFSessionContext::write(){
}
}
-void LFSessionContext::send(AMQFrame* frame){
+void LFSessionContext::send(AMQFrame& frame){
Mutex::ScopedLock l(writeLock);
if(!closing){
framesToWrite.push(frame);
diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h
index 5248d8f5bd..0ff80eccec 100644
--- a/cpp/src/qpid/sys/apr/LFSessionContext.h
+++ b/cpp/src/qpid/sys/apr/LFSessionContext.h
@@ -28,6 +28,7 @@
#include <apr_time.h>
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/Buffer.h"
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Mutex.h"
@@ -55,7 +56,7 @@ class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
apr_pollfd_t fd;
- std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ std::queue<qpid::framing::AMQFrame> framesToWrite;
qpid::sys::Mutex writeLock;
bool processing;
@@ -66,7 +67,7 @@ class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
LFProcessor* const processor,
bool debug = false);
virtual ~LFSessionContext();
- virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void send(framing::AMQFrame& frame);
virtual void close();
void read();
void write();