summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-17 00:19:14 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-17 00:19:14 +0000
commit71d805b6086ae19ed774589c25702d791ee91cf2 (patch)
tree67d99aa0b37a9592786ebd0897b6f631db24ca70 /cpp/src/qpid/broker
parentc6faf3b28d0cf89051b2bff6476f3113285ed9a6 (diff)
downloadqpid-python-71d805b6086ae19ed774589c25702d791ee91cf2.tar.gz
Refactored IO Thread creation so that it happens in the Broker class
- There is now a single Poller created by the Broker class that is passed to the Acceptor for use in network IO. It can also now be passed to anything else that wants to put work in the IO threads - The Broker class itself is now responsible for actually creating the threads git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@648904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp34
-rw-r--r--cpp/src/qpid/broker/Broker.h5
2 files changed, 33 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 689fb0687c..9773b49b93 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -37,6 +37,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -53,6 +56,9 @@
#endif
using qpid::sys::Acceptor;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
using qpid::management::ManagementAgent;
@@ -121,6 +127,7 @@ const std::string amq_match("amq.match");
const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
+ poller(new Poller),
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
@@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store)
}
void Broker::run() {
- getAcceptor().run(&factory);
+
+ getAcceptor().run(poller, &factory);
+
+ Dispatcher d(poller);
+ int numIOThreads = config.workerThreads;
+ std::vector<Thread> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i<numIOThreads-1; ++i) {
+ t[i].join();
+ }
}
void Broker::shutdown() {
// NB: this function must be async-signal safe, it must not
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
- if (acceptor)
- acceptor->shutdown();
+ poller->shutdown();
}
Broker::~Broker() {
@@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const {
if (!acceptor) {
const_cast<Acceptor::shared_ptr&>(acceptor) =
Acceptor::create(config.port,
- config.connectionBacklog,
- config.workerThreads);
+ config.connectionBacklog);
QPID_LOG(info, "Listening on port " << getPort());
}
return *acceptor;
@@ -330,7 +352,7 @@ void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor().connect(host, port, f ? f : &factory);
+ getAcceptor().connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index ea29348c16..49c4b203c4 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -50,6 +50,10 @@
namespace qpid {
+namespace sys {
+ class Poller;
+}
+
class Url;
namespace broker {
@@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
sys::Acceptor& getAcceptor() const;
+ boost::shared_ptr<qpid::sys::Poller> poller;
Options config;
sys::Acceptor::shared_ptr acceptor;
MessageStore* store;