From ff152807775cf3ad146742a59bbe44146cbb9a34 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Thu, 23 Dec 2010 17:11:48 +0000 Subject: Changes due to review comments from Doug Ledford: - Removed lock unsafe operation Rdma::QueuePair::bufferAvailable() and replaced the unavailable case with failing getBuffer(). - Improved asserts in the Rdma::QueuePair::getBuffer() code. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1052330 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaClient.cpp | 3 ++- cpp/src/qpid/sys/rdma/RdmaIO.h | 5 ----- cpp/src/qpid/sys/rdma/RdmaServer.cpp | 12 ++++++++---- cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 11 +++++------ cpp/src/qpid/sys/rdma/rdma_wrap.h | 3 --- 5 files changed, 15 insertions(+), 19 deletions(-) (limited to 'cpp/src/qpid/sys/rdma') diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 67c672f857..e53ebb0520 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -93,8 +93,9 @@ Xor128Generator output; Xor128Generator input; void write(Rdma::AsynchIO& aio) { - while (aio.writable() && aio.bufferAvailable() && smsgs < target) { + while (aio.writable() && smsgs < target) { Rdma::Buffer* b = aio.getBuffer(); + if (!b) break; b->dataCount(msgsize); uint32_t* ip = reinterpret_cast(b->bytes()); uint32_t* lip = ip + b->dataCount() / sizeof(uint32_t); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index adf27542fb..330c2395bd 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -87,7 +87,6 @@ namespace Rdma { void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; - bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); void drainWriteQueue(NotifyCallback); @@ -134,10 +133,6 @@ namespace Rdma { return outstandingWrites; } - inline bool AsynchIO::bufferAvailable() const { - return qp->bufferAvailable(); - } - inline Buffer* AsynchIO::getBuffer() { return qp->getBuffer(); } diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index d924c388ec..33bb8247a1 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -79,10 +79,11 @@ void dataError(Rdma::AsynchIO&) { void idle(ConRec* cr, Rdma::AsynchIO& a) { // Need to make sure full is not called as it would reorder messages - while (!cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { + while (!cr->queuedWrites.empty() && a.writable()) { + Rdma::Buffer* rbuf = a.getBuffer(); + if (!rbuf) break; Buffer* buf = cr->queuedWrites.front(); cr->queuedWrites.pop(); - Rdma::Buffer* rbuf = a.getBuffer(); std::copy(buf->bytes(), buf->bytes()+buf->byteCount(), rbuf->bytes()); rbuf->dataCount(buf->byteCount()); delete buf; @@ -92,8 +93,11 @@ void idle(ConRec* cr, Rdma::AsynchIO& a) { void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back - if (cr->queuedWrites.empty() && a.writable() && a.bufferAvailable()) { - Rdma::Buffer* buf = a.getBuffer(); + Rdma::Buffer* buf = 0; + if (cr->queuedWrites.empty() && a.writable()) { + buf = a.getBuffer(); + } + if (buf) { std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); buf->dataCount(b->dataCount()); a.queueWrite(buf); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index ec6e6c6b99..a51244a7dc 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -184,9 +184,12 @@ namespace Rdma { Buffer* QueuePair::getBuffer() { qpid::sys::ScopedLock l(bufferLock); - assert(!freeBuffers.empty()); - Buffer* b = &sendBuffers[freeBuffers.back()]; + if (freeBuffers.empty()) + return 0; + int i = freeBuffers.back(); freeBuffers.pop_back(); + assert(i >= 0 && i < int(sendBuffers.size())); + Buffer* b = &sendBuffers[i]; b->dataCount(0); return b; } @@ -198,10 +201,6 @@ namespace Rdma { freeBuffers.push_back(i); } - bool QueuePair::bufferAvailable() const { - return !freeBuffers.empty(); - } - void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) { assert(!rmr); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 1d72abcd03..a3cd584102 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -144,9 +144,6 @@ namespace Rdma { // Return buffer to pool after use void returnBuffer(Buffer* b); - // Check whether any buffers are available - bool bufferAvailable() const; - // Create and post recv buffers void allocateRecvBuffers(int recvBufferCount, int bufferSize); -- cgit v1.2.1