summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp36
1 files changed, 34 insertions, 2 deletions
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index b046b012db..071d453933 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -61,9 +61,20 @@ namespace Rdma {
sge.lkey = mr->lkey;
}
+ Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) :
+ bufferSize(byteCount),
+ mr(0)
+ {
+ sge.addr = (uintptr_t) bytes;
+ sge.length = 0;
+ sge.lkey = lkey;
+ }
+
Buffer::~Buffer() {
- (void) ::ibv_dereg_mr(mr);
- delete [] bytes();
+ if (mr) {
+ (void) ::ibv_dereg_mr(mr);
+ delete [] bytes();
+ }
}
QueuePairEvent::QueuePairEvent() :
@@ -155,6 +166,9 @@ namespace Rdma {
// Reset back pointer in case someone else has the qp
qp->qp_context = 0;
+ // Deallocate recv buffer memory
+ if (rmr) delete [] static_cast<char*>(rmr->addr);
+
// The buffers ptr_deque automatically deletes all the buffers we've allocated
}
@@ -165,6 +179,24 @@ namespace Rdma {
return b;
}
+ void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize)
+ {
+ assert(!rmr);
+
+ // Round up buffersize to cacheline (64 bytes)
+ bufferSize = (bufferSize+63) & (~63);
+
+ // Allocate memory block for all receive buffers
+ char* mem = new char [recvBufferCount * bufferSize];
+ rmr = regMr(pd.get(), mem, recvBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE);
+ for (int i = 0; i<recvBufferCount; ++i) {
+ // Allocate recv buffer
+ Buffer* b = new Buffer(rmr->lkey, &mem[i*bufferSize], bufferSize);
+ buffers.push_front(b);
+ postRecv(b);
+ }
+ }
+
// Make channel non-blocking by making
// associated fd nonblocking
void QueuePair::nonblocking() {