diff options
Diffstat (limited to 'cpp/src/qpid/sys/TCPIOPlugin.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/TCPIOPlugin.cpp | 54 |
1 files changed, 38 insertions, 16 deletions
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp index 045bc56e90..5d2cadbe03 100644 --- a/cpp/src/qpid/sys/TCPIOPlugin.cpp +++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp @@ -41,13 +41,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory { public: AsynchIOProtocolFactory(int16_t port, int backlog); void accept(Poller::shared_ptr, ConnectionCodec::Factory*); - void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*); + void connect(Poller::shared_ptr, const std::string& host, int16_t port, + ConnectionCodec::Factory*, ProtocolAccess*); uint16_t getPort() const; std::string getHost() const; private: - void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient); + void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, + bool isClient, ProtocolAccess*); }; // Static instance to initialise plugin @@ -72,17 +74,32 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) : {} void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s, - ConnectionCodec::Factory* f, bool isClient) { - AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f); + ConnectionCodec::Factory* f, bool isClient, + ProtocolAccess* a) { + AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a); + AsynchIO* aio; + if (isClient) async->setClient(); - AsynchIO* aio = new AsynchIO(s, - boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), - boost::bind(&AsynchIOHandler::eof, async, _1), - boost::bind(&AsynchIOHandler::disconnect, async, _1), - boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), - boost::bind(&AsynchIOHandler::nobuffs, async, _1), - boost::bind(&AsynchIOHandler::idle, async, _1)); + if (a == 0) + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&AsynchIOHandler::eof, async, _1), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + else { + aio = new AsynchIO(s, + boost::bind(&AsynchIOHandler::readbuff, async, _1, _2), + boost::bind(&ProtocolAccess::closedEof, a, async), + boost::bind(&AsynchIOHandler::disconnect, async, _1), + boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2), + boost::bind(&AsynchIOHandler::nobuffs, async, _1), + boost::bind(&AsynchIOHandler::idle, async, _1)); + a->setAio(aio); + } + async->init(aio, 4); aio->start(poller); } @@ -95,26 +112,31 @@ std::string AsynchIOProtocolFactory::getHost() const { return listener.getSockname(); } -void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) { +void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, + ConnectionCodec::Factory* fact) { acceptor.reset( new AsynchAcceptor(listener, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false))); + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false, + (ProtocolAccess*) 0))); acceptor->start(poller); } void AsynchIOProtocolFactory::connect( Poller::shared_ptr poller, const std::string& host, int16_t port, - ConnectionCodec::Factory* f) + ConnectionCodec::Factory* fact, + ProtocolAccess* access) { // Note that the following logic does not cause a memory leak. // The allocated Socket is freed either by the AsynchConnector // upon connection failure or by the AsynchIO upon connection // shutdown. The allocated AsynchConnector frees itself when it // is no longer needed. + Socket* socket = new Socket(); - new AsynchConnector(*socket, poller, host, port, - boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true)); + new AsynchConnector (*socket, poller, host, port, + boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access), + boost::bind(&ProtocolAccess::closed, access, _1, _2)); } }} // namespace qpid::sys |
