diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 |
2 files changed, 33 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 689fb0687c..9773b49b93 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -53,6 +56,9 @@ #endif using qpid::sys::Acceptor; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -121,6 +127,7 @@ const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : + poller(new Poller), config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), @@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - getAcceptor().run(&factory); + + getAcceptor().run(poller, &factory); + + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector<Thread> t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); + + // Run final thread + d.run(); + + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void Broker::shutdown() { // NB: this function must be async-signal safe, it must not // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. - if (acceptor) - acceptor->shutdown(); + poller->shutdown(); } Broker::~Broker() { @@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast<Acceptor::shared_ptr&>(acceptor) = Acceptor::create(config.port, - config.connectionBacklog, - config.workerThreads); + config.connectionBacklog); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; @@ -330,7 +352,7 @@ void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index ea29348c16..49c4b203c4 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -50,6 +50,10 @@ namespace qpid { +namespace sys { + class Poller; +} + class Url; namespace broker { @@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: sys::Acceptor& getAcceptor() const; + boost::shared_ptr<qpid::sys::Poller> poller; Options config; sys::Acceptor::shared_ptr acceptor; MessageStore* store; |
