summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-05-18 21:41:25 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-05-18 21:41:25 +0000
commit27920d16e70f590b49548877a2129a1d2162d985 (patch)
treee3b90b82b63c89ee03317d129192299074fae15f /cpp/src/qpid/client
parentbbf31a9b3113ad6d37ed24d2ce767dd5f830afa3 (diff)
downloadqpid-python-27920d16e70f590b49548877a2129a1d2162d985.tar.gz
Fix RDMA for upstream changes which now require notification on shutdown
differently from before git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp95
1 files changed, 66 insertions, 29 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index a782caef6b..9be9e7127c 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -75,14 +75,12 @@ class RdmaConnector : public Connector, public sys::Codec
framing::OutputHandler* output;
Rdma::AsynchIO* aio;
+ std::auto_ptr<Rdma::Connector> acon;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
~RdmaConnector();
- void handleClosed();
- bool closeInternal();
-
// Callbacks
void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&);
void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType);
@@ -92,7 +90,9 @@ class RdmaConnector : public Connector, public sys::Codec
void readbuff(Rdma::AsynchIO&, Rdma::Buffer*);
void writebuff(Rdma::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
- void eof(Rdma::AsynchIO&);
+ void dataError(Rdma::AsynchIO&);
+ void drained();
+ void stopped(Rdma::AsynchIO* aio=0);
std::string identifier;
@@ -153,26 +153,33 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
QPID_LOG(debug, "RdmaConnector created for " << version);
}
+namespace {
+ void deleteAsynchIO(Rdma::AsynchIO& aio) {
+ delete &aio;
+ }
+}
+
RdmaConnector::~RdmaConnector() {
+ QPID_LOG(debug, "~RdmaConnector " << identifier);
close();
- if (aio) aio->deferDelete();
+ if (aio) aio->stop(deleteAsynchIO);
}
void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
- Rdma::Connector* c = new Rdma::Connector(
+ acon.reset(new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
boost::bind(&RdmaConnector::disconnected, this, poller, _1),
- boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)));
polling = true;
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
- c->start(poller, sa);
+ acon->start(poller, sa);
}
// The following only gets run when connected
@@ -184,7 +191,7 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
boost::bind(&RdmaConnector::readbuff, this, _1, _2),
boost::bind(&RdmaConnector::writebuff, this, _1),
0, // write buffers full
- boost::bind(&RdmaConnector::eof, this, _1)); // data error - just close connection
+ boost::bind(&RdmaConnector::dataError, this, _1));
identifier = str(format("[%1% %2%]") % ci->getLocalName() % ci->getPeerName());
ProtocolInitiation init(version);
@@ -194,31 +201,70 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, Rdma::ErrorType) {
- QPID_LOG(trace, "Connection Error " << identifier);
- eof(*aio);
+ QPID_LOG(debug, "Connection Error " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drain() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ stopped();
}
void RdmaConnector::disconnected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&) {
- eof(*aio);
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drain() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ drained();
}
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams& cp) {
- QPID_LOG(trace, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
- eof(*aio);
+ QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drain() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ stopped();
}
-bool RdmaConnector::closeInternal() {
+void RdmaConnector::dataError(Rdma::AsynchIO&) {
+ QPID_LOG(debug, "Data Error " << identifier);
+ {
Mutex::ScopedLock l(pollingLock);
- bool ret = polling;
+ // If we're closed already then we'll get to drain() anyway
+ if (!polling) return;
polling = false;
- if (ret) {
- if (aio) aio->queueWriteClose();
}
- return ret;
+ drained();
+}
+
+void RdmaConnector::stopped(Rdma::AsynchIO* aio) {
+ delete aio;
+ if (shutdownHandler) {
+ shutdownHandler->shutdown();
+ }
+}
+
+void RdmaConnector::drained() {
+ QPID_LOG(debug, "RdmaConnector::drained " << identifier);
+ if (aio) {
+ aio->stop(boost::bind(&RdmaConnector::stopped, this, aio));
+ aio = 0;
+ }
}
void RdmaConnector::close() {
- closeInternal();
+ QPID_LOG(debug, "RdmaConnector::close " << identifier);
+ Mutex::ScopedLock l(pollingLock);
+ if (!polling) return;
+ polling = false;
+ if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::setInputHandler(InputHandler* handler){
@@ -259,11 +305,6 @@ void RdmaConnector::send(AMQFrame& frame) {
if (notifyWrite && polling) aio->notifyPendingWrite();
}
-void RdmaConnector::handleClosed() {
- if (closeInternal() && shutdownHandler)
- shutdownHandler->shutdown();
-}
-
// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
@@ -340,10 +381,6 @@ void RdmaConnector::writeDataBlock(const AMQDataBlock& data) {
aio->queueWrite(buff);
}
-void RdmaConnector::eof(Rdma::AsynchIO&) {
- handleClosed();
-}
-
void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl)
{
securityLayer = sl;