diff options
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 140 |
1 files changed, 104 insertions, 36 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 588d459b65..9244343ff8 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -49,8 +49,7 @@ namespace Rdma { recvBufferCount(rCount), xmitBufferCount(xCredit), outstandingWrites(0), - closed(false), - deleting(false), + draining(false), state(IDLE), readCallback(rc), idleCallback(ic), @@ -85,8 +84,11 @@ namespace Rdma { if ( outstandingWrites>0 ) QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); - // Turn off callbacks (before doing the deletes) - dataHandle.stopWatch(); + // Turn off callbacks if necessary (before doing the deletes) + if (state.get() != SHUTDOWN) { + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown"); + dataHandle.stopWatch(); + } // The buffers ptr_deque automatically deletes all the buffers we've allocated // TODO: It might turn out to be more efficient in high connection loads to reuse the @@ -99,27 +101,58 @@ namespace Rdma { } // Mark for deletion/Delete this object when we have no outstanding writes - void AsynchIO::deferDelete() { + void AsynchIO::stop(NotifyCallback nc) { + State oldState; + State newState; + bool doReturn; + //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + do { + newState = oldState = state.get(); + doReturn = false; + if (outstandingWrites > 0 || (oldState != IDLE && oldState != DRAINED)) { + doReturn = true; + break; + } + + newState = SHUTDOWN; + + } while (!state.boolCompareAndSwap(oldState, newState)); + if (doReturn) { + notifyCallback = nc; + return; + } + dataHandle.stopWatch(); + // Callback, but don't store it - SHUTDOWN state means callback has been called + // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately + // after the callback + nc(*this); + } + + // Mark writing closed (so we don't accept any more writes or make any idle callbacks) + void AsynchIO::drainWriteQueue(NotifyCallback nc) { + draining = true; + State oldState; State newState; bool doReturn; //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - // It is safe to assign to deleting here as we either delete ourselves - // before leaving this function or deleting is set on exit do { newState = oldState = state.get(); doReturn = false; - if (outstandingWrites > 0 || oldState != IDLE) { - deleting = true; + if (oldState != IDLE) { doReturn = true; - } else{ - newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor + break; + } + + if (outstandingWrites == 0) { + newState = DRAINED; } } while (!state.boolCompareAndSwap(oldState, newState)); if (doReturn) { + notifyCallback = nc; return; } - delete this; + nc(*this); } void AsynchIO::queueWrite(Buffer* buff) { @@ -146,12 +179,6 @@ namespace Rdma { } } - // Mark now closed (so we don't accept any more writes or make any idle callbacks) - void AsynchIO::queueWriteClose() { - // Don't think we actually need to lock here as transition is 1 way only to closed - closed = true; - } - void AsynchIO::notifyPendingWrite() { // As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not. // If we are then we just return as we know that we will eventually do the idle callback anyway. @@ -182,8 +209,13 @@ namespace Rdma { case IDLE: newState = NOTIFY_WRITE; break; - case DELETED: - assert(oldState!=DELETED); + case SHUTDOWN: + // This is not allowed - we can't make any more writes as we shut the connection down. + assert(oldState!=SHUTDOWN); + doReturn = true; + case DRAINED: + // This is not allowed - we can't make any more writes as we're draining the write queue. + assert(oldState!=DRAINED); doReturn = true; }; } while (!state.boolCompareAndSwap(oldState, newState)); @@ -220,7 +252,7 @@ namespace Rdma { action = NOTIFY; break; default: - assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED); + assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN); action = RETURN; } } while (!state.boolCompareAndSwap(oldState, newState)); @@ -238,8 +270,8 @@ namespace Rdma { return; case EXIT: // If we just processed completions we might need to delete ourselves - if (deleting && outstandingWrites == 0) { - delete this; + if (notifyCallback && outstandingWrites == 0) { + doStoppedCallback(); } return; } @@ -260,6 +292,8 @@ namespace Rdma { case IDLE: newState = DATA; break; + case DRAINED: + break; default: // Can't get here in DATA state as that would violate the serialisation rules assert( oldState!=DATA ); @@ -276,35 +310,45 @@ namespace Rdma { //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); do { newState = oldState = state.get(); - assert( oldState==DATA ); - newState = NOTIFY_WRITE; + switch (oldState) { + case DATA: + newState = NOTIFY_WRITE; + break; + case DRAINED: + break; + default: + assert( oldState==DATA || oldState==DRAINED); + } } while (!state.boolCompareAndSwap(oldState, newState)); - do { + while (newState==NOTIFY_WRITE) { doWriteCallback(); // qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); - bool doBreak; do { newState = oldState = state.get(); - doBreak = false; if ( oldState==NOTIFY_WRITE ) { newState = IDLE; - doBreak = true; } else { - // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered assert( oldState==PENDING_NOTIFY ); newState = NOTIFY_WRITE; } } while (!state.boolCompareAndSwap(oldState, newState)); - if (doBreak) { - break; + } + + // If we've got all the write confirmations and we're draining + if (draining) { + if (outstandingWrites == 0) { + doDrainedCallback(); + draining = false; } - } while (true); + return; + } // We might need to delete ourselves - if (deleting && outstandingWrites == 0) { - delete this; + if (notifyCallback && outstandingWrites == 0) { + doStoppedCallback(); } } @@ -418,6 +462,29 @@ namespace Rdma { } } + void AsynchIO::doDrainedCallback() { + NotifyCallback nc; + nc.swap(notifyCallback); + // Transition unconditionally to DRAINED + State oldState; + do { + oldState = state.get(); + } while (!state.boolCompareAndSwap(oldState, DRAINED)); + nc(*this); + } + + void AsynchIO::doStoppedCallback() { + dataHandle.stopWatch(); + NotifyCallback nc; + nc.swap(notifyCallback); + // Transition unconditionally to SHUTDOWN + State oldState; + do { + oldState = state.get(); + } while (!state.boolCompareAndSwap(oldState, SHUTDOWN)); + nc(*this); + } + Buffer* AsynchIO::getBuffer() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); assert(!bufferQueue.empty()); @@ -444,12 +511,13 @@ namespace Rdma { errorCallback(errc), disconnectedCallback(dc) { + QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager"); ci->nonblocking(); } ConnectionManager::~ConnectionManager() { - handle.stopWatch(); + QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager"); } void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) { |
