diff options
| author | Alan Conway <aconway@apache.org> | 2007-01-15 18:28:29 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-01-15 18:28:29 +0000 |
| commit | 87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 (patch) | |
| tree | 03929353228d063bf8892d6c57d4bf46fa467ddd /cpp/lib/broker/Broker.cpp | |
| parent | fa15e6d52022cc1576b19e3caaecf66260c1923e (diff) | |
| download | qpid-python-87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274.tar.gz | |
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Broker.cpp')
| -rw-r--r-- | cpp/lib/broker/Broker.cpp | 55 |
1 files changed, 50 insertions, 5 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 6c0d7a3f3f..6a8b1f8538 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -20,19 +20,61 @@ */ #include <iostream> #include <memory> -#include <Broker.h> +#include "AMQFrame.h" +#include "DirectExchange.h" +#include "FanOutExchange.h" +#include "HeadersExchange.h" +#include "MessageStoreModule.h" +#include "NullMessageStore.h" +#include "ProtocolInitiation.h" +#include "SessionHandlerImpl.h" +#include "sys/SessionContext.h" +#include "sys/SessionHandler.h" +#include "sys/SessionHandlerFactory.h" +#include "sys/TimeoutHandler.h" -using namespace qpid::broker; -using namespace qpid::sys; +#include "Broker.h" + +namespace qpid { +namespace broker { + +const std::string empty; +const std::string amq_direct("amq.direct"); +const std::string amq_topic("amq.topic"); +const std::string amq_fanout("amq.fanout"); +const std::string amq_match("amq.match"); Broker::Broker(const Configuration& config) : acceptor(Acceptor::create(config.getPort(), config.getConnectionBacklog(), config.getWorkerThreads(), config.isTrace())), - factory(config.getStore()) -{ } + queues(store.get()), + timeout(30000), + stagingThreshold(0), + cleaner(&queues, timeout/10), + factory(*this) +{ + if (config.getStore().empty()) + store.reset(new NullMessageStore()); + else + store.reset(new MessageStoreModule(config.getStore())); + + exchanges.declare(empty, DirectExchange::typeName); // Default exchange. + exchanges.declare(amq_direct, DirectExchange::typeName); + exchanges.declare(amq_topic, TopicExchange::typeName); + exchanges.declare(amq_fanout, FanOutExchange::typeName); + exchanges.declare(amq_match, HeadersExchange::typeName); + + if(store.get()) { + RecoveryManager recoverer(queues, exchanges); + MessageStoreSettings storeSettings = { getStagingThreshold() }; + store->recover(recoverer, &storeSettings); + } + + cleaner.start(); +} Broker::shared_ptr Broker::create(int16_t port) @@ -57,3 +99,6 @@ void Broker::shutdown() { Broker::~Broker() { } const int16_t Broker::DEFAULT_PORT(5672); + + +}} // namespace qpid::broker |
