diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-04-17 00:19:14 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-17 00:19:14 +0000 |
| commit | 71d805b6086ae19ed774589c25702d791ee91cf2 (patch) | |
| tree | 67d99aa0b37a9592786ebd0897b6f631db24ca70 /cpp/src/qpid/broker | |
| parent | c6faf3b28d0cf89051b2bff6476f3113285ed9a6 (diff) | |
| download | qpid-python-71d805b6086ae19ed774589c25702d791ee91cf2.tar.gz | |
Refactored IO Thread creation so that it happens in the Broker class
- There is now a single Poller created by the Broker class that is
passed to the Acceptor for use in network IO. It can also now be passed
to anything else that wants to put work in the IO threads
- The Broker class itself is now responsible for actually creating the
threads
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 5 |
2 files changed, 33 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 689fb0687c..9773b49b93 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -53,6 +56,9 @@ #endif using qpid::sys::Acceptor; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -121,6 +127,7 @@ const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : + poller(new Poller), config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), @@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - getAcceptor().run(&factory); + + getAcceptor().run(poller, &factory); + + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector<Thread> t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; i<numIOThreads-1; ++i) + t[i] = Thread(d); + + // Run final thread + d.run(); + + // Now wait for n-1 io threads to exit + for (int i=0; i<numIOThreads-1; ++i) { + t[i].join(); + } } void Broker::shutdown() { // NB: this function must be async-signal safe, it must not // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. - if (acceptor) - acceptor->shutdown(); + poller->shutdown(); } Broker::~Broker() { @@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast<Acceptor::shared_ptr&>(acceptor) = Acceptor::create(config.port, - config.connectionBacklog, - config.workerThreads); + config.connectionBacklog); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; @@ -330,7 +352,7 @@ void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index ea29348c16..49c4b203c4 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -50,6 +50,10 @@ namespace qpid { +namespace sys { + class Poller; +} + class Url; namespace broker { @@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: sys::Acceptor& getAcceptor() const; + boost::shared_ptr<qpid::sys::Poller> poller; Options config; sys::Acceptor::shared_ptr acceptor; MessageStore* store; |
