summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Broker.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Broker.cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp34
1 files changed, 28 insertions, 6 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 689fb0687c..9773b49b93 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -37,6 +37,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -53,6 +56,9 @@
#endif
using qpid::sys::Acceptor;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
using qpid::management::ManagementAgent;
@@ -121,6 +127,7 @@ const std::string amq_match("amq.match");
const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
+ poller(new Poller),
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
@@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store)
}
void Broker::run() {
- getAcceptor().run(&factory);
+
+ getAcceptor().run(poller, &factory);
+
+ Dispatcher d(poller);
+ int numIOThreads = config.workerThreads;
+ std::vector<Thread> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i<numIOThreads-1; ++i) {
+ t[i].join();
+ }
}
void Broker::shutdown() {
// NB: this function must be async-signal safe, it must not
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
- if (acceptor)
- acceptor->shutdown();
+ poller->shutdown();
}
Broker::~Broker() {
@@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const {
if (!acceptor) {
const_cast<Acceptor::shared_ptr&>(acceptor) =
Acceptor::create(config.port,
- config.connectionBacklog,
- config.workerThreads);
+ config.connectionBacklog);
QPID_LOG(info, "Listening on port " << getPort());
}
return *acceptor;
@@ -330,7 +352,7 @@ void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor().connect(host, port, f ? f : &factory);
+ getAcceptor().connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(