diff options
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ClientConnection.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connection.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/client/Connector.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.h | 39 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameHandler.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/Handler.h | 70 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/InputHandler.h | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/OutputHandler.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.cpp | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.h | 5 |
15 files changed, 112 insertions, 117 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c1f0b44ed4..26e590f87e 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -55,7 +55,7 @@ Channel::Channel( uint32_t _framesize, MessageStore* const _store, uint64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.getVersion()), + ChannelAdapter(), connection(con), currentDeliveryTag(1), prefetchSize(0), @@ -70,6 +70,7 @@ Channel::Channel( flowActive(true), adapter(new BrokerAdapter(*this, con, con.broker)) { + init(id, con.getOutput(), con.getVersion()); outstanding.reset(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index d809ef14d9..2bd835e753 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -66,8 +66,8 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ } -void Connection::received(framing::AMQFrame* frame){ - getChannel(frame->getChannel()).handleBody(frame->getBody()); +void Connection::received(framing::AMQFrame& frame){ + getChannel((frame.getChannel())).getHandlers().in->handle(frame); } void Connection::close( diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 3fc575280a..fcfc1d3334 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -80,7 +80,7 @@ class Connection : public sys::ConnectionInputHandler, std::vector<Queue::shared_ptr> exclusiveQueues; // ConnectionInputHandler methods - void received(framing::AMQFrame* frame); + void received(framing::AMQFrame& frame); void initiated(const framing::ProtocolInitiation& header); void idleOut(); void idleIn(); diff --git a/cpp/src/qpid/client/ClientConnection.cpp b/cpp/src/qpid/client/ClientConnection.cpp index d6d03680c1..bddadb0800 100644 --- a/cpp/src/qpid/client/ClientConnection.cpp +++ b/cpp/src/qpid/client/ClientConnection.cpp @@ -133,25 +133,22 @@ void Connection::erase(ChannelId id) { channels.erase(id); } -void Connection::received(AMQFrame* frame){ - // FIXME aconway 2007-01-25: Mutex - ChannelId id = frame->getChannel(); +void Connection::received(AMQFrame& frame){ + ChannelId id = frame.getChannel(); Channel* channel = channels[id]; - // FIXME aconway 2007-01-26: Exception thrown here is hanging the - // client. Need to review use of exceptions. if (channel == 0) THROW_QPID_ERROR( PROTOCOL_ERROR+504, (boost::format("Invalid channel number %g") % id).str()); try{ - channel->handleBody(frame->getBody()); + channel->getHandlers().in->handle(frame); }catch(const qpid::QpidError& e){ channelException( - *channel, dynamic_cast<AMQMethodBody*>(frame->getBody().get()), e); + *channel, dynamic_cast<AMQMethodBody*>(frame.getBody().get()), e); } } -void Connection::send(AMQFrame* frame) { +void Connection::send(AMQFrame& frame) { out->send(frame); } @@ -172,7 +169,8 @@ void Connection::idleIn(){ } void Connection::idleOut(){ - out->send(new AMQFrame(version, 0, new AMQHeartbeatBody())); + AMQFrame frame(version, 0, new AMQHeartbeatBody()); + out->send(frame); } }} // namespace qpid::client diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index c3b9aa33d0..51434fcefd 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -159,8 +159,8 @@ class Connection : public ConnectionForChannel // TODO aconway 2007-01-26: can these be private? - void send(framing::AMQFrame*); - void received(framing::AMQFrame*); + void send(framing::AMQFrame&); + void received(framing::AMQFrame&); void idleOut(); void idleIn(); void shutdown(); diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index 17b68c1f6a..257e2b577a 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -22,6 +22,7 @@ #include "qpid/log/Statement.h" #include "qpid/QpidError.h" #include "qpid/sys/Time.h" +#include "qpid/framing/AMQFrame.h" #include "Connector.h" namespace qpid { @@ -91,11 +92,9 @@ OutputHandler* Connector::getOutputHandler(){ return this; } -void Connector::send(AMQFrame* f){ - std::auto_ptr<AMQFrame> frame(f); - AMQBody::shared_ptr body = frame->getBody(); - writeBlock(frame.get()); - QPID_LOG(trace, "SENT: " << *frame); +void Connector::send(AMQFrame& frame){ + writeBlock(&frame); + QPID_LOG(trace, "SENT: " << frame); } void Connector::writeBlock(AMQDataBlock* data){ @@ -185,7 +184,7 @@ void Connector::run(){ AMQFrame frame(version); while(frame.decode(inbuf)){ QPID_LOG(trace, "RECV: " << frame); - input->received(&frame); + input->received(frame); } //need to compact buffer to preserve any 'extra' data inbuf.compact(); diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h index 9447a05a07..1577564d57 100644 --- a/cpp/src/qpid/client/Connector.h +++ b/cpp/src/qpid/client/Connector.h @@ -91,7 +91,7 @@ class Connector : public framing::OutputHandler, virtual void setTimeoutHandler(sys::TimeoutHandler* handler); virtual void setShutdownHandler(sys::ShutdownHandler* handler); virtual framing::OutputHandler* getOutputHandler(); - virtual void send(framing::AMQFrame* frame); + virtual void send(framing::AMQFrame& frame); virtual void setReadTimeout(uint16_t timeout); virtual void setWriteTimeout(uint16_t timeout); }; diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 48a200c4e1..4ee834b561 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -19,6 +19,7 @@ #include "ChannelAdapter.h" #include "AMQFrame.h" +#include "FrameHandler.h" #include "qpid/Exception.h" using boost::format; @@ -26,13 +27,21 @@ using boost::format; namespace qpid { namespace framing { -void ChannelAdapter::init( - ChannelId i, OutputHandler& o, ProtocolVersion v) +/** Framehandler that feeds into the channel. */ +struct ChannelAdapter::ChannelAdapterHandler : public FrameHandler { + ChannelAdapterHandler(ChannelAdapter& channel_) : channel(channel_) {} + void handle(AMQFrame& frame) { channel.handleBody(frame.getBody()); } + ChannelAdapter& channel; +}; + +void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) { assertChannelNotOpen(); id = i; - out = &o; version = v; + + handlers.in = make_shared_ptr(new ChannelAdapterHandler(*this)); + handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); } RequestId ChannelAdapter::send( @@ -58,7 +67,8 @@ RequestId ChannelAdapter::send( } // No action required for other body types. } - out->send(new AMQFrame(getVersion(), getId(), body)); + AMQFrame frame(getVersion(), getId(), body); + handlers.out->handle(frame); return requestId; } diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 75e4d26ab3..0597e5e372 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -1,3 +1,6 @@ + + + #ifndef _ChannelAdapter_ #define _ChannelAdapter_ @@ -28,40 +31,39 @@ #include "Responder.h" #include "Correlator.h" #include "amqp_types.h" +#include "FrameHandler.h" namespace qpid { namespace framing { class MethodContext; -// FIXME aconway 2007-02-20: Rename as ChannelBase or just Channel. - /** * Base class for client and broker channels. * - * - receives frame bodies from the network. - * - Updates request/response data. - * - Dispatches requests with a MethodContext for responses. + * Provides in/out handler chains containing channel handlers. + * Chains may be modified by ChannelUpdaters registered with the broker. + * + * The handlers provided by the ChannelAdapter update request/response data. * - * send() - * - Updates request/resposne ID data. - * - Forwards frame to the peer. + * send() constructs a frame, updates request/resposne ID and forwards it + * to the out() chain. * * Thread safety: OBJECT UNSAFE. Instances must not be called * concurrently. AMQP defines channels to be serialized. */ -class ChannelAdapter : public BodyHandler { +class ChannelAdapter : private BodyHandler { public: /** *@param output Processed frames are forwarded to this handler. */ - ChannelAdapter(ChannelId id_=0, OutputHandler* out_=0, - ProtocolVersion ver=ProtocolVersion()) - : id(id_), out(out_), version(ver) {} + ChannelAdapter() : id(0) {} /** Initialize the channel adapter. */ void init(ChannelId, OutputHandler&, ProtocolVersion); + FrameHandler::Chains& getHandlers() { return handlers; } + ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } @@ -79,10 +81,6 @@ class ChannelAdapter : public BodyHandler { /**@deprecated Use make_shared_ptr with the other send() override */ RequestId send(AMQBody* body) { return send(AMQBody::shared_ptr(body)); } - void handleMethod(shared_ptr<AMQMethodBody>); - void handleRequest(shared_ptr<AMQRequestBody>); - void handleResponse(shared_ptr<AMQResponseBody>); - virtual bool isOpen() const = 0; protected: @@ -99,12 +97,19 @@ class ChannelAdapter : public BodyHandler { RequestId getNextSendRequestId() { return requester.getNextId(); } private: + class ChannelAdapterHandler; + friend class ChannelAdapterHandler; + + void handleMethod(shared_ptr<AMQMethodBody>); + void handleRequest(shared_ptr<AMQRequestBody>); + void handleResponse(shared_ptr<AMQResponseBody>); + ChannelId id; - OutputHandler* out; ProtocolVersion version; Requester requester; Responder responder; Correlator correlator; + FrameHandler::Chains handlers; }; }} diff --git a/cpp/src/qpid/framing/FrameHandler.h b/cpp/src/qpid/framing/FrameHandler.h index c552fbb9df..457968c35e 100644 --- a/cpp/src/qpid/framing/FrameHandler.h +++ b/cpp/src/qpid/framing/FrameHandler.h @@ -20,14 +20,14 @@ * under the License. * */ - #include "Handler.h" namespace qpid { namespace framing { + class AMQFrame; typedef Handler<AMQFrame&> FrameHandler; -}} +}} #endif /*!QPID_FRAMING_FRAMEHANDLER_H*/ diff --git a/cpp/src/qpid/framing/Handler.h b/cpp/src/qpid/framing/Handler.h index 05a02a30b1..56e150a66d 100644 --- a/cpp/src/qpid/framing/Handler.h +++ b/cpp/src/qpid/framing/Handler.h @@ -22,75 +22,29 @@ * */ #include "qpid/shared_ptr.h" -#include <vector> #include <assert.h> namespace qpid { namespace framing { -/** Handler for objects of type T. */ +/** Interface for handler for values of type T. + * Handlers can be linked into chains via the next pointer. + */ template <class T> struct Handler { - typedef T Type; - typedef shared_ptr<Handler> Ptr; + typedef T ParamType; + typedef shared_ptr<Handler> Chain; + + /** Handler chains for incoming and outgoing traffic. */ + struct Chains { + Chain in; + Chain out; + }; virtual ~Handler() {} virtual void handle(T) = 0; - virtual void link(Ptr next_) { next=next_; } - protected: - void nextHandler(T data) { if (next) next->handle(data); } - private: - Ptr next; -}; - - -/** Factory interface that takes a context of type C */ -template <class T, class C> struct HandlerFactory { - virtual ~HandlerFactory() {} - typedef typename Handler<T>::Ptr Ptr; - - /** Create a handler */ - virtual Ptr create(C context) = 0; - - /** Create a handler and link it to next */ - Ptr create(C context, Ptr next) { - Ptr h=create(context); - h->link(next); - } -}; - -/** Factory implementation template */ -template <class FH, class C> -struct HandlerFactoryImpl : public HandlerFactory<typename FH::Type, C> { - shared_ptr<Handler<typename FH::Type> > create(C context) { - return typename FH::Ptr(new FH(context)); - } + Chain next; }; -/** A factory chain is a vector of handler factories used to create - * handler chains. The chain does not own the factories. - */ -template <class T, class C> -struct HandlerFactoryChain : public std::vector<HandlerFactory<T,C>* > { - typedef typename Handler<T>::Ptr Ptr; - - /** Create a handler chain, return the first handler. - *@param context - passed to each factory. - */ - Ptr create(C context) { - return this->create(context, this->begin()); - } - - private: - typedef typename std::vector<HandlerFactory<T,C>*>::iterator iterator; - Ptr create(C context, iterator i) { - if (i != this->end()) { - Ptr h=(*i)->create(context); - h->link(create(context, i+1)); - return h; - } - return Ptr(); - } -}; }} diff --git a/cpp/src/qpid/framing/InputHandler.h b/cpp/src/qpid/framing/InputHandler.h index dc6814b849..48a96803da 100644 --- a/cpp/src/qpid/framing/InputHandler.h +++ b/cpp/src/qpid/framing/InputHandler.h @@ -21,7 +21,7 @@ * */ -#include "AMQFrame.h" +#include "FrameHandler.h" #include <boost/noncopyable.hpp> namespace qpid { @@ -30,7 +30,21 @@ namespace framing { class InputHandler : private boost::noncopyable { public: virtual ~InputHandler() {} - virtual void received(AMQFrame* frame) = 0; + virtual void received(AMQFrame&) = 0; +}; + +/** FrameHandler that delegates to an InputHandler */ +struct InputHandlerFrameHandler : public FrameHandler { + InputHandlerFrameHandler(InputHandler& in_) : in(in_) {} + void handle(ParamType frame) { in.received(frame); } + InputHandler& in; +}; + +/** InputHandler that delegates to a FrameHandler */ +struct FrameHandlerInputHandler : public InputHandler { + FrameHandlerInputHandler(shared_ptr<FrameHandler> h) : handler(h) {} + void received(AMQFrame& frame) { handler->handle(frame); } + FrameHandler::Chain handler; }; }} diff --git a/cpp/src/qpid/framing/OutputHandler.h b/cpp/src/qpid/framing/OutputHandler.h index 9ffd4227d8..89917ac3df 100644 --- a/cpp/src/qpid/framing/OutputHandler.h +++ b/cpp/src/qpid/framing/OutputHandler.h @@ -22,17 +22,32 @@ * */ #include <boost/noncopyable.hpp> +#include "FrameHandler.h" namespace qpid { namespace framing { -class AMQFrame; class OutputHandler : private boost::noncopyable { public: virtual ~OutputHandler() {} - virtual void send(AMQFrame* frame) = 0; + virtual void send(AMQFrame&) = 0; }; +/** OutputHandler that delegates to a FrameHandler */ +struct FrameHandlerOutputHandler : public OutputHandler { + FrameHandlerOutputHandler(shared_ptr<FrameHandler> h) : handler(h) {} + void received(AMQFrame& frame) { handler->handle(frame); } + FrameHandler::Chain handler; +}; + +/** FrameHandler that delegates to an OutputHandler */ +struct OutputHandlerFrameHandler : public FrameHandler { + OutputHandlerFrameHandler(OutputHandler& out_) : out(out_) {} + void handle(ParamType frame) { out.send(frame); } + OutputHandler& out; +}; + + }} diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.cpp b/cpp/src/qpid/sys/apr/LFSessionContext.cpp index 0717dcc9ae..4e708fd747 100644 --- a/cpp/src/qpid/sys/apr/LFSessionContext.cpp +++ b/cpp/src/qpid/sys/apr/LFSessionContext.cpp @@ -62,7 +62,7 @@ void LFSessionContext::read(){ try{ while(frame.decode(in)){ QPID_LOG(debug, "RECV: " << frame); - handler->received(&frame); + handler->received(frame); } }catch(const std::exception& e){ QPID_LOG(error, e.what()); @@ -94,14 +94,12 @@ void LFSessionContext::write(){ if(!framesToWrite.empty()){ out.clear(); bool encoded(false); - AMQFrame* frame = framesToWrite.front(); - while(frame && out.available() >= frame->size()){ + while(!framesToWrite.empty() && out.available() >= framesToWrite.front().size()){ + AMQFrame& frame = framesToWrite.front(); encoded = true; - frame->encode(out); - QPID_LOG(debug, "SENT: " << *frame); - delete frame; + frame.encode(out); + QPID_LOG(debug, "SENT: " << frame); framesToWrite.pop(); - frame = framesToWrite.empty() ? 0 : framesToWrite.front(); } if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); out.flip(); @@ -118,7 +116,7 @@ void LFSessionContext::write(){ } } -void LFSessionContext::send(AMQFrame* frame){ +void LFSessionContext::send(AMQFrame& frame){ Mutex::ScopedLock l(writeLock); if(!closing){ framesToWrite.push(frame); diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h index 5248d8f5bd..0ff80eccec 100644 --- a/cpp/src/qpid/sys/apr/LFSessionContext.h +++ b/cpp/src/qpid/sys/apr/LFSessionContext.h @@ -28,6 +28,7 @@ #include <apr_time.h> #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" #include "qpid/framing/Buffer.h" #include "qpid/sys/Monitor.h" #include "qpid/sys/Mutex.h" @@ -55,7 +56,7 @@ class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler apr_pollfd_t fd; - std::queue<qpid::framing::AMQFrame*> framesToWrite; + std::queue<qpid::framing::AMQFrame> framesToWrite; qpid::sys::Mutex writeLock; bool processing; @@ -66,7 +67,7 @@ class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler LFProcessor* const processor, bool debug = false); virtual ~LFSessionContext(); - virtual void send(qpid::framing::AMQFrame* frame); + virtual void send(framing::AMQFrame& frame); virtual void close(); void read(); void write(); |
