diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:41:25 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:41:25 +0000 |
| commit | 27920d16e70f590b49548877a2129a1d2162d985 (patch) | |
| tree | e3b90b82b63c89ee03317d129192299074fae15f /cpp/src/qpid/client | |
| parent | bbf31a9b3113ad6d37ed24d2ce767dd5f830afa3 (diff) | |
| download | qpid-python-27920d16e70f590b49548877a2129a1d2162d985.tar.gz | |
Fix RDMA for upstream changes which now require notification on shutdown
differently from before
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 95 |
1 files changed, 66 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index a782caef6b..9be9e7127c 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -75,14 +75,12 @@ class RdmaConnector : public Connector, public sys::Codec framing::OutputHandler* output; Rdma::AsynchIO* aio; + std::auto_ptr<Rdma::Connector> acon; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void handleClosed(); - bool closeInternal(); - // Callbacks void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&); void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType); @@ -92,7 +90,9 @@ class RdmaConnector : public Connector, public sys::Codec void readbuff(Rdma::AsynchIO&, Rdma::Buffer*); void writebuff(Rdma::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); - void eof(Rdma::AsynchIO&); + void dataError(Rdma::AsynchIO&); + void drained(); + void stopped(Rdma::AsynchIO* aio=0); std::string identifier; @@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, QPID_LOG(debug, "RdmaConnector created for " << version); } +namespace { + void deleteAsynchIO(Rdma::AsynchIO& aio) { + delete &aio; + } +} + RdmaConnector::~RdmaConnector() { + QPID_LOG(debug, "~RdmaConnector " << identifier); close(); - if (aio) aio->deferDelete(); + if (aio) aio->stop(deleteAsynchIO); } void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - Rdma::Connector* c = new Rdma::Connector( + acon.reset(new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), boost::bind(&RdmaConnector::disconnected, this, poller, _1), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2))); polling = true; SocketAddress sa(host, boost::lexical_cast<std::string>(port)); - c->start(poller, sa); + acon->start(poller, sa); } // The following only gets run when connected @@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru boost::bind(&RdmaConnector::readbuff, this, _1, _2), boost::bind(&RdmaConnector::writebuff, this, _1), 0, // write buffers full - boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection + boost::bind(&RdmaConnector::dataError, this, _1)); identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName()); ProtocolInitiation init(version); @@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) { - QPID_LOG(trace, "Connection Error " << identifier); - eof(*aio); + QPID_LOG(debug, "Connection Error " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) { - eof(*aio); + QPID_LOG(debug, "Connection disconnected " << identifier); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + drained(); } void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) { - QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); - eof(*aio); + QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize); + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drain() anyway + if (!polling) return; + polling = false; + } + stopped(); } -bool RdmaConnector::closeInternal() { +void RdmaConnector::dataError(Rdma::AsynchIO&) { + QPID_LOG(debug, "Data Error " << identifier); + { Mutex::ScopedLock l(pollingLock); - bool ret = polling; + // If we're closed already then we'll get to drain() anyway + if (!polling) return; polling = false; - if (ret) { - if (aio) aio->queueWriteClose(); } - return ret; + drained(); +} + +void RdmaConnector::stopped(Rdma::AsynchIO* aio) { + delete aio; + if (shutdownHandler) { + shutdownHandler->shutdown(); + } +} + +void RdmaConnector::drained() { + QPID_LOG(debug, "RdmaConnector::drained " << identifier); + if (aio) { + aio->stop(boost::bind(&RdmaConnector::stopped, this, aio)); + aio = 0; + } } void RdmaConnector::close() { - closeInternal(); + QPID_LOG(debug, "RdmaConnector::close " << identifier); + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; + if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::setInputHandler(InputHandler* handler){ @@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame) { if (notifyWrite && polling) aio->notifyPendingWrite(); } -void RdmaConnector::handleClosed() { - if (closeInternal() && shutdownHandler) - shutdownHandler->shutdown(); -} - // Called in IO thread. (write idle routine) // This is NOT only called in response to previously calling notifyPendingWrite void RdmaConnector::writebuff(Rdma::AsynchIO&) { @@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const AMQDataBlock& data) { aio->queueWrite(buff); } -void RdmaConnector::eof(Rdma::AsynchIO&) { - handleClosed(); -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; |
