diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:04:47 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:04:47 +0000 |
| commit | 4a73c1b04d155a4d6c6e6a7b822aa456f3425689 (patch) | |
| tree | 7f7978b468611cc86846bfb0e139d1c791facf8e /cpp/src/qpid/client | |
| parent | c1ceb35c3d386e6bae5bc4dafd4e635805a97451 (diff) | |
| download | qpid-python-4a73c1b04d155a4d6c6e6a7b822aa456f3425689.tar.gz | |
Serialise close into the data callbacks:
Rejig Rdma::ConnectionManager to have a stop function with a callback and
use this to ensure that the Rdma::Connector used by qpid::sys::RdmaConnector
is correctly deleted only after it has been actually stopped
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/RdmaConnector.cpp | 84 |
1 files changed, 50 insertions, 34 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp index 208d42f672..026952bd99 100644 --- a/cpp/src/qpid/client/RdmaConnector.cpp +++ b/cpp/src/qpid/client/RdmaConnector.cpp @@ -73,7 +73,7 @@ class RdmaConnector : public Connector, public sys::Codec framing::OutputHandler* output; Rdma::AsynchIO* aio; - std::auto_ptr<Rdma::Connector> acon; + Rdma::Connector* acon; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; @@ -82,7 +82,6 @@ class RdmaConnector : public Connector, public sys::Codec // Callbacks void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType); - void disconnectAction(); void disconnected(); void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&); @@ -91,7 +90,8 @@ class RdmaConnector : public Connector, public sys::Codec void writeDataBlock(const framing::AMQDataBlock& data); void dataError(Rdma::AsynchIO&); void drained(); - void stopped(Rdma::AsynchIO* aio=0); + void connectionStopped(Rdma::Connector* acon); + void dataStopped(Rdma::AsynchIO* aio); std::string identifier; @@ -147,6 +147,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p, polling(false), shutdownHandler(0), aio(0), + acon(0), poller(p) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -156,14 +157,20 @@ namespace { void deleteAsynchIO(Rdma::AsynchIO& aio) { delete &aio; } + + void deleteConnector(Rdma::ConnectionManager& con) { + delete &con; + } } RdmaConnector::~RdmaConnector() { QPID_LOG(debug, "~RdmaConnector " << identifier); - close(); if (aio) { aio->stop(deleteAsynchIO); } + if (acon) { + acon->stop(deleteConnector); + } if (shutdownHandler) { shutdownHandler->shutdown(); } @@ -173,12 +180,12 @@ void RdmaConnector::connect(const std::string& host, int port){ Mutex::ScopedLock l(pollingLock); assert(!polling); - acon.reset(new Rdma::Connector( + acon = new Rdma::Connector( Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES), boost::bind(&RdmaConnector::connected, this, poller, _1, _2), boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2), boost::bind(&RdmaConnector::disconnected, this), - boost::bind(&RdmaConnector::rejected, this, poller, _1, _2))); + boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)); polling = true; @@ -211,11 +218,10 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru } { Mutex::ScopedLock l(pollingLock); - // If we're closed already then we'll get to stopped() anyway - if (!polling) return; + assert(polling); polling = false; } - stopped(); + connectionStopped(acon); } void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) { @@ -226,10 +232,10 @@ void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::i if (!polling) return; polling = false; } - stopped(); + connectionStopped(acon); } -void RdmaConnector::disconnectAction() { +void RdmaConnector::disconnected() { QPID_LOG(debug, "Connection disconnected " << identifier); { Mutex::ScopedLock l(pollingLock); @@ -237,11 +243,8 @@ void RdmaConnector::disconnectAction() { if (!polling) return; polling = false; } - drained(); -} - -void RdmaConnector::disconnected() { - aio->requestCallback(boost::bind(&RdmaConnector::disconnectAction, this)); + // Make sure that all the disconnected actions take place on the data "thread" + aio->requestCallback(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) { @@ -252,7 +255,7 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv if (!polling) return; polling = false; } - stopped(); + connectionStopped(acon); } void RdmaConnector::dataError(Rdma::AsynchIO&) { @@ -266,35 +269,48 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) { drained(); } -void RdmaConnector::stopped(Rdma::AsynchIO* a) { - QPID_LOG(debug, "RdmaConnector::stopped " << identifier); - assert(!polling); - aio = 0; - delete a; - if (shutdownHandler) { - ShutdownHandler* s = shutdownHandler; - shutdownHandler = 0; - s->shutdown(); +void RdmaConnector::close() { + QPID_LOG(debug, "RdmaConnector::close " << identifier); + { + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; } + if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); } void RdmaConnector::drained() { QPID_LOG(debug, "RdmaConnector::drained " << identifier); assert(!polling); - acon->stop(); if (aio) { Rdma::AsynchIO* a = aio; aio = 0; - a->stop(boost::bind(&RdmaConnector::stopped, this, a)); + a->stop(boost::bind(&RdmaConnector::dataStopped, this, a)); } } -void RdmaConnector::close() { - QPID_LOG(debug, "RdmaConnector::close " << identifier); - Mutex::ScopedLock l(pollingLock); - if (!polling) return; - polling = false; - if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this)); +void RdmaConnector::dataStopped(Rdma::AsynchIO* a) { + QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier); + assert(!polling); + aio = 0; + delete a; + if (acon) { + Rdma::Connector* c = acon; + acon = 0; + c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c)); + } +} + +void RdmaConnector::connectionStopped(Rdma::Connector* c) { + QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier); + assert(!polling); + acon = 0; + delete c; + if (shutdownHandler) { + ShutdownHandler* s = shutdownHandler; + shutdownHandler = 0; + s->shutdown(); + } } void RdmaConnector::setInputHandler(InputHandler* handler){ |
