From 3e33a73b2f351be2c4d7bbf71eab6320bebb8204 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 23 Sep 2008 18:43:08 +0000 Subject: Removed the state lock from the RdmaIO code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@698276 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/AtomicValue_gcc.h | 2 +- cpp/src/qpid/sys/rdma/RdmaIO.cpp | 217 +++++++++++++++++++++---------------- cpp/src/qpid/sys/rdma/RdmaIO.h | 6 +- 3 files changed, 129 insertions(+), 96 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/sys/AtomicValue_gcc.h b/cpp/src/qpid/sys/AtomicValue_gcc.h index da60edad65..d022b07c1d 100644 --- a/cpp/src/qpid/sys/AtomicValue_gcc.h +++ b/cpp/src/qpid/sys/AtomicValue_gcc.h @@ -57,7 +57,7 @@ class AtomicValue /** If current value == testval then set to newval. Returns true if the swap was performed. */ bool boolCompareAndSwap(T testval, T newval) { return __sync_bool_compare_and_swap(&value, testval, newval); } - T get() const { return const_cast*>(this)->fetchAndAdd(0); } + T get() const { return const_cast*>(this)->fetchAndAdd(static_cast(0)); } private: T value; diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index e3dc0cbf8f..77e766dd79 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -99,14 +99,25 @@ namespace Rdma { // Mark for deletion/Delete this object when we have no outstanding writes void AsynchIO::deferDelete() { - { - qpid::sys::ScopedLock l(stateLock); - if (outstandingWrites > 0 || state != IDLE) { - deleting = true; + State oldState; + State newState; + bool doReturn; + //qpid::sys::ScopedLock l(stateLock); + // It is safe to assign to deleting here as we either delete ourselves + // before leaving this function or deleting is set on exit + do { + newState = oldState = state.get(); + doReturn = false; + if (outstandingWrites > 0 || oldState != IDLE) { + deleting = true; + doReturn = true; + } else{ + newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; } - state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor - } delete this; } @@ -136,7 +147,7 @@ namespace Rdma { // Mark now closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::queueWriteClose() { - qpid::sys::ScopedLock l(stateLock); + // Don't think we actually need to lock here as transition is 1 way only to closed closed = true; } @@ -144,130 +155,150 @@ namespace Rdma { // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. // If we are then we just return as we know that we will eventually do the idle callback anyway. // - { - qpid::sys::ScopedLock l(stateLock); + // qpid::sys::ScopedLock l(stateLock); // We can get here in any state (as the caller could be in any thread) - switch (state) { - case NOTIFY_WRITE: - case PENDING_NOTIFY: - // We only need to note a pending notify if we're already doing a notify as data processing - // is always followed by write notification processing - state = PENDING_NOTIFY; - return; - case PENDING_DATA: - return; - case DATA: - // Only need to return here as data processing will do the idleCallback itself anyway + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + switch (oldState) { + case NOTIFY_WRITE: + case PENDING_NOTIFY: + // We only need to note a pending notify if we're already doing a notify as data processing + // is always followed by write notification processing + newState = PENDING_NOTIFY; + doReturn = true; + break; + case PENDING_DATA: + doReturn = true; + break; + case DATA: + // Only need to return here as data processing will do the idleCallback itself anyway + doReturn = true; + break; + case IDLE: + newState = NOTIFY_WRITE; + break; + case DELETED: + assert(oldState!=DELETED); + doReturn = true; + }; + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; - case IDLE: - state = NOTIFY_WRITE; - break; - case DELETED: - assert(state!=DELETED); - } } doWriteCallback(); // Keep track of what we need to do so that we can release the lock - enum {COMPLETION, NOTIFY} action; - { - qpid::sys::ScopedLock l(stateLock); + enum {COMPLETION, NOTIFY, RETURN, EXIT} action; // If there was pending data whilst we were doing this, process it now - switch (state) { - case NOTIFY_WRITE: - state = IDLE; - return; - case PENDING_DATA: - action = COMPLETION; - break; - case PENDING_NOTIFY: - action = NOTIFY; - break; - default: - assert(state!=IDLE && state!=DATA && state!=DELETED); - return; - } - // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the + // + // Using NOTIFY_WRITE for both NOTIFY & COMPLETION is a bit strange, but we're making sure we get the // correct result if we reenter notifyPendingWrite(), in which case we want to // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks // not IDLE) - state = NOTIFY_WRITE; - } do { + //qpid::sys::ScopedLock l(stateLock); + do { + newState = oldState = state.get(); + action = RETURN; // Anything but COMPLETION + switch (oldState) { + case NOTIFY_WRITE: + newState = IDLE; + action = (action == COMPLETION) ? EXIT : RETURN; + break; + case PENDING_DATA: + newState = NOTIFY_WRITE; + action = COMPLETION; + break; + case PENDING_NOTIFY: + newState = NOTIFY_WRITE; + action = NOTIFY; + break; + default: + assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED); + action = RETURN; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state // so that we do need to process completions or notifications now switch (action) { case COMPLETION: processCompletions(); + // Fall through case NOTIFY: doWriteCallback(); break; - } - { - qpid::sys::ScopedLock l(stateLock); - switch (state) { - case NOTIFY_WRITE: - state = IDLE; - goto exit; - case PENDING_DATA: - action = COMPLETION; - break; - case PENDING_NOTIFY: - action = NOTIFY; - break; - default: - assert(state!=IDLE && state!=DATA && state!=DELETED); + case RETURN: + return; + case EXIT: + // If we just processed completions we might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } return; - } - state = NOTIFY_WRITE; } } while (true); - exit: - // If we just processed completions we might need to delete ourselves - if (action == COMPLETION && deleting && outstandingWrites == 0) { - delete this; - } } void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) { // Keep track of writable notifications - { - qpid::sys::ScopedLock l(stateLock); - // We're already processing a notification - switch (state) { - case IDLE: - break; - default: - state = PENDING_DATA; + // qpid::sys::ScopedLock l(stateLock); + State oldState; + State newState; + bool doReturn; + do { + newState = oldState = state.get(); + doReturn = false; + // We're already processing a notification + switch (oldState) { + case IDLE: + newState = DATA; + break; + default: + // Can't get here in DATA state as that would violate the serialisation rules + assert( oldState!=DATA ); + newState = PENDING_DATA; + doReturn = true; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { return; } - // Can't get here in DATA state as that would violate the serialisation rules - assert( state==IDLE ); - state = DATA; - } processCompletions(); - { - qpid::sys::ScopedLock l(stateLock); - assert( state==DATA ); - state = NOTIFY_WRITE; - } + //qpid::sys::ScopedLock l(stateLock); + do { + newState = oldState = state.get(); + assert( oldState==DATA ); + newState = NOTIFY_WRITE; + } while (!state.boolCompareAndSwap(oldState, newState)); do { doWriteCallback(); - { - qpid::sys::ScopedLock l(stateLock); - if ( state==NOTIFY_WRITE ) { - state = IDLE; + // qpid::sys::ScopedLock l(stateLock); + bool doBreak; + do { + newState = oldState = state.get(); + doBreak = false; + if ( oldState==NOTIFY_WRITE ) { + newState = IDLE; + doBreak = true; + } else { + // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + assert( oldState==PENDING_NOTIFY ); + newState = NOTIFY_WRITE; + } + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doBreak) { break; } - // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered - assert( state==PENDING_NOTIFY ); - state = NOTIFY_WRITE; - } } while (true); // We might need to delete ourselves diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 29132b8967..57bf735307 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -23,6 +23,7 @@ #include "rdma_wrap.h" +#include "qpid/sys/AtomicValue.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Mutex.h" @@ -53,8 +54,9 @@ namespace Rdma { int outstandingWrites; bool closed; // TODO: Perhaps (probably) this state can be merged with the following... bool deleting; // TODO: Perhaps (probably) this state can be merged with the following... - enum { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED } state; - qpid::sys::Mutex stateLock; + enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED }; + qpid::sys::AtomicValue state; + //qpid::sys::Mutex stateLock; std::deque bufferQueue; qpid::sys::Mutex bufferQueueLock; boost::ptr_deque buffers; -- cgit v1.2.1