diff options
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 34 |
1 files changed, 28 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 689fb0687c..9773b49b93 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -53,6 +56,9 @@ #endif using qpid::sys::Acceptor; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -121,6 +127,7 @@ const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : + poller(new Poller), config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), @@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - getAcceptor().run(&factory); + + getAcceptor().run(poller, &factory); + + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector<Thread> t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); + + // Run final thread + d.run(); + + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void Broker::shutdown() { // NB: this function must be async-signal safe, it must not // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. - if (acceptor) - acceptor->shutdown(); + poller->shutdown(); } Broker::~Broker() { @@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast<Acceptor::shared_ptr&>(acceptor) = Acceptor::create(config.port, - config.connectionBacklog, - config.workerThreads); + config.connectionBacklog); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; @@ -330,7 +352,7 @@ void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(poller, host, port, f ? f : &factory); } void Broker::connect( |
