summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:04:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:04:47 +0000
commit4a73c1b04d155a4d6c6e6a7b822aa456f3425689 (patch)
tree7f7978b468611cc86846bfb0e139d1c791facf8e /cpp/src/qpid/client
parentc1ceb35c3d386e6bae5bc4dafd4e635805a97451 (diff)
downloadqpid-python-4a73c1b04d155a4d6c6e6a7b822aa456f3425689.tar.gz
Serialise close into the data callbacks:
Rejig Rdma::ConnectionManager to have a stop function with a callback and use this to ensure that the Rdma::Connector used by qpid::sys::RdmaConnector is correctly deleted only after it has been actually stopped git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021819 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp84
1 files changed, 50 insertions, 34 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index 208d42f672..026952bd99 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -73,7 +73,7 @@ class RdmaConnector : public Connector, public sys::Codec
framing::OutputHandler* output;
Rdma::AsynchIO* aio;
- std::auto_ptr<Rdma::Connector> acon;
+ Rdma::Connector* acon;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
@@ -82,7 +82,6 @@ class RdmaConnector : public Connector, public sys::Codec
// Callbacks
void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType);
- void disconnectAction();
void disconnected();
void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
@@ -91,7 +90,8 @@ class RdmaConnector : public Connector, public sys::Codec
void writeDataBlock(const framing::AMQDataBlock& data);
void dataError(Rdma::AsynchIO&);
void drained();
- void stopped(Rdma::AsynchIO* aio=0);
+ void connectionStopped(Rdma::Connector* acon);
+ void dataStopped(Rdma::AsynchIO* aio);
std::string identifier;
@@ -147,6 +147,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
polling(false),
shutdownHandler(0),
aio(0),
+ acon(0),
poller(p)
{
QPID_LOG(debug, "RdmaConnector created for " << version);
@@ -156,14 +157,20 @@ namespace {
void deleteAsynchIO(Rdma::AsynchIO& aio) {
delete &aio;
}
+
+ void deleteConnector(Rdma::ConnectionManager& con) {
+ delete &con;
+ }
}
RdmaConnector::~RdmaConnector() {
QPID_LOG(debug, "~RdmaConnector " << identifier);
- close();
if (aio) {
aio->stop(deleteAsynchIO);
}
+ if (acon) {
+ acon->stop(deleteConnector);
+ }
if (shutdownHandler) {
shutdownHandler->shutdown();
}
@@ -173,12 +180,12 @@ void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
- acon.reset(new Rdma::Connector(
+ acon = new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
boost::bind(&RdmaConnector::disconnected, this),
- boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)));
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
polling = true;
@@ -211,11 +218,10 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
}
{
Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
+ assert(polling);
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
@@ -226,10 +232,10 @@ void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::i
if (!polling) return;
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
-void RdmaConnector::disconnectAction() {
+void RdmaConnector::disconnected() {
QPID_LOG(debug, "Connection disconnected " << identifier);
{
Mutex::ScopedLock l(pollingLock);
@@ -237,11 +243,8 @@ void RdmaConnector::disconnectAction() {
if (!polling) return;
polling = false;
}
- drained();
-}
-
-void RdmaConnector::disconnected() {
- aio->requestCallback(boost::bind(&RdmaConnector::disconnectAction, this));
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
@@ -252,7 +255,7 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv
if (!polling) return;
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
void RdmaConnector::dataError(Rdma::AsynchIO&) {
@@ -266,35 +269,48 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) {
drained();
}
-void RdmaConnector::stopped(Rdma::AsynchIO* a) {
- QPID_LOG(debug, "RdmaConnector::stopped " << identifier);
- assert(!polling);
- aio = 0;
- delete a;
- if (shutdownHandler) {
- ShutdownHandler* s = shutdownHandler;
- shutdownHandler = 0;
- s->shutdown();
+void RdmaConnector::close() {
+ QPID_LOG(debug, "RdmaConnector::close " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ if (!polling) return;
+ polling = false;
}
+ if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::drained() {
QPID_LOG(debug, "RdmaConnector::drained " << identifier);
assert(!polling);
- acon->stop();
if (aio) {
Rdma::AsynchIO* a = aio;
aio = 0;
- a->stop(boost::bind(&RdmaConnector::stopped, this, a));
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}
}
-void RdmaConnector::close() {
- QPID_LOG(debug, "RdmaConnector::close " << identifier);
- Mutex::ScopedLock l(pollingLock);
- if (!polling) return;
- polling = false;
- if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
+ QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
+ assert(!polling);
+ aio = 0;
+ delete a;
+ if (acon) {
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c));
+ }
+}
+
+void RdmaConnector::connectionStopped(Rdma::Connector* c) {
+ QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
+ assert(!polling);
+ acon = 0;
+ delete c;
+ if (shutdownHandler) {
+ ShutdownHandler* s = shutdownHandler;
+ shutdownHandler = 0;
+ s->shutdown();
+ }
}
void RdmaConnector::setInputHandler(InputHandler* handler){