summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-09-19 14:15:54 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-09-19 14:15:54 +0000
commit7afcca6c7f08224ad5c7b44d8215f4c68c08dd65 (patch)
treea5e74c27844ce87319f7f2c482ce4beddc7bb0c5 /cpp/src/qpid/sys/rdma
parentefa73f05b8b5e300c46ff6ab78c334f5d8b7fa2b (diff)
downloadqpid-python-7afcca6c7f08224ad5c7b44d8215f4c68c08dd65.tar.gz
RDMA bugfixes:
- Changed Rdma connection creation to allocate all necessary buffer memory immediately. This has the effect that no later buffer allocations happen which can fail so that once accepted connections won't fail because of lack of locked memory. - Fixed connection logic so we reject a new connection if we can't create the necessary handlers rather than kill the entire broker (this includes not enough locked memory) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@697101 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp28
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h7
2 files changed, 23 insertions, 12 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index dd4fbefcaf..e3dc0cbf8f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -62,11 +62,21 @@ namespace Rdma {
// Prepost some recv buffers before we go any further
for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
b->dataCount = b->byteCount;
qp->postRecv(b);
}
+
+ for (int i = 0; i<xmitBufferCount; ++i) {
+ // Allocate xmit buffer
+ Buffer* b = qp->createBuffer(bufferSize);
+ buffers.push_front(b);
+ bufferQueue.push_front(b);
+ b->dataCount = 0;
+ b->dataStart = 0;
+ }
}
AsynchIO::~AsynchIO() {
@@ -378,18 +388,12 @@ namespace Rdma {
Buffer* AsynchIO::getBuffer() {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
- if (bufferQueue.empty()) {
- Buffer* b = qp->createBuffer(bufferSize);
- buffers.push_front(b);
- return b;
- } else {
- Buffer* b = bufferQueue.front();
- bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
- return b;
- }
-
+ assert(!bufferQueue.empty());
+ Buffer* b = bufferQueue.front();
+ bufferQueue.pop_front();
+ b->dataCount = 0;
+ b->dataStart = 0;
+ return b;
}
void AsynchIO::returnBuffer(Buffer* b) {
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 8b1422a1af..29132b8967 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -65,6 +65,9 @@ namespace Rdma {
ErrorCallback errorCallback;
public:
+ // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
+ // for buffers both read and write (allocate half to each up front) and fail if we cannot allocate that much
+ // locked memory
AsynchIO(
QueuePair::intrusive_ptr q,
int size,
@@ -78,6 +81,7 @@ namespace Rdma {
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
+ bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
void queueWriteClose();
@@ -109,6 +113,9 @@ namespace Rdma {
return outstandingWrites;
}
+ inline bool AsynchIO::bufferAvailable() const {
+ return !bufferQueue.empty();
+ }
// These are the parameters necessary to start the conversation
// * Each peer HAS to allocate buffers of the size of the maximum receive from its peer
// * Each peer HAS to know the initial "credit" it has for transmitting to its peer