diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:26 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-10-12 16:05:26 +0000 |
| commit | 8f2c46ab09f89e1bd4d294c3a7289dbffedeac6b (patch) | |
| tree | a3ef8049d3e4e471e95a289c48abe2bbde2f9506 /cpp/src/qpid/sys/rdma/RdmaIO.cpp | |
| parent | 8fdd062794e5add44f7775e697d32d21cfcc41c2 (diff) | |
| download | qpid-python-8f2c46ab09f89e1bd4d294c3a7289dbffedeac6b.tar.gz | |
Improve the performance of the Rdma::AsynchIO by using a very
simple state machine to reduce the context switch for notifyPendingWrite()
by allowing it to "hijack" existing concurrent processing on an IO thread.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021823 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 62 |
1 files changed, 57 insertions, 5 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 5616c30ae8..1caa9b7e72 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -29,6 +29,8 @@ using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace Rdma { AsynchIO::AsynchIO( @@ -55,7 +57,7 @@ namespace Rdma { idleCallback(ic), fullCallback(fc), errorCallback(ec), - pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -74,7 +76,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != STOPPED) { + if (state != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -89,6 +91,7 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { + ScopedLock<Mutex> l(stateLock); state = STOPPED; notifyCallback = nc; dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); @@ -140,15 +143,64 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - dataHandle.call(pendingWriteAction); + ScopedLock<Mutex> l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + break; + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } } void AsynchIO::dataEvent() { - if (state.get() == STOPPED) return; + { + ScopedLock<Mutex> l(stateLock); + + if (state == STOPPED) return; + state = NOTIFY_PENDING; + } processCompletions(); - doWriteCallback(); + writeEvent(); + } + + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock<Mutex> l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock<Mutex> l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + state = NOTIFY; + break; + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); } void AsynchIO::processCompletions() { |
