summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/Broker.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
committerAlan Conway <aconway@apache.org>2007-01-15 18:28:29 +0000
commit87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274 (patch)
tree03929353228d063bf8892d6c57d4bf46fa467ddd /cpp/lib/broker/Broker.cpp
parentfa15e6d52022cc1576b19e3caaecf66260c1923e (diff)
downloadqpid-python-87d97b5bcc5b7fe44cfc71ce28ccfeae1d9b2274.tar.gz
* Refactor: Moved major broker components (exchanges, queues etc.) from
class SessionHandlerImplFactory to more logical class Broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@496425 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/Broker.cpp')
-rw-r--r--cpp/lib/broker/Broker.cpp55
1 files changed, 50 insertions, 5 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp
index 6c0d7a3f3f..6a8b1f8538 100644
--- a/cpp/lib/broker/Broker.cpp
+++ b/cpp/lib/broker/Broker.cpp
@@ -20,19 +20,61 @@
*/
#include <iostream>
#include <memory>
-#include <Broker.h>
+#include "AMQFrame.h"
+#include "DirectExchange.h"
+#include "FanOutExchange.h"
+#include "HeadersExchange.h"
+#include "MessageStoreModule.h"
+#include "NullMessageStore.h"
+#include "ProtocolInitiation.h"
+#include "SessionHandlerImpl.h"
+#include "sys/SessionContext.h"
+#include "sys/SessionHandler.h"
+#include "sys/SessionHandlerFactory.h"
+#include "sys/TimeoutHandler.h"
-using namespace qpid::broker;
-using namespace qpid::sys;
+#include "Broker.h"
+
+namespace qpid {
+namespace broker {
+
+const std::string empty;
+const std::string amq_direct("amq.direct");
+const std::string amq_topic("amq.topic");
+const std::string amq_fanout("amq.fanout");
+const std::string amq_match("amq.match");
Broker::Broker(const Configuration& config) :
acceptor(Acceptor::create(config.getPort(),
config.getConnectionBacklog(),
config.getWorkerThreads(),
config.isTrace())),
- factory(config.getStore())
-{ }
+ queues(store.get()),
+ timeout(30000),
+ stagingThreshold(0),
+ cleaner(&queues, timeout/10),
+ factory(*this)
+{
+ if (config.getStore().empty())
+ store.reset(new NullMessageStore());
+ else
+ store.reset(new MessageStoreModule(config.getStore()));
+
+ exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
+ exchanges.declare(amq_direct, DirectExchange::typeName);
+ exchanges.declare(amq_topic, TopicExchange::typeName);
+ exchanges.declare(amq_fanout, FanOutExchange::typeName);
+ exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(store.get()) {
+ RecoveryManager recoverer(queues, exchanges);
+ MessageStoreSettings storeSettings = { getStagingThreshold() };
+ store->recover(recoverer, &storeSettings);
+ }
+
+ cleaner.start();
+}
Broker::shared_ptr Broker::create(int16_t port)
@@ -57,3 +99,6 @@ void Broker::shutdown() {
Broker::~Broker() { }
const int16_t Broker::DEFAULT_PORT(5672);
+
+
+}} // namespace qpid::broker