From 8f2c46ab09f89e1bd4d294c3a7289dbffedeac6b Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 12 Oct 2010 16:05:26 +0000 Subject: Improve the performance of the Rdma::AsynchIO by using a very simple state machine to reduce the context switch for notifyPendingWrite() by allowing it to "hijack" existing concurrent processing on an IO thread. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021823 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaIO.cpp | 62 ++++++++++++++++++++++++++++++++++++---- cpp/src/qpid/sys/rdma/RdmaIO.h | 7 +++-- 2 files changed, 62 insertions(+), 7 deletions(-) (limited to 'cpp') diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 5616c30ae8..1caa9b7e72 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -29,6 +29,8 @@ using qpid::sys::SocketAddress; using qpid::sys::DispatchHandle; using qpid::sys::Poller; +using qpid::sys::ScopedLock; +using qpid::sys::Mutex; namespace Rdma { AsynchIO::AsynchIO( @@ -55,7 +57,7 @@ namespace Rdma { idleCallback(ic), fullCallback(fc), errorCallback(ec), - pendingWriteAction(boost::bind(&AsynchIO::doWriteCallback, this)) + pendingWriteAction(boost::bind(&AsynchIO::writeEvent, this)) { qp->nonblocking(); qp->notifyRecv(); @@ -74,7 +76,7 @@ namespace Rdma { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); // Turn off callbacks if necessary (before doing the deletes) - if (state.get() != STOPPED) { + if (state != STOPPED) { QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); dataHandle.stopWatch(); } @@ -89,6 +91,7 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::stop(NotifyCallback nc) { + ScopedLock l(stateLock); state = STOPPED; notifyCallback = nc; dataHandle.call(boost::bind(&AsynchIO::doStoppedCallback, this)); @@ -140,15 +143,64 @@ namespace Rdma { } void AsynchIO::notifyPendingWrite() { - dataHandle.call(pendingWriteAction); + ScopedLock l(stateLock); + switch (state) { + case IDLE: + dataHandle.call(pendingWriteAction); + break; + case NOTIFY: + state = NOTIFY_PENDING; + break; + case NOTIFY_PENDING: + case STOPPED: + break; + } } void AsynchIO::dataEvent() { - if (state.get() == STOPPED) return; + { + ScopedLock l(stateLock); + + if (state == STOPPED) return; + state = NOTIFY_PENDING; + } processCompletions(); - doWriteCallback(); + writeEvent(); + } + + void AsynchIO::writeEvent() { + State newState; + do { + { + ScopedLock l(stateLock); + + switch (state) { + case STOPPED: + return; + default: + state = NOTIFY; + } + } + + doWriteCallback(); + + { + ScopedLock l(stateLock); + + newState = state; + switch (newState) { + case NOTIFY_PENDING: + state = NOTIFY; + break; + case STOPPED: + break; + default: + state = IDLE; + } + } + } while (newState == NOTIFY_PENDING); } void AsynchIO::processCompletions() { diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 62779e4e78..70c1a2a76a 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -26,6 +26,7 @@ #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/SocketAddress.h" #include @@ -51,8 +52,9 @@ namespace Rdma { int xmitBufferCount; int outstandingWrites; bool draining; - enum State {IDLE, STOPPED}; - qpid::sys::AtomicValue state; + enum State {IDLE, NOTIFY, NOTIFY_PENDING, STOPPED}; + State state; + qpid::sys::Mutex stateLock; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -101,6 +103,7 @@ namespace Rdma { const static int IgnoreData = 0x10000000; // Message contains no application data void dataEvent(); + void writeEvent(); void processCompletions(); void doWriteCallback(); void checkDrained(); -- cgit v1.2.1