From 8f2c46ab09f89e1bd4d294c3a7289dbffedeac6b Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 12 Oct 2010 16:05:26 +0000 Subject: Improve the performance of the Rdma::AsynchIO by using a very simple state machine to reduce the context switch for notifyPendingWrite() by allowing it to "hijack" existing concurrent processing on an IO thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021823 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaIO.cpp | 62 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 5 deletions(-) (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp') diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 5616c30ae8..1caa9b7e72 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -29,6 +29,8 @@ using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace Rdma { AsynchIO::AsynchIO( @@ -55,7 +57,7 @@ namespace Rdma { idleCallback(ic), fullCallback(fc), errorCallback(ec), - pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -74,7 +76,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != STOPPED) { + if (state != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -89,6 +91,7 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { + ScopedLock l(stateLock); state = STOPPED; notifyCallback = nc; dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); @@ -140,15 +143,64 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - dataHandle.call(pendingWriteAction); + ScopedLock l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + break; + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } } void AsynchIO::dataEvent() { - if (state.get() == STOPPED) return; + { + ScopedLock l(stateLock); + + if (state == STOPPED) return; + state = NOTIFY_PENDING; + } processCompletions(); - doWriteCallback(); + writeEvent(); + } + + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + state = NOTIFY; + break; + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); } void AsynchIO::processCompletions() { -- cgit v1.2.1