diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 95 |
1 files changed, 66 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index a782caef6b..9be9e7127c 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -75,14 +75,12 @@ class RdmaConnector : public Connector, public sys::Codec framing::OutputHandler* output; Rdma::AsynchIO* aio; + std::auto_ptr<Rdma::Connector> acon; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void handleClosed(); - bool closeInternal(); - // Callbacks void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); @@ -92,7 +90,9 @@ class RdmaConnector : public Connector, public sys::Codec void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); void writebuff(Rdma::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); - void eof(Rdma::AsynchIO&); + void dataError(Rdma::AsynchIO&); + void drained(); + void stopped(Rdma::AsynchIO* aio=0); std::string identifier; @@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, QPID_LOG(debug, "RdmaConnector created for " << version); } +namespace { + void deleteAsynchIO(Rdma::AsynchIO& aio) { + delete &aio; + } +} + RdmaConnector::~RdmaConnector() { + QPID_LOG(debug, "~RdmaConnector " << identifier); close(); - if (aio) aio->deferDelete(); + if (aio) aio->stop(deleteAsynchIO); } void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - Rdma::Connector* c = new Rdma::Connector( + acon.reset(new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), boost::bind(&RdmaConnector::disconnected, this, poller, _1), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2))); polling = true; SocketAddress sa(host, boost::lexical_cast<std::string>(port)); - c->start(poller, sa); + acon->start(poller, sa); } // The following only gets run when connected @@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru boost::bind(&RdmaConnector::readbuff, this, _1, _2), boost::bind(&RdmaConnector::writebuff, this, _1), 0, // write buffers full - boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection + boost::bind(&RdmaConnector::dataError, this, _1)); identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); ProtocolInitiation init(version); @@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { - QPID_LOG(trace, "Connection Error " << identifier); - eof(*aio); + QPID_LOG(debug, "Connection Error " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) { - eof(*aio); + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + drained(); } void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) { - QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - eof(*aio); + QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } -bool RdmaConnector::closeInternal() { +void RdmaConnector::dataError(Rdma::AsynchIO&) { + QPID_LOG(debug, "Data Error " << identifier); + { Mutex::ScopedLock l(pollingLock); - bool ret = polling; + // If we're closed already then we'll get to drain() anyway + if (!polling) return; polling = false; - if (ret) { - if (aio) aio->queueWriteClose(); } - return ret; + drained(); +} + +void RdmaConnector::stopped(Rdma::AsynchIO* aio) { + delete aio; + if (shutdownHandler) { + shutdownHandler->shutdown(); + } +} + +void RdmaConnector::drained() { + QPID_LOG(debug, "RdmaConnector::drained " << identifier); + if (aio) { + aio->stop(boost::bind(&RdmaConnector::stopped, this, aio)); + aio = 0; + } } void RdmaConnector::close() { - closeInternal(); + QPID_LOG(debug, "RdmaConnector::close " << identifier); + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; + if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::setInputHandler(InputHandler* handler){ @@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame) { if (notifyWrite && polling) aio->notifyPendingWrite(); } -void RdmaConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - // Called in IO thread. (write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::writebuff(Rdma::AsynchIO&) { @@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void RdmaConnector::eof(Rdma::AsynchIO&) { - handleClosed(); -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |
