diff options
Diffstat (limited to 'cpp/src/qpid/sys')
| -rw-r--r-- | cpp/src/qpid/sys/Acceptor.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 |
2 files changed, 25 insertions, 49 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h index 1e7827e60c..243e791eeb 100644 --- a/cpp/src/qpid/sys/Acceptor.h +++ b/cpp/src/qpid/sys/Acceptor.h @@ -26,22 +26,24 @@ #include "qpid/SharedObject.h" #include "ConnectionCodec.h" + namespace qpid { namespace sys { +class Poller; + class Acceptor : public qpid::SharedObject<Acceptor> { public: - static Acceptor::shared_ptr create(int16_t port, int backlog, int threads); + static Acceptor::shared_ptr create(int16_t port, int backlog); virtual ~Acceptor() = 0; virtual uint16_t getPort() const = 0; virtual std::string getHost() const = 0; - virtual void run(ConnectionCodec::Factory*) = 0; + virtual void run(boost::shared_ptr<Poller>, ConnectionCodec::Factory*) = 0; virtual void connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0; - - /** Note: this function is async-signal safe */ - virtual void shutdown() = 0; + boost::shared_ptr<Poller>, + const std::string& host, int16_t port, + ConnectionCodec::Factory* codec) = 0; }; inline Acceptor::~Acceptor() {} diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index 43fbfdf7be..5133fde183 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -43,19 +43,15 @@ namespace qpid { namespace sys { class AsynchIOAcceptor : public Acceptor { - Poller::shared_ptr poller; Socket listener; - int numIOThreads; const uint16_t listeningPort; + std::auto_ptr<AsynchAcceptor> acceptor; public: - AsynchIOAcceptor(int16_t port, int backlog, int threads); - ~AsynchIOAcceptor() {} - void run(ConnectionCodec::Factory*); - void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*); + AsynchIOAcceptor(int16_t port, int backlog); + void run(Poller::shared_ptr, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); - void shutdown(); - uint16_t getPort() const; std::string getHost() const; @@ -63,15 +59,14 @@ class AsynchIOAcceptor : public Acceptor { void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*); }; -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads) +Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog) { - return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads)); + return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog)); } -AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) : - poller(new Poller), - numIOThreads(threads), - listeningPort(listener.listen(port, backlog)) +AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog) : + listeningPort(listener.listen(port, backlog)), + acceptor(0) {} // Buffer definition @@ -157,30 +152,17 @@ std::string AsynchIOAcceptor::getHost() const { return listener.getSockname(); } -void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) { - Dispatcher d(poller); - AsynchAcceptor - acceptor(listener, - boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact)); - acceptor.start(poller); - - std::vector<Thread> t(numIOThreads-1); - - // Run n-1 io threads - for (int i=0; i<numIOThreads-1; ++i) - t[i] = Thread(d); - - // Run final thread - d.run(); - - // Now wait for n-1 io threads to exit - for (int i=0; i<numIOThreads-1; ++i) { - t[i].join(); - } +void AsynchIOAcceptor::run(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { + acceptor.reset( + new AsynchAcceptor(listener, + boost::bind(&AsynchIOAcceptor::accepted, this, poller, _1, fact))); + acceptor->start(poller); } void AsynchIOAcceptor::connect( - const std::string& host, int16_t port, ConnectionCodec::Factory* f) + Poller::shared_ptr poller, + const std::string& host, int16_t port, + ConnectionCodec::Factory* f) { Socket* socket = new Socket();//Should be deleted by handle when socket closes socket->connect(host, port); @@ -202,14 +184,6 @@ void AsynchIOAcceptor::connect( aio->start(poller); } - -void AsynchIOAcceptor::shutdown() { - // NB: this function must be async-signal safe, it must not - // call any function that is not async-signal safe. - poller->shutdown(); -} - - void AsynchIOHandler::write(const framing::ProtocolInitiation& data) { QPID_LOG(debug, "SENT [" << identifier << "] INIT(" << data << ")"); |
