diff options
| author | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-02-13 02:41:14 +0000 |
| commit | 9517deedff9691dbe3429b0b917dfd4208b0b1b8 (patch) | |
| tree | f8868a2fbc63e92c770b401eeff2aee3a522697a /cpp/lib/broker | |
| parent | d26ea3376f66f69486fe214c8a7a8b96a7605c99 (diff) | |
| download | qpid-python-9517deedff9691dbe3429b0b917dfd4208b0b1b8.tar.gz | |
* gentools/templ.cpp/*Proxy*, CppGenerator.java: Changes to Proxy
classes to make them directly usable as an API for low-level AMQP access.
- Proxies hold reference to a ChannelAdapter not just an output handler.
- Removed MethodContext parameter, makes no sense on requester end.
- Return RequestId from request methods so caller can correlate
incoming responses.
- Add RequestId parameter to response methods so caller can provide
correlation for outgoing responses.
- No longer inherit from *Operations classes as the signatures no
longer match. Proxy is for caller (client/requester) and Operations
is for callee (server/responder)
* cpp/lib/client/ClientChannel.h: Channel provides a raw proxy to the broker.
Normal users will still use the Channel API to deal with the broker, but
advanced users (incl ourselves!) can use the raw API to directly send
and receive any AMQP message.
* cpp/lib/broker/BrokerChannel,BrokerAdapter: Refactor for new proxies.
broker::Channel is also a ClientProxy
* Sundry files:
- Pass ProtcolVersion by value, it is only two bytes.
- Misc. const correctness fixes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@506823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker')
| -rw-r--r-- | cpp/lib/broker/Broker.h | 1 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.cpp | 167 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerAdapter.h | 96 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 95 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.h | 26 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.cpp | 3 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessage.h | 13 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerMessageBase.h | 44 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerQueue.cpp | 4 | ||||
| -rw-r--r-- | cpp/lib/broker/BrokerQueue.h | 10 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.cpp | 43 | ||||
| -rw-r--r-- | cpp/lib/broker/Connection.h | 61 | ||||
| -rw-r--r-- | cpp/lib/broker/HandlerImpl.h | 71 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.cpp | 33 | ||||
| -rw-r--r-- | cpp/lib/broker/MessageHandlerImpl.h | 13 |
15 files changed, 400 insertions, 280 deletions
diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 27d2fec006..e2ca88d4d0 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -30,7 +30,6 @@ #include <MessageStore.h> #include <AutoDelete.h> #include <ExchangeRegistry.h> -#include <BrokerChannel.h> #include <ConnectionToken.h> #include <DirectExchange.h> #include <OutputHandler.h> diff --git a/cpp/lib/broker/BrokerAdapter.cpp b/cpp/lib/broker/BrokerAdapter.cpp index 8b081874fc..ec80241c66 100644 --- a/cpp/lib/broker/BrokerAdapter.cpp +++ b/cpp/lib/broker/BrokerAdapter.cpp @@ -18,11 +18,10 @@ #include <boost/format.hpp> #include "BrokerAdapter.h" +#include "BrokerChannel.h" #include "Connection.h" -#include "Exception.h" #include "AMQMethodBody.h" #include "Exception.h" -#include "MessageHandlerImpl.h" namespace qpid { namespace broker { @@ -33,18 +32,37 @@ using namespace qpid::framing; typedef std::vector<Queue::shared_ptr> QueueVector; -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::startOk( - const MethodContext& context , const FieldTable& /*clientProperties*/, + +BrokerAdapter::BrokerAdapter(Channel& ch, Connection& c, Broker& b) : + CoreRefs(ch, c, b), + connection(c), + basicHandler(*this), + channelHandler(*this), + connectionHandler(*this), + exchangeHandler(*this), + messageHandler(*this), + queueHandler(*this), + txHandler(*this) +{} + + +ProtocolVersion BrokerAdapter::getVersion() const { + return connection.getVersion(); +} + +void BrokerAdapter::ConnectionHandlerImpl::startOk( + const MethodContext&, const FieldTable& /*clientProperties*/, const string& /*mechanism*/, - const string& /*response*/, const string& /*locale*/){ - connection.client->getConnection().tune( - context, 100, connection.getFrameMax(), connection.getHeartbeat()); + const string& /*response*/, const string& /*locale*/) +{ + client.tune( + 100, connection.getFrameMax(), connection.getHeartbeat()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::secureOk( +void BrokerAdapter::ConnectionHandlerImpl::secureOk( const MethodContext&, const string& /*response*/){} -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( +void BrokerAdapter::ConnectionHandlerImpl::tuneOk( const MethodContext&, u_int16_t /*channelmax*/, u_int32_t framemax, u_int16_t heartbeat) { @@ -52,50 +70,55 @@ void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::tuneOk( connection.setHeartbeat(heartbeat); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::open(const MethodContext& context, const string& /*virtualHost*/, const string& /*capabilities*/, bool /*insist*/){ +void BrokerAdapter::ConnectionHandlerImpl::open( + const MethodContext& context, const string& /*virtualHost*/, + const string& /*capabilities*/, bool /*insist*/) +{ string knownhosts; - connection.client->getConnection().openOk(context, knownhosts); + client.openOk( + knownhosts, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::close( +void BrokerAdapter::ConnectionHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getConnection().closeOk(context); + client.closeOk(context.getRequestId()); connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ +void BrokerAdapter::ConnectionHandlerImpl::closeOk(const MethodContext&){ connection.getOutput().close(); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::open( +void BrokerAdapter::ChannelHandlerImpl::open( const MethodContext& context, const string& /*outOfBand*/){ channel.open(); // FIXME aconway 2007-01-04: provide valid ID as per ampq 0-9 - connection.client->getChannel().openOk(context, std::string()/* ID */); + client.openOk( + std::string()/* ID */, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flow(const MethodContext&, bool /*active*/){} +void BrokerAdapter::ChannelHandlerImpl::flowOk(const MethodContext&, bool /*active*/){} -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::close( +void BrokerAdapter::ChannelHandlerImpl::close( const MethodContext& context, u_int16_t /*replyCode*/, const string& /*replyText*/, u_int16_t /*classId*/, u_int16_t /*methodId*/) { - connection.client->getChannel().closeOk(context); + client.closeOk(context.getRequestId()); // FIXME aconway 2007-01-18: Following line will "delete this". Ugly. connection.closeChannel(channel.getId()); } -void BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} +void BrokerAdapter::ChannelHandlerImpl::closeOk(const MethodContext&){} -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, - bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, - const FieldTable& /*arguments*/){ +void BrokerAdapter::ExchangeHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& exchange, const string& type, + bool passive, bool /*durable*/, bool /*autoDelete*/, bool /*internal*/, bool nowait, + const FieldTable& /*arguments*/){ if(passive){ if(!broker.getExchanges().get(exchange)) { @@ -116,27 +139,30 @@ void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::declare(const MethodCont } } if(!nowait){ - connection.client->getExchange().declareOk(context); + client.declareOk(context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, - const string& exchange, bool /*ifUnused*/, bool nowait){ +void BrokerAdapter::ExchangeHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, + const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused broker.getExchanges().destroy(exchange); - if(!nowait) connection.client->getExchange().deleteOk(context); + if(!nowait) client.deleteOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, - bool passive, bool durable, bool exclusive, - bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::declare(const MethodContext& context, u_int16_t /*ticket*/, const string& name, + bool passive, bool durable, bool exclusive, + bool autoDelete, bool nowait, const qpid::framing::FieldTable& arguments){ Queue::shared_ptr queue; if (passive && !name.empty()) { queue = connection.getQueue(name, channel.getId()); } else { std::pair<Queue::shared_ptr, bool> queue_created = - broker.getQueues().declare(name, durable, autoDelete ? connection.settings.timeout : 0, exclusive ? &connection : 0); + broker.getQueues().declare( + name, durable, + autoDelete ? connection.getTimeout() : 0, + exclusive ? &connection : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -161,20 +187,22 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::declare(const MethodContext % queue->getName()); if (!nowait) { string queueName = queue->getName(); - connection.client->getQueue().declareOk(context, queueName, queue->getMessageCount(), queue->getConsumerCount()); + client.declareOk( + queueName, queue->getMessageCount(), queue->getConsumerCount(), + context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, - const string& exchangeName, const string& routingKey, bool nowait, - const FieldTable& arguments){ +void BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, + const string& exchangeName, const string& routingKey, bool nowait, + const FieldTable& arguments){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); Exchange::shared_ptr exchange = broker.getExchanges().get(exchangeName); if(exchange){ string exchangeRoutingKey = routingKey.empty() && queueName.empty() ? queue->getName() : routingKey; exchange->bind(queue, exchangeRoutingKey, &arguments); - if(!nowait) connection.client->getQueue().bindOk(context); + if(!nowait) client.bindOk(context.getRequestId()); }else{ throw ChannelException( 404, "Bind failed. No such exchange: " + exchangeName); @@ -182,7 +210,7 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::bind(const MethodContext& c } void -BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( +BrokerAdapter::QueueHandlerImpl::unbind( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, @@ -198,18 +226,18 @@ BrokerAdapter::BrokerAdapter::QueueHandlerImpl::unbind( exchange->unbind(queue, routingKey, &arguments); - connection.client->getQueue().unbindOk(context); + client.unbindOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::purge(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool nowait){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); int count = queue->purge(); - if(!nowait) connection.client->getQueue().purgeOk(context, count); + if(!nowait) client.purgeOk( count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, - bool ifUnused, bool ifEmpty, bool nowait){ +void BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext& context, u_int16_t /*ticket*/, const string& queue, + bool ifUnused, bool ifEmpty, bool nowait){ ChannelException error(0, ""); int count(0); Queue::shared_ptr q = connection.getQueue(queue, channel.getId()); @@ -228,20 +256,21 @@ void BrokerAdapter::BrokerAdapter::QueueHandlerImpl::delete_(const MethodContext broker.getQueues().destroy(queue); } - if(!nowait) connection.client->getQueue().deleteOk(context, count); + if(!nowait) + client.deleteOk(count, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ +void BrokerAdapter::BasicHandlerImpl::qos(const MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool /*global*/){ //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - connection.client->getBasic().qosOk(context); + client.qosOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( +void BrokerAdapter::BasicHandlerImpl::consume( const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, const string& consumerTag, bool noLocal, bool noAck, bool exclusive, @@ -257,19 +286,19 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::consume( channel.consume( newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); - if(!nowait) connection.client->getBasic().consumeOk(context, newTag); + if(!nowait) client.consumeOk(newTag, context.getRequestId()); //allow messages to be dispatched if required as there is now a consumer: queue->dispatch(); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ +void BrokerAdapter::BasicHandlerImpl::cancel(const MethodContext& context, const string& consumerTag, bool nowait){ channel.cancel(consumerTag); - if(!nowait) connection.client->getBasic().cancelOk(context, consumerTag); + if(!nowait) client.cancelOk(consumerTag, context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( +void BrokerAdapter::BasicHandlerImpl::publish( const MethodContext& context, u_int16_t /*ticket*/, const string& exchangeName, const string& routingKey, bool mandatory, bool immediate) @@ -287,16 +316,16 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::publish( } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ +void BrokerAdapter::BasicHandlerImpl::get(const MethodContext& context, u_int16_t /*ticket*/, const string& queueName, bool noAck){ Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!connection.getChannel(channel.getId()).get(queue, "", !noAck)){ string clusterId;//not used, part of an imatix hack - connection.client->getBasic().getEmpty(context, clusterId); + client.getEmpty(clusterId, context.getRequestId()); } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ +void BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u_int64_t deliveryTag, bool multiple){ try{ channel.ack(deliveryTag, multiple); }catch(InvalidAckException& e){ @@ -304,31 +333,31 @@ void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::ack(const MethodContext&, u } } -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} +void BrokerAdapter::BasicHandlerImpl::reject(const MethodContext&, u_int64_t /*deliveryTag*/, bool /*requeue*/){} -void BrokerAdapter::BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ +void BrokerAdapter::BasicHandlerImpl::recover(const MethodContext&, bool requeue){ channel.recover(requeue); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){ channel.begin(); - connection.client->getTx().selectOk(context); + client.selectOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::commit(const MethodContext& context){ channel.commit(); - connection.client->getTx().commitOk(context); + client.commitOk(context.getRequestId()); } -void BrokerAdapter::BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ +void BrokerAdapter::TxHandlerImpl::rollback(const MethodContext& context){ channel.rollback(); - connection.client->getTx().rollbackOk(context); + client.rollbackOk(context.getRequestId()); channel.recover(false); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) +BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) { //no specific action required, generic response handling should be sufficient } @@ -338,21 +367,21 @@ BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ok( const MethodContext& ) // Message class method handlers // void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::ping( const MethodContext& context) { - connection.client->getChannel().ok(context); - connection.client->getChannel().pong(context); + client.ok(context.getRequestId()); + client.pong(); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) +BrokerAdapter::ChannelHandlerImpl::pong( const MethodContext& context) { - connection.client->getChannel().ok(context); + client.ok(context.getRequestId()); } void -BrokerAdapter::BrokerAdapter::ChannelHandlerImpl::resume( +BrokerAdapter::ChannelHandlerImpl::resume( const MethodContext&, const string& /*channel*/ ) { diff --git a/cpp/lib/broker/BrokerAdapter.h b/cpp/lib/broker/BrokerAdapter.h index cd34f17e58..166ec78ddd 100644 --- a/cpp/lib/broker/BrokerAdapter.h +++ b/cpp/lib/broker/BrokerAdapter.h @@ -19,8 +19,9 @@ * */ #include "AMQP_ServerOperations.h" +#include "HandlerImpl.h" #include "MessageHandlerImpl.h" -#include "BrokerChannel.h" +#include "Exception.h" namespace qpid { namespace broker { @@ -28,14 +29,6 @@ namespace broker { class Channel; class Connection; class Broker; - -/** - * Per-channel protocol adapter. - * - * Translates protocol bodies into calls on the core Channel, - * Connection and Broker objects. - */ - class ChannelHandler; class ConnectionHandler; class BasicHandler; @@ -48,20 +41,23 @@ class FileHandler; class StreamHandler; class DtxHandler; class TunnelHandler; +class MessageHandlerImpl; -class BrokerAdapter : public framing::AMQP_ServerOperations +/** + * Per-channel protocol adapter. + * + * A container for a collection of AMQP-class adapters that translate + * AMQP method bodies into calls on the core Channel, Connection and + * Broker objects. Each adapter class also provides a client proxy + * to send methods to the peer. + * + */ +class BrokerAdapter : public CoreRefs, public framing::AMQP_ServerOperations { public: - BrokerAdapter(Channel& ch, Connection& c, Broker& b) : - basicHandler(ch, c, b), - channelHandler(ch, c, b), - connectionHandler(ch, c, b), - exchangeHandler(ch, c, b), - messageHandler(ch, c, b), - queueHandler(ch, c, b), - txHandler(ch, c, b) - {} - + BrokerAdapter(Channel& ch, Connection& c, Broker& b); + + framing::ProtocolVersion getVersion() const; ChannelHandler* getChannelHandler() { return &channelHandler; } ConnectionHandler* getConnectionHandler() { return &connectionHandler; } BasicHandler* getBasicHandler() { return &basicHandler; } @@ -80,19 +76,16 @@ class BrokerAdapter : public framing::AMQP_ServerOperations TunnelHandler* getTunnelHandler() { throw ConnectionException(540, "Tunnel class not implemented"); } + framing::AMQP_ClientProxy& getProxy() { return proxy; } + private: - struct CoreRefs { - CoreRefs(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b) {} - Channel& channel; - Connection& connection; - Broker& broker; - }; - - class ConnectionHandlerImpl : private CoreRefs, public ConnectionHandler { + class ConnectionHandlerImpl : + public ConnectionHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Connection> + { public: - ConnectionHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ConnectionHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} void startOk(const framing::MethodContext& context, const qpid::framing::FieldTable& clientProperties, @@ -112,9 +105,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ChannelHandlerImpl : private CoreRefs, public ChannelHandler{ + class ChannelHandlerImpl : + public ChannelHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Channel> + { public: - ChannelHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ChannelHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void open(const framing::MethodContext& context, const std::string& outOfBand); void flow(const framing::MethodContext& context, bool active); void flowOk(const framing::MethodContext& context, bool active); @@ -127,9 +124,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void closeOk(const framing::MethodContext& context); }; - class ExchangeHandlerImpl : private CoreRefs, public ExchangeHandler{ + class ExchangeHandlerImpl : + public ExchangeHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Exchange> + { public: - ExchangeHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + ExchangeHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& exchange, const std::string& type, bool passive, bool durable, bool autoDelete, @@ -139,9 +140,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations const std::string& exchange, bool ifUnused, bool nowait); }; - class QueueHandlerImpl : private CoreRefs, public QueueHandler{ + class QueueHandlerImpl : + public QueueHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Queue> + { public: - QueueHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + QueueHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void declare(const framing::MethodContext& context, u_int16_t ticket, const std::string& queue, bool passive, bool durable, bool exclusive, bool autoDelete, bool nowait, @@ -162,9 +167,13 @@ class BrokerAdapter : public framing::AMQP_ServerOperations bool nowait); }; - class BasicHandlerImpl : private CoreRefs, public BasicHandler{ + class BasicHandlerImpl : + public BasicHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Basic> + { public: - BasicHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + BasicHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void qos(const framing::MethodContext& context, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global); void consume( @@ -184,14 +193,19 @@ class BrokerAdapter : public framing::AMQP_ServerOperations void recover(const framing::MethodContext& context, bool requeue); }; - class TxHandlerImpl : private CoreRefs, public TxHandler{ + class TxHandlerImpl : + public TxHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Tx> + { public: - TxHandlerImpl(Channel& ch, Connection& c, Broker& b) : CoreRefs(ch, c, b) {} + TxHandlerImpl(BrokerAdapter& parent) : HandlerImplType(parent) {} + void select(const framing::MethodContext& context); void commit(const framing::MethodContext& context); void rollback(const framing::MethodContext& context); }; + Connection& connection; BasicHandlerImpl basicHandler; ChannelHandlerImpl channelHandler; ConnectionHandlerImpl connectionHandler; @@ -199,7 +213,7 @@ class BrokerAdapter : public framing::AMQP_ServerOperations MessageHandlerImpl messageHandler; QueueHandlerImpl queueHandler; TxHandlerImpl txHandler; - + }; }} // namespace qpid::broker diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 07636216a6..74e5504f17 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -25,6 +25,8 @@ #include <algorithm> #include <functional> +#include <boost/bind.hpp> + #include "BrokerChannel.h" #include "DeletingTxOp.h" #include "framing/ChannelAdapter.h" @@ -50,7 +52,7 @@ Channel::Channel( u_int32_t _framesize, MessageStore* const _store, u_int64_t _stagingThreshold ) : - ChannelAdapter(id, &con.getOutput(), con.client->getProtocolVersion()), + ChannelAdapter(id, &con.getOutput(), con.getVersion()), connection(con), currentDeliveryTag(1), transactional(false), @@ -74,46 +76,32 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, +// TODO aconway 2007-02-12: Why is connection token passed in instead +// of using the channel's parent connection? +void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { - if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); - try{ - queue->consume(c, exclusive);//may throw exception - consumers[tag] = c; - } catch(...) { - // FIXME aconway 2007-02-06: auto_ptr for exception safe mem. mgmt. - delete c; - throw; - } -} - -void Channel::cancel(consumer_iterator i){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + if(tagInOut.empty()) + tagInOut = tagGenerator.generate(); + std::auto_ptr<ConsumerImpl> c( + new ConsumerImpl(this, tagInOut, queue, connection, acks)); + queue->consume(c.get(), exclusive);//may throw exception + consumers.insert(tagInOut, c.release()); } void Channel::cancel(const string& tag){ - consumer_iterator i = consumers.find(tag); - if(i != consumers.end()){ - cancel(i); - } + // consumers is a ptr_map so erase will delete the consumer + // which will call cancel. + ConsumerImplMap::iterator i = consumers.find(tag); + if (i != consumers.end()) + consumers.erase(i); } void Channel::close(){ - if (isOpen()) { - opened = false; - while (!consumers.empty()) - cancel(consumers.begin()); - //requeue: - recover(true); - } + opened = false; + consumers.clear(); + recover(true); } void Channel::begin(){ @@ -160,14 +148,10 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection), - ackExpected(ack), - blocked(false){ -} + Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack +) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -182,12 +166,18 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ return false; } +Channel::ConsumerImpl::~ConsumerImpl() { + cancel(); +} + void Channel::ConsumerImpl::cancel(){ - if(queue) queue->cancel(this); + if(queue) + queue->cancel(this); } void Channel::ConsumerImpl::requestDispatch(){ - if(blocked) queue->dispatch(); + if(blocked) + queue->dispatch(); } void Channel::handleInlineTransfer(Message::shared_ptr msg) @@ -196,11 +186,15 @@ void Channel::handleInlineTransfer(Message::shared_ptr msg) connection.broker.getExchanges().get(msg->getExchange()); if(transactional){ TxPublish* deliverable = new TxPublish(msg); - exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + *deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); txBuffer.enlist(new DeletingTxOp(deliverable)); }else{ DeliverableMessage deliverable(msg); - exchange->route(deliverable, msg->getRoutingKey(), &(msg->getApplicationHeaders())); + exchange->route( + deliverable, msg->getRoutingKey(), + &(msg->getApplicationHeaders())); } } @@ -244,7 +238,8 @@ void Channel::ack(){ ack(getRequestInProgress(), false); } -void Channel::ack(u_int64_t deliveryTag, bool multiple){ +void Channel::ack(u_int64_t deliveryTag, bool multiple) +{ if(transactional){ accumulatedAck.update(deliveryTag, multiple); //TODO: I think the outstanding prefetch size & count should be updated at this point... @@ -271,9 +266,8 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ //if the prefetch limit had previously been reached, there may //be messages that can be now be delivered - for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ - j->second->requestDispatch(); - } + std::for_each(consumers.begin(), consumers.end(), + boost::bind(&ConsumerImpl::requestDispatch, _1)); } } @@ -328,8 +322,8 @@ void Channel::handleMethodInContext( method->invoke(*adapter, context); } }catch(ChannelException& e){ - connection.client->getChannel().close( - context, e.code, e.toString(), + adapter->getProxy().getChannel().close( + e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); }catch(ConnectionException& e){ @@ -338,4 +332,3 @@ void Channel::handleMethodInContext( connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } - diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h index 58c4f0a45b..538e86b0a8 100644 --- a/cpp/lib/broker/BrokerChannel.h +++ b/cpp/lib/broker/BrokerChannel.h @@ -23,10 +23,10 @@ */ #include <list> -#include <map> #include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <boost/ptr_container/ptr_map.hpp> #include <AccumulatedAck.h> #include <Consumer.h> @@ -56,7 +56,7 @@ using framing::string; class Channel : public framing::ChannelAdapter, public CompletionHandler { - class ConsumerImpl : public virtual Consumer + class ConsumerImpl : public Consumer { Channel* parent; const string tag; @@ -64,23 +64,25 @@ class Channel : public framing::ChannelAdapter, ConnectionToken* const connection; const bool ackExpected; bool blocked; + public: ConsumerImpl(Channel* parent, const string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); + ~ConsumerImpl(); virtual bool deliver(Message::shared_ptr& msg); void cancel(); void requestDispatch(); }; - typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; + typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; Connection& connection; u_int16_t id; u_int64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; bool transactional; - std::map<string, ConsumerImpl*> consumers; + ConsumerImplMap consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; Prefetch outstanding; @@ -93,18 +95,17 @@ class Channel : public framing::ChannelAdapter, MessageStore* const store; MessageBuilder messageBuilder;//builder for in-progress message bool opened; - boost::scoped_ptr<BrokerAdapter> adapter; // completion handler for MessageBuilder void complete(Message::shared_ptr msg); - void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); - void cancel(consumer_iterator consumer); + void deliver(Message::shared_ptr& msg, const string& tag, + Queue::shared_ptr& queue, bool ackExpected); bool checkPrefetch(Message::shared_ptr& msg); public: - Channel(Connection& channel, + Channel(Connection& parent, framing::ChannelId id, u_int32_t framesize, MessageStore* const _store = 0, @@ -112,8 +113,8 @@ class Channel : public framing::ChannelAdapter, ~Channel(); - // For ChannelAdapter bool isOpen() const { return opened; } + BrokerAdapter& getAdatper() { return *adapter; } void open() { opened = true; } void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; } @@ -122,7 +123,11 @@ class Channel : public framing::ChannelAdapter, u_int16_t setPrefetchCount(u_int16_t n){ return prefetchCount = n; } bool exists(const string& consumerTag); - void consume(string& tag, Queue::shared_ptr queue, bool acks, + + /** + *@param tagInOut - if empty it is updated with the generated token. + */ + void consume(string& tagInOut, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0, const framing::FieldTable* = 0); void cancel(const string& tag); @@ -146,7 +151,6 @@ class Channel : public framing::ChannelAdapter, void handleMethodInContext( boost::shared_ptr<framing::AMQMethodBody> method, const framing::MethodContext& context); - }; struct InvalidAckException{}; diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index e8a993942a..bff4492a49 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -28,6 +28,9 @@ #include <MessageStore.h> #include <BasicDeliverBody.h> #include <BasicGetOkBody.h> +#include <AMQContentBody.h> +#include <AMQHeaderBody.h> +#include "AMQMethodBody.h" #include "AMQFrame.h" #include "framing/ChannelAdapter.h" diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index 9e77eab446..871514e55f 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -22,12 +22,10 @@ * */ -#include <BrokerMessageBase.h> #include <memory> #include <boost/shared_ptr.hpp> -#include <AMQContentBody.h> -#include <AMQHeaderBody.h> -#include "AMQMethodBody.h" + +#include <BrokerMessageBase.h> #include <BasicHeaderProperties.h> #include <ConnectionToken.h> #include <Content.h> @@ -39,6 +37,7 @@ namespace qpid { namespace framing { class MethodContext; class ChannelAdapter; +class AMQHeaderBody; } namespace broker { @@ -52,7 +51,7 @@ using framing::string; * request. */ class BasicMessage : public Message { - framing::AMQHeaderBody::shared_ptr header; + boost::shared_ptr<framing::AMQHeaderBody> header; std::auto_ptr<Content> content; sys::Mutex contentLock; u_int64_t size; @@ -65,10 +64,10 @@ class BasicMessage : public Message { BasicMessage(const ConnectionToken* const publisher, const string& exchange, const string& routingKey, bool mandatory, bool immediate, - framing::AMQMethodBody::shared_ptr respondTo); + boost::shared_ptr<framing::AMQMethodBody> respondTo); BasicMessage(); ~BasicMessage(); - void setHeader(framing::AMQHeaderBody::shared_ptr header); + void setHeader(boost::shared_ptr<framing::AMQHeaderBody> header); void addContent(framing::AMQContentBody::shared_ptr data); bool isComplete(); diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 41a0cd45fa..3bba95a5f8 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -22,14 +22,10 @@ * */ -#include "AMQContentBody.h" -#include "AMQHeaderBody.h" -#include "AMQMethodBody.h" -#include "Content.h" -#include "framing/amqp_types.h" - #include <string> #include <boost/shared_ptr.hpp> +#include "Content.h" +#include "framing/amqp_types.h" namespace qpid { @@ -38,6 +34,9 @@ class MethodContext; class ChannelAdapter; class BasicHeaderProperties; class FieldTable; +class AMQMethodBody; +class AMQContentBody; +class AMQHeaderBody; } @@ -50,24 +49,17 @@ class MessageStore; * abstracting away the operations * TODO; AMS: for the moment this is mostly a placeholder */ -class Message{ - const ConnectionToken* publisher; - std::string exchange; - std::string routingKey; - const bool mandatory; - const bool immediate; - u_int64_t persistenceId; - bool redelivered; - framing::AMQMethodBody::shared_ptr respondTo; - +class Message { public: typedef boost::shared_ptr<Message> shared_ptr; + typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; + Message(const ConnectionToken* publisher_, const std::string& _exchange, const std::string& _routingKey, bool _mandatory, bool _immediate, - framing::AMQMethodBody::shared_ptr respondTo_) : + AMQMethodBodyPtr respondTo_) : publisher(publisher_), exchange(_exchange), routingKey(_routingKey), @@ -92,9 +84,7 @@ class Message{ const std::string& getExchange() const { return exchange; } u_int64_t getPersistenceId() const { return persistenceId; } bool getRedelivered() const { return redelivered; } - framing::AMQMethodBody::shared_ptr getRespondTo() const { - return respondTo; - } + AMQMethodBodyPtr getRespondTo() const { return respondTo; } void setRouting(const std::string& _exchange, const std::string& _routingKey) { exchange = _exchange; routingKey = _routingKey; } @@ -168,14 +158,24 @@ class Message{ * it uses). */ virtual void setContent(std::auto_ptr<Content>& /*content*/) {}; - virtual void setHeader(framing::AMQHeaderBody::shared_ptr /*header*/) {}; - virtual void addContent(framing::AMQContentBody::shared_ptr /*data*/) {}; + virtual void setHeader(boost::shared_ptr<framing::AMQHeaderBody>) {}; + virtual void addContent(boost::shared_ptr<framing::AMQContentBody>) {}; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can * be reloaded. */ virtual void releaseContent(MessageStore* /*store*/) {}; + + private: + const ConnectionToken* publisher; + std::string exchange; + std::string routingKey; + const bool mandatory; + const bool immediate; + u_int64_t persistenceId; + bool redelivered; + AMQMethodBodyPtr respondTo; }; }} diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 99c045c59a..789e652947 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -149,7 +149,9 @@ void Queue::consume(Consumer* c, bool requestExclusive){ void Queue::cancel(Consumer* c){ Mutex::ScopedLock locker(lock); - consumers.erase(find(consumers.begin(), consumers.end(), c)); + Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c); + if (i != consumers.end()) + consumers.erase(i); if(autodelete && consumers.empty()) lastUsed = now()*TIME_MSEC; if(exclusive == c) exclusive = 0; } diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 40fa4bd415..015b27fe76 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -53,13 +53,17 @@ namespace qpid { * or more consumers registers. */ class Queue{ + typedef std::vector<Consumer*> Consumers; + typedef std::queue<Binding*> Bindings; + typedef std::queue<Message::shared_ptr> Messages; + const string name; const u_int32_t autodelete; MessageStore* const store; const ConnectionToken* const owner; - std::vector<Consumer*> consumers; - std::queue<Binding*> bindings; - std::queue<Message::shared_ptr> messages; + Consumers consumers; + Bindings bindings; + Messages messages; bool queueing; bool dispatching; int next; diff --git a/cpp/lib/broker/Connection.cpp b/cpp/lib/broker/Connection.cpp index 000199a65e..3d9e5cdaf8 100644 --- a/cpp/lib/broker/Connection.cpp +++ b/cpp/lib/broker/Connection.cpp @@ -22,6 +22,9 @@ #include <assert.h> #include "Connection.h" +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" +#include "BrokerAdapter.h" using namespace boost; using namespace qpid::sys; @@ -33,12 +36,15 @@ namespace broker { Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_) : broker(broker_), - settings(broker.getTimeout(), broker.getStagingThreshold()), out(out_), framemax(65536), - heartbeat(0) + heartbeat(0), + client(0), + timeout(broker.getTimeout()), + stagingThreshold(broker.getStagingThreshold()) {} + Queue::shared_ptr Connection::getQueue(const string& name, u_int16_t channel){ Queue::shared_ptr queue; if (name.empty()) { @@ -59,31 +65,27 @@ Exchange::shared_ptr Connection::findExchange(const string& name){ } -void Connection::received(qpid::framing::AMQFrame* frame){ +void Connection::received(framing::AMQFrame* frame){ getChannel(frame->getChannel()).handleBody(frame->getBody()); } -void Connection::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId){ - client->getConnection().close(MethodContext(&getChannel(0)), code, text, classId, methodId); +void Connection::close( + ReplyCode code, const string& text, ClassId classId, MethodId methodId) +{ + client->close(code, text, classId, methodId); getOutput().close(); } -// TODO aconway 2007-02-02: Should be delegated to the BrokerAdapter -// as it is part of the protocol. -void Connection::initiated(qpid::framing::ProtocolInitiation* header) { - if (client.get()) - // TODO aconway 2007-01-16: correct error code. - throw ConnectionException(0, "Connection initiated twice"); - client.reset(new qpid::framing::AMQP_ClientProxy( - out, header->getMajor(), header->getMinor())); +void Connection::initiated(framing::ProtocolInitiation* header) { + version = ProtocolVersion(header->getMajor(), header->getMinor()); FieldTable properties; string mechanisms("PLAIN"); string locales("en_US"); - client->getConnection().start( - MethodContext(&getChannel(0)), + getChannel(0).init(0, *out, getVersion()); + client = &getChannel(0).getAdatper().getProxy().getConnection(); + client->start( header->getMajor(), header->getMinor(), properties, mechanisms, locales); - getChannel(0).init(0, *out, client->getProtocolVersion()); } void Connection::idleOut(){} @@ -103,9 +105,10 @@ void Connection::closed(){ } } -void Connection::closeChannel(u_int16_t channel) { - getChannel(channel).close(); - channels.erase(channels.find(channel)); +void Connection::closeChannel(u_int16_t id) { + ChannelMap::iterator i = channels.find(id); + if (i != channels.end()) + i->close(); } @@ -115,7 +118,7 @@ Channel& Connection::getChannel(ChannelId id) { i = channels.insert( id, new Channel( *this, id, framemax, broker.getQueues().getStore(), - settings.stagingThreshold)).first; + broker.getStagingThreshold())).first; } return *i; } diff --git a/cpp/lib/broker/Connection.h b/cpp/lib/broker/Connection.h index 4f1156dd01..27faab4967 100644 --- a/cpp/lib/broker/Connection.h +++ b/cpp/lib/broker/Connection.h @@ -27,66 +27,64 @@ #include <boost/ptr_container/ptr_map.hpp> #include <AMQFrame.h> -#include <AMQP_ClientProxy.h> #include <AMQP_ServerOperations.h> +#include <AMQP_ClientProxy.h> #include <sys/ConnectionOutputHandler.h> #include <sys/ConnectionInputHandler.h> #include <sys/TimeoutHandler.h> +#include "framing/ProtocolVersion.h" #include "Broker.h" #include "Exception.h" +#include "BrokerChannel.h" namespace qpid { namespace broker { -class Settings { - public: - const u_int32_t timeout;//timeout for auto-deleted queues (in ms) - const u_int64_t stagingThreshold; - - Settings(u_int32_t _timeout, u_int64_t _stagingThreshold) : timeout(_timeout), stagingThreshold(_stagingThreshold) {} -}; +class Channel; class Connection : public sys::ConnectionInputHandler, public ConnectionToken { public: Connection(sys::ConnectionOutputHandler* out, Broker& broker); - // ConnectionInputHandler methods - void received(framing::AMQFrame* frame); - void initiated(framing::ProtocolInitiation* header); - void idleOut(); - void idleIn(); - void closed(); - sys::ConnectionOutputHandler& getOutput() { return *out; } + /** Get a channel. Create if it does not already exist */ + Channel& getChannel(framing::ChannelId channel); - const framing::ProtocolVersion& getVersion() { - return client->getProtocolVersion(); } + /** Close a channel */ + void closeChannel(framing::ChannelId channel); + + /** Close the connection */ + void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + + sys::ConnectionOutputHandler& getOutput() const { return *out; } + framing::ProtocolVersion getVersion() const { return version; } u_int32_t getFrameMax() const { return framemax; } u_int16_t getHeartbeat() const { return heartbeat; } + u_int32_t getTimeout() const { return timeout; } + u_int64_t getStagingThreshold() const { return stagingThreshold; } void setFrameMax(u_int32_t fm) { framemax = fm; } void setHeartbeat(u_int16_t hb) { heartbeat = hb; } - - Broker& broker; - std::auto_ptr<framing::AMQP_ClientProxy> client; - Settings settings; - - std::vector<Queue::shared_ptr> exclusiveQueues; - + /** * Get named queue, never returns 0. * @return: named queue or default queue for channel if name="" * @exception: ChannelException if no queue of that name is found. - * @exception: ConnectionException if no queue specified and channel has not declared one. + * @exception: ConnectionException if name="" and channel has no default. */ Queue::shared_ptr getQueue(const string& name, u_int16_t channel); - Channel& newChannel(framing::ChannelId channel); - Channel& getChannel(framing::ChannelId channel); - void closeChannel(framing::ChannelId channel); - void close(framing::ReplyCode code, const string& text, framing::ClassId classId, framing::MethodId methodId); + Broker& broker; + std::vector<Queue::shared_ptr> exclusiveQueues; + + // ConnectionInputHandler methods + void received(framing::AMQFrame* frame); + void initiated(framing::ProtocolInitiation* header); + void idleOut(); + void idleIn(); + void closed(); private: typedef boost::ptr_map<framing::ChannelId, Channel> ChannelMap; @@ -94,10 +92,15 @@ class Connection : public sys::ConnectionInputHandler, typedef std::vector<Queue::shared_ptr>::iterator queue_iterator; Exchange::shared_ptr findExchange(const string& name); + framing::ProtocolVersion version; ChannelMap channels; sys::ConnectionOutputHandler* out; u_int32_t framemax; u_int16_t heartbeat; + framing::AMQP_ClientProxy::Connection* client; + const u_int32_t timeout; //timeout for auto-deleted queues (in ms) + const u_int64_t stagingThreshold; + }; }} diff --git a/cpp/lib/broker/HandlerImpl.h b/cpp/lib/broker/HandlerImpl.h new file mode 100644 index 0000000000..c55a36da45 --- /dev/null +++ b/cpp/lib/broker/HandlerImpl.h @@ -0,0 +1,71 @@ +#ifndef _broker_HandlerImpl_h +#define _broker_HandlerImpl_h + +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "BrokerChannel.h" +#include "AMQP_ClientProxy.h" + +namespace qpid { + +namespace framing { +class AMQP_ClientProxy; +} + +namespace broker { + +class Broker; +class Channel; +class Connection; + +/** + * A collection of references to the core objects required by an adapter, + * and a client proxy. + */ +struct CoreRefs +{ + CoreRefs(Channel& ch, Connection& c, Broker& b) + : channel(ch), connection(c), broker(b), proxy(ch) {} + + Channel& channel; + Connection& connection; + Broker& broker; + framing::AMQP_ClientProxy proxy; +}; + + +/** + * Base template for protocol handler implementations. + * Provides the core references and appropriate AMQP class proxy. + */ +template <class ProxyType> +struct HandlerImpl : public CoreRefs { + typedef HandlerImpl<ProxyType> HandlerImplType; + HandlerImpl(CoreRefs& parent) + : CoreRefs(parent), client(ProxyType::get(proxy)) {} + ProxyType client; +}; + + + +}} // namespace qpid::broker + + + +#endif /*!_broker_HandlerImpl_h*/ diff --git a/cpp/lib/broker/MessageHandlerImpl.cpp b/cpp/lib/broker/MessageHandlerImpl.cpp index 797e3fbbf9..0853aebcb1 100644 --- a/cpp/lib/broker/MessageHandlerImpl.cpp +++ b/cpp/lib/broker/MessageHandlerImpl.cpp @@ -25,16 +25,15 @@ #include "BrokerMessageMessage.h" #include "MessageAppendBody.h" #include "MessageTransferBody.h" +#include "BrokerAdapter.h" namespace qpid { namespace broker { using namespace framing; -MessageHandlerImpl::MessageHandlerImpl(Channel& ch, Connection& c, Broker& b) - : channel(ch), connection(c), broker(b), references(ch), - client(connection.client->getMessage()) -{} +MessageHandlerImpl::MessageHandlerImpl(CoreRefs& parent) + : HandlerImplType(parent), references(channel) {} // // Message class method handlers @@ -47,7 +46,7 @@ MessageHandlerImpl::append(const MethodContext& context, references.get(reference).append( boost::shared_polymorphic_downcast<MessageAppendBody>( context.methodBody)); - client.ok(context); + client.ok(context.getRequestId()); } @@ -56,7 +55,7 @@ MessageHandlerImpl::cancel(const MethodContext& context, const string& destination ) { channel.cancel(destination); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -73,7 +72,7 @@ MessageHandlerImpl::close(const MethodContext& context, const string& reference) { references.get(reference).close(); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -84,7 +83,7 @@ MessageHandlerImpl::consume(const MethodContext& context, bool noLocal, bool noAck, bool exclusive, - const qpid::framing::FieldTable& filter ) + const framing::FieldTable& filter ) { Queue::shared_ptr queue = connection.getQueue(queueName, channel.getId()); if(!destination.empty() && channel.exists(destination)) @@ -93,7 +92,7 @@ MessageHandlerImpl::consume(const MethodContext& context, channel.consume( tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); - client.ok(context); + client.ok(context.getRequestId()); // Dispatch messages as there is now a consumer. queue->dispatch(); } @@ -117,9 +116,9 @@ MessageHandlerImpl::get( const MethodContext& context, connection.getQueue(queueName, context.channel->getId()); if(channel.get(queue, destination, !noAck)) - client.ok(context); + client.ok(context.getRequestId()); else - client.empty(context); + client.empty(context.getRequestId()); } void @@ -141,7 +140,7 @@ MessageHandlerImpl::open(const MethodContext& context, const string& reference) { references.open(reference); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -153,7 +152,7 @@ MessageHandlerImpl::qos(const MethodContext& context, //TODO: handle global channel.setPrefetchSize(prefetchSize); channel.setPrefetchCount(prefetchCount); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -161,7 +160,7 @@ MessageHandlerImpl::recover(const MethodContext& context, bool requeue) { channel.recover(requeue); - client.ok(context); + client.ok(context.getRequestId()); } void @@ -204,8 +203,8 @@ MessageHandlerImpl::transfer(const MethodContext& context, const string& /*appId*/, const string& /*transactionId*/, const string& /*securityToken*/, - const qpid::framing::FieldTable& /*applicationHeaders*/, - qpid::framing::Content body, + const framing::FieldTable& /*applicationHeaders*/, + const framing::Content& body, bool /*mandatory*/) { MessageTransferBody::shared_ptr transfer( @@ -218,7 +217,7 @@ MessageHandlerImpl::transfer(const MethodContext& context, channel.handleInlineTransfer(message); else references.get(body.getValue()).addMessage(message); - client.ok(context); + client.ok(context.getRequestId()); } diff --git a/cpp/lib/broker/MessageHandlerImpl.h b/cpp/lib/broker/MessageHandlerImpl.h index 0fef45bb19..cb7e7e3126 100644 --- a/cpp/lib/broker/MessageHandlerImpl.h +++ b/cpp/lib/broker/MessageHandlerImpl.h @@ -24,7 +24,7 @@ #include "AMQP_ServerOperations.h" #include "AMQP_ClientProxy.h" #include "Reference.h" -#include "BrokerChannel.h" +#include "HandlerImpl.h" namespace qpid { namespace broker { @@ -34,10 +34,11 @@ class Broker; class MessageMessage; class MessageHandlerImpl : - public framing::AMQP_ServerOperations::MessageHandler + public framing::AMQP_ServerOperations::MessageHandler, + public HandlerImpl<framing::AMQP_ClientProxy::Message> { public: - MessageHandlerImpl(Channel& ch, Connection& c, Broker& b); + MessageHandlerImpl(CoreRefs& parent); void append(const framing::MethodContext&, const std::string& reference, @@ -116,14 +117,10 @@ class MessageHandlerImpl : const std::string& transactionId, const std::string& securityToken, const framing::FieldTable& applicationHeaders, - framing::Content body, + const framing::Content& body, bool mandatory ); private: - Channel& channel; - Connection& connection; - Broker& broker; ReferenceRegistry references; - framing::AMQP_ClientProxy::Message& client; }; }} // namespace qpid::broker |
