summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaIO.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp140
1 files changed, 104 insertions, 36 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 588d459b65..9244343ff8 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -49,8 +49,7 @@ namespace Rdma {
recvBufferCount(rCount),
xmitBufferCount(xCredit),
outstandingWrites(0),
- closed(false),
- deleting(false),
+ draining(false),
state(IDLE),
readCallback(rc),
idleCallback(ic),
@@ -85,8 +84,11 @@ namespace Rdma {
if ( outstandingWrites>0 )
QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished");
- // Turn off callbacks (before doing the deletes)
- dataHandle.stopWatch();
+ // Turn off callbacks if necessary (before doing the deletes)
+ if (state.get() != SHUTDOWN) {
+ QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue whilst not shutdown");
+ dataHandle.stopWatch();
+ }
// The buffers ptr_deque automatically deletes all the buffers we've allocated
// TODO: It might turn out to be more efficient in high connection loads to reuse the
@@ -99,27 +101,58 @@ namespace Rdma {
}
// Mark for deletion/Delete this object when we have no outstanding writes
- void AsynchIO::deferDelete() {
+ void AsynchIO::stop(NotifyCallback nc) {
+ State oldState;
+ State newState;
+ bool doReturn;
+ //qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
+ do {
+ newState = oldState = state.get();
+ doReturn = false;
+ if (outstandingWrites > 0 || (oldState != IDLE && oldState != DRAINED)) {
+ doReturn = true;
+ break;
+ }
+
+ newState = SHUTDOWN;
+
+ } while (!state.boolCompareAndSwap(oldState, newState));
+ if (doReturn) {
+ notifyCallback = nc;
+ return;
+ }
+ dataHandle.stopWatch();
+ // Callback, but don't store it - SHUTDOWN state means callback has been called
+ // we *are* allowed to delete the AsynchIO in this callback, so we have to return immediately
+ // after the callback
+ nc(*this);
+ }
+
+ // Mark writing closed (so we don't accept any more writes or make any idle callbacks)
+ void AsynchIO::drainWriteQueue(NotifyCallback nc) {
+ draining = true;
+
State oldState;
State newState;
bool doReturn;
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- // It is safe to assign to deleting here as we either delete ourselves
- // before leaving this function or deleting is set on exit
do {
newState = oldState = state.get();
doReturn = false;
- if (outstandingWrites > 0 || oldState != IDLE) {
- deleting = true;
+ if (oldState != IDLE) {
doReturn = true;
- } else{
- newState = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor
+ break;
+ }
+
+ if (outstandingWrites == 0) {
+ newState = DRAINED;
}
} while (!state.boolCompareAndSwap(oldState, newState));
if (doReturn) {
+ notifyCallback = nc;
return;
}
- delete this;
+ nc(*this);
}
void AsynchIO::queueWrite(Buffer* buff) {
@@ -146,12 +179,6 @@ namespace Rdma {
}
}
- // Mark now closed (so we don't accept any more writes or make any idle callbacks)
- void AsynchIO::queueWriteClose() {
- // Don't think we actually need to lock here as transition is 1 way only to closed
- closed = true;
- }
-
void AsynchIO::notifyPendingWrite() {
// As notifyPendingWrite can be called on an arbitrary thread it must check whether we are processing or not.
// If we are then we just return as we know that we will eventually do the idle callback anyway.
@@ -182,8 +209,13 @@ namespace Rdma {
case IDLE:
newState = NOTIFY_WRITE;
break;
- case DELETED:
- assert(oldState!=DELETED);
+ case SHUTDOWN:
+ // This is not allowed - we can't make any more writes as we shut the connection down.
+ assert(oldState!=SHUTDOWN);
+ doReturn = true;
+ case DRAINED:
+ // This is not allowed - we can't make any more writes as we're draining the write queue.
+ assert(oldState!=DRAINED);
doReturn = true;
};
} while (!state.boolCompareAndSwap(oldState, newState));
@@ -220,7 +252,7 @@ namespace Rdma {
action = NOTIFY;
break;
default:
- assert(oldState!=IDLE && oldState!=DATA && oldState!=DELETED);
+ assert(oldState!=IDLE && oldState!=DATA && oldState!=SHUTDOWN);
action = RETURN;
}
} while (!state.boolCompareAndSwap(oldState, newState));
@@ -238,8 +270,8 @@ namespace Rdma {
return;
case EXIT:
// If we just processed completions we might need to delete ourselves
- if (deleting && outstandingWrites == 0) {
- delete this;
+ if (notifyCallback && outstandingWrites == 0) {
+ doStoppedCallback();
}
return;
}
@@ -260,6 +292,8 @@ namespace Rdma {
case IDLE:
newState = DATA;
break;
+ case DRAINED:
+ break;
default:
// Can't get here in DATA state as that would violate the serialisation rules
assert( oldState!=DATA );
@@ -276,35 +310,45 @@ namespace Rdma {
//qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
do {
newState = oldState = state.get();
- assert( oldState==DATA );
- newState = NOTIFY_WRITE;
+ switch (oldState) {
+ case DATA:
+ newState = NOTIFY_WRITE;
+ break;
+ case DRAINED:
+ break;
+ default:
+ assert( oldState==DATA || oldState==DRAINED);
+ }
} while (!state.boolCompareAndSwap(oldState, newState));
- do {
+ while (newState==NOTIFY_WRITE) {
doWriteCallback();
// qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock);
- bool doBreak;
do {
newState = oldState = state.get();
- doBreak = false;
if ( oldState==NOTIFY_WRITE ) {
newState = IDLE;
- doBreak = true;
} else {
- // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered
+ // Can't get DATA/PENDING_DATA/DRAINED here as dataEvent cannot be reentered
assert( oldState==PENDING_NOTIFY );
newState = NOTIFY_WRITE;
}
} while (!state.boolCompareAndSwap(oldState, newState));
- if (doBreak) {
- break;
+ }
+
+ // If we've got all the write confirmations and we're draining
+ if (draining) {
+ if (outstandingWrites == 0) {
+ doDrainedCallback();
+ draining = false;
}
- } while (true);
+ return;
+ }
// We might need to delete ourselves
- if (deleting && outstandingWrites == 0) {
- delete this;
+ if (notifyCallback && outstandingWrites == 0) {
+ doStoppedCallback();
}
}
@@ -418,6 +462,29 @@ namespace Rdma {
}
}
+ void AsynchIO::doDrainedCallback() {
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ // Transition unconditionally to DRAINED
+ State oldState;
+ do {
+ oldState = state.get();
+ } while (!state.boolCompareAndSwap(oldState, DRAINED));
+ nc(*this);
+ }
+
+ void AsynchIO::doStoppedCallback() {
+ dataHandle.stopWatch();
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ // Transition unconditionally to SHUTDOWN
+ State oldState;
+ do {
+ oldState = state.get();
+ } while (!state.boolCompareAndSwap(oldState, SHUTDOWN));
+ nc(*this);
+ }
+
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
assert(!bufferQueue.empty());
@@ -444,12 +511,13 @@ namespace Rdma {
errorCallback(errc),
disconnectedCallback(dc)
{
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Creating ConnectionManager");
ci->nonblocking();
}
ConnectionManager::~ConnectionManager()
{
- handle.stopWatch();
+ QPID_LOG(debug, "RDMA: ci=" << ci << ": Deleting ConnectionManager");
}
void ConnectionManager::start(Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr) {