diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-09-19 14:15:54 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-19 14:15:54 +0000 |
| commit | 7afcca6c7f08224ad5c7b44d8215f4c68c08dd65 (patch) | |
| tree | a5e74c27844ce87319f7f2c482ce4beddc7bb0c5 /cpp/src/qpid/sys/rdma | |
| parent | efa73f05b8b5e300c46ff6ab78c334f5d8b7fa2b (diff) | |
| download | qpid-python-7afcca6c7f08224ad5c7b44d8215f4c68c08dd65.tar.gz | |
RDMA bugfixes:
- Changed Rdma connection creation to allocate all necessary buffer
memory immediately. This has the effect that no later buffer allocations happen
which can fail so that once accepted connections won't fail because of lack of
locked memory.
- Fixed connection logic so we reject a new connection if we can't create the necessary
handlers rather than kill the entire broker (this includes not enough locked memory)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 28 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 7 |
2 files changed, 23 insertions, 12 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index dd4fbefcaf..e3dc0cbf8f 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -62,11 +62,21 @@ namespace Rdma { // Prepost some recv buffers before we go any further for (int i = 0; i<recvBufferCount; ++i) { + // Allocate recv buffer Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); b->dataCount = b->byteCount; qp->postRecv(b); } + + for (int i = 0; i<xmitBufferCount; ++i) { + // Allocate xmit buffer + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; + } } AsynchIO::~AsynchIO() { @@ -378,18 +388,12 @@ namespace Rdma { Buffer* AsynchIO::getBuffer() { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - if (bufferQueue.empty()) { - Buffer* b = qp->createBuffer(bufferSize); - buffers.push_front(b); - return b; - } else { - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; - return b; - } - + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; } void AsynchIO::returnBuffer(Buffer* b) { diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 8b1422a1af..29132b8967 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -65,6 +65,9 @@ namespace Rdma { ErrorCallback errorCallback; public: + // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use + // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much + // locked memory AsynchIO( QueuePair::intrusive_ptr q, int size, @@ -78,6 +81,7 @@ namespace Rdma { void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; + bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); void queueWriteClose(); @@ -109,6 +113,9 @@ namespace Rdma { return outstandingWrites; } + inline bool AsynchIO::bufferAvailable() const { + return !bufferQueue.empty(); + } // These are the parameters necessary to start the conversation // * Each peer HAS to allocate buffers of the size of the maximum receive from its peer // * Each peer HAS to know the initial "credit" it has for transmitting to its peer |
