From 87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 15 Jan 2007 18:28:29 +0000 Subject: * Refactor: Moved major broker components (exchanges, queues etc.) from class SessionHandlerImplFactory to more logical class Broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/lib/broker/Broker.cpp | 55 +++++++++++++++++++++++--- cpp/lib/broker/Broker.h | 20 +++++++++- cpp/lib/broker/SessionHandlerFactoryImpl.cpp | 48 ++++++----------------- cpp/lib/broker/SessionHandlerFactoryImpl.h | 43 ++++++++------------- cpp/lib/broker/SessionHandlerImpl.cpp | 58 ++++++++++++++-------------- cpp/lib/broker/SessionHandlerImpl.h | 13 +++---- cpp/lib/common/framing/Responder.cpp | 4 +- cpp/lib/common/framing/Responder.h | 2 +- 8 files changed, 134 insertions(+), 109 deletions(-) (limited to 'cpp/lib') diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6c0d7a3f3f..6a8b1f8538 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -20,19 +20,61 @@ */ #include #include -#include +#include "AMQFrame.h" +#include "DirectExchange.h" +#include "FanOutExchange.h" +#include "HeadersExchange.h" +#include "MessageStoreModule.h" +#include "NullMessageStore.h" +#include "ProtocolInitiation.h" +#include "SessionHandlerImpl.h" +#include "sys/SessionContext.h" +#include "sys/SessionHandler.h" +#include "sys/SessionHandlerFactory.h" +#include "sys/TimeoutHandler.h" -using namespace qpid::broker; -using namespace qpid::sys; +#include "Broker.h" + +namespace qpid { +namespace broker { + +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); Broker::Broker(const Configuration& config) : acceptor(Acceptor::create(config.getPort(), config.getConnectionBacklog(), config.getWorkerThreads(), config.isTrace())), - factory(config.getStore()) -{ } + queues(store.get()), + timeout(30000), + stagingThreshold(0), + cleaner(&queues, timeout/10), + factory(*this) +{ + if (config.getStore().empty()) + store.reset(new NullMessageStore()); + else + store.reset(new MessageStoreModule(config.getStore())); + + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + exchanges.declare(amq_direct, DirectExchange::typeName); + exchanges.declare(amq_topic, TopicExchange::typeName); + exchanges.declare(amq_fanout, FanOutExchange::typeName); + exchanges.declare(amq_match, HeadersExchange::typeName); + + if(store.get()) { + RecoveryManager recoverer(queues, exchanges); + MessageStoreSettings storeSettings = { getStagingThreshold() }; + store->recover(recoverer, &storeSettings); + } + + cleaner.start(); +} Broker::shared_ptr Broker::create(int16_t port) @@ -57,3 +99,6 @@ void Broker::shutdown() { Broker::~Broker() { } const int16_t Broker::DEFAULT_PORT(5672); + + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/Broker.h b/cpp/lib/broker/Broker.h index 8ea1a57c27..f831b680e9 100644 --- a/cpp/lib/broker/Broker.h +++ b/cpp/lib/broker/Broker.h @@ -27,6 +27,8 @@ #include #include #include +#include +#include namespace qpid { namespace broker { @@ -69,13 +71,27 @@ class Broker : public qpid::sys::Runnable, /** Shut down the broker */ virtual void shutdown(); + MessageStore& getStore() { return *store; } + QueueRegistry& getQueues() { return queues; } + ExchangeRegistry& getExchanges() { return exchanges; } + u_int32_t getTimeout() { return timeout; } + u_int64_t getStagingThreshold() { return stagingThreshold; } + AutoDelete& getCleaner() { return cleaner; } + private: Broker(const Configuration& config); + qpid::sys::Acceptor::shared_ptr acceptor; + std::auto_ptr store; + QueueRegistry queues; + ExchangeRegistry exchanges; + u_int32_t timeout; + u_int64_t stagingThreshold; + AutoDelete cleaner; SessionHandlerFactoryImpl factory; }; -} -} + +}} diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp index 1b5441e3cf..559fd6bca1 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.cpp +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.cpp @@ -19,51 +19,25 @@ * */ #include - -#include -#include -#include -#include -#include #include -using namespace qpid::broker; -using namespace qpid::sys; +namespace qpid { +namespace broker { -namespace -{ -const std::string empty; -const std::string amq_direct("amq.direct"); -const std::string amq_topic("amq.topic"); -const std::string amq_fanout("amq.fanout"); -const std::string amq_match("amq.match"); -} -SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(const std::string& _store, u_int64_t _stagingThreshold, u_int32_t _timeout) : - store(_store.empty() ? (MessageStore*) new NullMessageStore() : (MessageStore*) new MessageStoreModule(_store)), - queues(store.get()), settings(_timeout, _stagingThreshold), cleaner(&queues, _timeout/10) -{ - exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - exchanges.declare(amq_direct, DirectExchange::typeName); - exchanges.declare(amq_topic, TopicExchange::typeName); - exchanges.declare(amq_fanout, FanOutExchange::typeName); - exchanges.declare(amq_match, HeadersExchange::typeName); +SessionHandlerFactoryImpl::SessionHandlerFactoryImpl(Broker& b) : broker(b) +{} - if(store.get()) { - RecoveryManager recoverer(queues, exchanges); - MessageStoreSettings storeSettings = { settings.stagingThreshold }; - store->recover(recoverer, &storeSettings); - } - cleaner.start(); -} - -SessionHandler* SessionHandlerFactoryImpl::create(SessionContext* ctxt) +SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() { - return new SessionHandlerImpl(ctxt, &queues, &exchanges, &cleaner, settings); + broker.getCleaner().stop(); } -SessionHandlerFactoryImpl::~SessionHandlerFactoryImpl() +qpid::sys::SessionHandler* +SessionHandlerFactoryImpl::create(qpid::sys::SessionContext* ctxt) { - cleaner.stop(); + return new SessionHandlerImpl(ctxt, broker); } + +}} // namespace qpid::broker diff --git a/cpp/lib/broker/SessionHandlerFactoryImpl.h b/cpp/lib/broker/SessionHandlerFactoryImpl.h index a69b67b08d..49c42b4d1c 100644 --- a/cpp/lib/broker/SessionHandlerFactoryImpl.h +++ b/cpp/lib/broker/SessionHandlerFactoryImpl.h @@ -21,37 +21,26 @@ #ifndef _SessionHandlerFactoryImpl_ #define _SessionHandlerFactoryImpl_ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include "SessionHandlerFactory.h" namespace qpid { - namespace broker { +namespace broker { +class Broker; - class SessionHandlerFactoryImpl : public virtual qpid::sys::SessionHandlerFactory - { - std::auto_ptr store; - QueueRegistry queues; - ExchangeRegistry exchanges; - const Settings settings; - AutoDelete cleaner; - public: - SessionHandlerFactoryImpl(const std::string& store = "", u_int64_t stagingThreshold = 0, u_int32_t timeout = 30000); - virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); - virtual ~SessionHandlerFactoryImpl(); - }; +class SessionHandlerFactoryImpl : public qpid::sys::SessionHandlerFactory +{ + public: + SessionHandlerFactoryImpl(Broker& b); + + virtual qpid::sys::SessionHandler* create(qpid::sys::SessionContext* ctxt); + + virtual ~SessionHandlerFactoryImpl(); - } -} + private: + Broker& broker; +}; + +}} #endif diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp index f34ef59922..905ac83b92 100644 --- a/cpp/lib/broker/SessionHandlerImpl.cpp +++ b/cpp/lib/broker/SessionHandlerImpl.cpp @@ -26,21 +26,22 @@ #include "assert.h" using namespace boost; -using namespace qpid::broker; using namespace qpid::sys; using namespace qpid::framing; using namespace qpid::sys; -SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, - QueueRegistry* _queues, - ExchangeRegistry* _exchanges, - AutoDelete* _cleaner, - const Settings& _settings) : +namespace qpid { +namespace broker { + +SessionHandlerImpl::SessionHandlerImpl( + SessionContext* _context, Broker& broker) : + context(_context), - queues(_queues), - exchanges(_exchanges), - cleaner(_cleaner), - settings(_settings), + client(0), + queues(broker.getQueues()), + exchanges(broker.getExchanges()), + cleaner(broker.getCleaner()), + settings(broker.getTimeout(), broker.getStagingThreshold()), basicHandler(new BasicHandlerImpl(this)), channelHandler(new ChannelHandlerImpl(this)), connectionHandler(new ConnectionHandlerImpl(this)), @@ -49,10 +50,8 @@ SessionHandlerImpl::SessionHandlerImpl(SessionContext* _context, txHandler(new TxHandlerImpl(this)), messageHandler(new MessageHandlerImpl(this)), framemax(65536), - heartbeat(0){ - - client =NULL; -} + heartbeat(0) +{} SessionHandlerImpl::~SessionHandlerImpl(){ @@ -75,7 +74,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha queue = getChannel(channel)->getDefaultQueue(); if (!queue) throw ConnectionException( 530, "Queue must be specified or previously declared" ); } else { - queue = queues->find(name); + queue = queues.find(name); if (queue == 0) { throw ChannelException( 404, "Queue not found: " + name); } @@ -85,7 +84,7 @@ Queue::shared_ptr SessionHandlerImpl::getQueue(const string& name, u_int16_t cha Exchange::shared_ptr SessionHandlerImpl::findExchange(const string& name){ - return exchanges->get(name); + return exchanges.get(name); } void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ @@ -96,8 +95,10 @@ void SessionHandlerImpl::received(qpid::framing::AMQFrame* frame){ switch(body->type()) { case REQUEST_BODY: + // responder.received(frame); case RESPONSE_BODY: - case METHOD_BODY: + // requester.received(frame); + case METHOD_BODY: // method = dynamic_pointer_cast(body); try{ method->invoke(*this, channel); @@ -164,7 +165,7 @@ void SessionHandlerImpl::closed(){ } for(queue_iterator i = exclusiveQueues.begin(); i < exclusiveQueues.end(); i = exclusiveQueues.begin()){ string name = (*i)->getName(); - queues->destroy(name); + queues.destroy(name); exclusiveQueues.erase(i); } } catch(std::exception& e) { @@ -221,7 +222,7 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin parent->channels[channel] = new Channel( parent->client->getProtocolVersion() , parent->context, channel, - parent->framemax, parent->queues->getStore(), + parent->framemax, parent->queues.getStore(), parent->settings.stagingThreshold); // FIXME aconway 2007-01-04: provide valid channel Id as per ampq 0-9 @@ -251,12 +252,12 @@ void SessionHandlerImpl::ExchangeHandlerImpl::declare(u_int16_t channel, u_int16 const FieldTable& /*arguments*/){ if(passive){ - if(!parent->exchanges->get(exchange)){ + if(!parent->exchanges.get(exchange)){ throw ChannelException(404, "Exchange not found: " + exchange); } }else{ try{ - std::pair response = parent->exchanges->declare(exchange, type); + std::pair response = parent->exchanges.declare(exchange, type); if(!response.second && response.first->getType() != type){ throw ConnectionException(507, "Exchange already declared to be of type " + response.first->getType() + ", requested " + type); @@ -288,7 +289,7 @@ void SessionHandlerImpl::ExchangeHandlerImpl::delete_(u_int16_t channel, u_int16 const string& exchange, bool /*ifUnused*/, bool nowait){ //TODO: implement unused - parent->exchanges->destroy(exchange); + parent->exchanges.destroy(exchange); if(!nowait) parent->client->getExchange().deleteOk(channel); } @@ -300,7 +301,7 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue = parent->getQueue(name, channel); } else { std::pair queue_created = - parent->queues->declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); + parent->queues.declare(name, durable, autoDelete ? parent->settings.timeout : 0, exclusive ? parent : 0); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -310,11 +311,11 @@ void SessionHandlerImpl::QueueHandlerImpl::declare(u_int16_t channel, u_int16_t queue_created.first->create(arguments); //add default binding: - parent->exchanges->getDefault()->bind(queue, name, 0); + parent->exchanges.getDefault()->bind(queue, name, 0); if (exclusive) { parent->exclusiveQueues.push_back(queue); } else if(autoDelete){ - parent->cleaner->add(queue); + parent->cleaner.add(queue); } } } @@ -332,7 +333,7 @@ void SessionHandlerImpl::QueueHandlerImpl::bind(u_int16_t channel, u_int16_t /*t const FieldTable& arguments){ Queue::shared_ptr queue = parent->getQueue(queueName, channel); - Exchange::shared_ptr exchange = parent->exchanges->get(exchangeName); + Exchange::shared_ptr exchange = parent->exchanges.get(exchangeName); if(exchange){ // kpvdr - cannot use this any longer as routingKey is now const // if(routingKey.empty() && queueName.empty()) routingKey = queue->getName(); @@ -369,7 +370,7 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t } count = q->getMessageCount(); q->destroy(); - parent->queues->destroy(queue); + parent->queues.destroy(queue); } if(!nowait) parent->client->getQueue().deleteOk(channel, count); @@ -424,7 +425,7 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t const string& exchangeName, const string& routingKey, bool mandatory, bool immediate){ - Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges->getDefault() : parent->exchanges->get(exchangeName); + Exchange::shared_ptr exchange = exchangeName.empty() ? parent->exchanges.getDefault() : parent->exchanges.get(exchangeName); if(exchange){ Message* msg = new Message(parent, exchangeName, routingKey, mandatory, immediate); parent->getChannel(channel)->handlePublish(msg, exchange); @@ -652,3 +653,4 @@ SessionHandlerImpl::MessageHandlerImpl::transfer( u_int16_t /*channel*/, assert(0); // FIXME astitcher 2007-01-11: 0-9 feature } +}} diff --git a/cpp/lib/broker/SessionHandlerImpl.h b/cpp/lib/broker/SessionHandlerImpl.h index 25803fe1f7..08b05a11b6 100644 --- a/cpp/lib/broker/SessionHandlerImpl.h +++ b/cpp/lib/broker/SessionHandlerImpl.h @@ -40,6 +40,7 @@ #include #include #include +#include "Broker.h" namespace qpid { namespace broker { @@ -77,11 +78,10 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, qpid::sys::SessionContext* context; qpid::framing::AMQP_ClientProxy* client; - QueueRegistry* queues; - ExchangeRegistry* const exchanges; - AutoDelete* const cleaner; - const Settings settings; - + QueueRegistry& queues; + ExchangeRegistry& exchanges; + AutoDelete& cleaner; + Settings settings; std::auto_ptr basicHandler; std::auto_ptr channelHandler; std::auto_ptr connectionHandler; @@ -112,8 +112,7 @@ class SessionHandlerImpl : public qpid::sys::SessionHandler, Exchange::shared_ptr findExchange(const string& name); public: - SessionHandlerImpl(qpid::sys::SessionContext* context, QueueRegistry* queues, - ExchangeRegistry* exchanges, AutoDelete* cleaner, const Settings& settings); + SessionHandlerImpl(qpid::sys::SessionContext* context, Broker& broker); virtual void received(qpid::framing::AMQFrame* frame); virtual void initiated(qpid::framing::ProtocolInitiation* header); virtual void idleOut(); diff --git a/cpp/lib/common/framing/Responder.cpp b/cpp/lib/common/framing/Responder.cpp index efe3609c7b..1fbbfb8542 100644 --- a/cpp/lib/common/framing/Responder.cpp +++ b/cpp/lib/common/framing/Responder.cpp @@ -30,9 +30,9 @@ void Responder::received(const AMQRequestBody::Data& request) { responseMark = request.responseMark; } -void Responder::sending(AMQResponseBody::Data& response, RequestId toRequest) { +void Responder::sending(AMQResponseBody::Data& response) { response.responseId = ++lastId; - response.requestId = toRequest; + // response.requestId should have been set by caller. response.batchOffset = 0; } diff --git a/cpp/lib/common/framing/Responder.h b/cpp/lib/common/framing/Responder.h index a11967acc1..0e1785256b 100644 --- a/cpp/lib/common/framing/Responder.h +++ b/cpp/lib/common/framing/Responder.h @@ -40,7 +40,7 @@ class Responder void received(const AMQRequestBody::Data& request); /** Called before sending a response to set respose data. */ - void sending(AMQResponseBody::Data& response, RequestId toRequest); + void sending(AMQResponseBody::Data& response); /** Get the ID of the highest response acknowledged by the peer. */ ResponseId getResponseMark() { return responseMark; } -- cgit v1.2.1