summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp34
-rw-r--r--cpp/src/qpid/broker/Broker.h5
-rw-r--r--cpp/src/qpid/sys/Acceptor.h14
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp60
4 files changed, 58 insertions, 55 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 689fb0687c..9773b49b93 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -37,6 +37,9 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/sys/Acceptor.h"
+#include "qpid/sys/Poller.h"
+#include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Thread.h"
#include "qpid/sys/ConnectionInputHandler.h"
#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/sys/TimeoutHandler.h"
@@ -53,6 +56,9 @@
#endif
using qpid::sys::Acceptor;
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::Thread;
using qpid::framing::FrameHandler;
using qpid::framing::ChannelId;
using qpid::management::ManagementAgent;
@@ -121,6 +127,7 @@ const std::string amq_match("amq.match");
const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
+ poller(new Poller),
config(conf),
store(0),
dataDir(conf.noDataDir ? std::string () : conf.dataDir),
@@ -253,15 +260,31 @@ void Broker::setStore (MessageStore* _store)
}
void Broker::run() {
- getAcceptor().run(&factory);
+
+ getAcceptor().run(poller, &factory);
+
+ Dispatcher d(poller);
+ int numIOThreads = config.workerThreads;
+ std::vector<Thread> t(numIOThreads-1);
+
+ // Run n-1 io threads
+ for (int i=0; i<numIOThreads-1; ++i)
+ t[i] = Thread(d);
+
+ // Run final thread
+ d.run();
+
+ // Now wait for n-1 io threads to exit
+ for (int i=0; i<numIOThreads-1; ++i) {
+ t[i].join();
+ }
}
void Broker::shutdown() {
// NB: this function must be async-signal safe, it must not
// call any function that is not async-signal safe.
// Any unsafe shutdown actions should be done in the destructor.
- if (acceptor)
- acceptor->shutdown();
+ poller->shutdown();
}
Broker::~Broker() {
@@ -281,8 +304,7 @@ Acceptor& Broker::getAcceptor() const {
if (!acceptor) {
const_cast<Acceptor::shared_ptr&>(acceptor) =
Acceptor::create(config.port,
- config.connectionBacklog,
- config.workerThreads);
+ config.connectionBacklog);
QPID_LOG(info, "Listening on port " << getPort());
}
return *acceptor;
@@ -330,7 +352,7 @@ void Broker::connect(
const std::string& host, uint16_t port,
sys::ConnectionCodec::Factory* f)
{
- getAcceptor().connect(host, port, f ? f : &factory);
+ getAcceptor().connect(poller, host, port, f ? f : &factory);
}
void Broker::connect(
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index ea29348c16..49c4b203c4 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -50,6 +50,10 @@
namespace qpid {
+namespace sys {
+ class Poller;
+}
+
class Url;
namespace broker {
@@ -129,6 +133,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
private:
sys::Acceptor& getAcceptor() const;
+ boost::shared_ptr<qpid::sys::Poller> poller;
Options config;
sys::Acceptor::shared_ptr acceptor;
MessageStore* store;
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 1e7827e60c..243e791eeb 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -26,22 +26,24 @@
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
+
namespace qpid {
namespace sys {
+class Poller;
+
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
- static Acceptor::shared_ptr create(int16_t port, int backlog, int threads);
+ static Acceptor::shared_ptr create(int16_t port, int backlog);
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(ConnectionCodec::Factory*) = 0;
+ virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0;
virtual void connect(
- const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0;
-
- /** Note: this function is async-signal safe */
- virtual void shutdown() = 0;
+ boost::shared_ptr<Poller>,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* codec) = 0;
};
inline Acceptor::~Acceptor() {}
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index 43fbfdf7be..5133fde183 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -43,19 +43,15 @@ namespace qpid {
namespace sys {
class AsynchIOAcceptor : public Acceptor {
- Poller::shared_ptr poller;
Socket listener;
- int numIOThreads;
const uint16_t listeningPort;
+ std::auto_ptr<AsynchAcceptor> acceptor;
public:
- AsynchIOAcceptor(int16_t port, int backlog, int threads);
- ~AsynchIOAcceptor() {}
- void run(ConnectionCodec::Factory*);
- void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
+ AsynchIOAcceptor(int16_t port, int backlog);
+ void run(Poller::shared_ptr, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
- void shutdown();
-
uint16_t getPort() const;
std::string getHost() const;
@@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor {
void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
+Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog)
{
- return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog));
}
-AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
- poller(new Poller),
- numIOThreads(threads),
- listeningPort(listener.listen(port, backlog))
+AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) :
+ listeningPort(listener.listen(port, backlog)),
+ acceptor(0)
{}
// Buffer definition
@@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
- Dispatcher d(poller);
- AsynchAcceptor
- acceptor(listener,
- boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact));
- acceptor.start(poller);
-
- std::vector<Thread> t(numIOThreads-1);
-
- // Run n-1 io threads
- for (int i=0; i<numIOThreads-1; ++i)
- t[i] = Thread(d);
-
- // Run final thread
- d.run();
-
- // Now wait for n-1 io threads to exit
- for (int i=0; i<numIOThreads-1; ++i) {
- t[i].join();
- }
+void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+ acceptor.reset(
+ new AsynchAcceptor(listener,
+ boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)));
+ acceptor->start(poller);
}
void AsynchIOAcceptor::connect(
- const std::string& host, int16_t port, ConnectionCodec::Factory* f)
+ Poller::shared_ptr poller,
+ const std::string& host, int16_t port,
+ ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
@@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect(
aio->start(poller);
}
-
-void AsynchIOAcceptor::shutdown() {
- // NB: this function must be async-signal safe, it must not
- // call any function that is not async-signal safe.
- poller->shutdown();
-}
-
-
void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")");