From 71d805b6086ae19ed774589c25702d791ee91cf2 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 17 Apr 2008 00:19:14 +0000 Subject: Refactored IO Thread creation so that it happens in the Broker class - There is now a single Poller created by the Broker class that is passed to the Acceptor for use in network IO. It can also now be passed to anything else that wants to put work in the IO threads - The Broker class itself is now responsible for actually creating the threads git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 34 ++++++++++++++++++++++++++++------ cpp/src/qpid/broker/Broker.h | 5 +++++ 2 files changed, 33 insertions(+), 6 deletions(-) (limited to 'cpp/src/qpid/broker') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 689fb0687c..9773b49b93 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -37,6 +37,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/sys/Acceptor.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/Dispatcher.h" +#include "qpid/sys/Thread.h" #include "qpid/sys/ConnectionInputHandler.h" #include "qpid/sys/ConnectionInputHandlerFactory.h" #include "qpid/sys/TimeoutHandler.h" @@ -53,6 +56,9 @@ #endif using qpid::sys::Acceptor; +using qpid::sys::Poller; +using qpid::sys::Dispatcher; +using qpid::sys::Thread; using qpid::framing::FrameHandler; using qpid::framing::ChannelId; using qpid::management::ManagementAgent; @@ -121,6 +127,7 @@ const std::string amq_match("amq.match"); const std::string qpid_management("qpid.management"); Broker::Broker(const Broker::Options& conf) : + poller(new Poller), config(conf), store(0), dataDir(conf.noDataDir ? std::string () : conf.dataDir), @@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store) } void Broker::run() { - getAcceptor().run(&factory); + + getAcceptor().run(poller, &factory); + + Dispatcher d(poller); + int numIOThreads = config.workerThreads; + std::vector t(numIOThreads-1); + + // Run n-1 io threads + for (int i=0; ishutdown(); + poller->shutdown(); } Broker::~Broker() { @@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const { if (!acceptor) { const_cast(acceptor) = Acceptor::create(config.port, - config.connectionBacklog, - config.workerThreads); + config.connectionBacklog); QPID_LOG(info, "Listening on port " << getPort()); } return *acceptor; @@ -330,7 +352,7 @@ void Broker::connect( const std::string& host, uint16_t port, sys::ConnectionCodec::Factory* f) { - getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(poller, host, port, f ? f : &factory); } void Broker::connect( diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index ea29348c16..49c4b203c4 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -50,6 +50,10 @@ namespace qpid { +namespace sys { + class Poller; +} + class Url; namespace broker { @@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target, private: sys::Acceptor& getAcceptor() const; + boost::shared_ptr poller; Options config; sys::Acceptor::shared_ptr acceptor; MessageStore* store; -- cgit v1.2.1