diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:58 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:48:58 +0000 |
| commit | 9048ed46bb240aa3839f74d8b6daf837592186be (patch) | |
| tree | bfd8c6e2d704a93b36ab575395bf4af6297a9b60 /cpp/src/qpid/sys/rdma/rdma_wrap.cpp | |
| parent | 8db918da6cb8d883c2f6c506823293c9029f1b18 (diff) | |
| download | qpid-python-9048ed46bb240aa3839f74d8b6daf837592186be.tar.gz | |
Refactored Rdma write buffers to be controlled by the rdma_wrapper layer
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995131 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 61 |
1 files changed, 37 insertions, 24 deletions
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 071d453933..c286782c96 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -50,33 +50,14 @@ namespace Rdma { return count; } - Buffer::Buffer(::ibv_pd* pd, const int32_t s) : - bufferSize(s), - mr(CHECK_NULL(::ibv_reg_mr( - pd, new char[s], s, - ::IBV_ACCESS_LOCAL_WRITE))) - { - sge.addr = (uintptr_t) mr->addr; - sge.length = 0; - sge.lkey = mr->lkey; - } - Buffer::Buffer(uint32_t lkey, char* bytes, const int32_t byteCount) : - bufferSize(byteCount), - mr(0) + bufferSize(byteCount) { sge.addr = (uintptr_t) bytes; sge.length = 0; sge.lkey = lkey; } - Buffer::~Buffer() { - if (mr) { - (void) ::ibv_dereg_mr(mr); - delete [] bytes(); - } - } - QueuePairEvent::QueuePairEvent() : dir(NONE) {} @@ -169,16 +150,48 @@ namespace Rdma { // Deallocate recv buffer memory if (rmr) delete [] static_cast<char*>(rmr->addr); + // Deallocate recv buffer memory + if (smr) delete [] static_cast<char*>(smr->addr); + // The buffers ptr_deque automatically deletes all the buffers we've allocated } - // Create a buffer to use for writing - Buffer* QueuePair::createBuffer(int s) { - Buffer* b = new Buffer(pd.get(), s); - buffers.push_front(b); + // Create buffers to use for writing + void QueuePair::createSendBuffers(int sendBufferCount, int bufferSize) + { + assert(!smr); + + // Round up buffersize to cacheline (64 bytes) + bufferSize = (bufferSize+63) & (~63); + + // Allocate memory block for all receive buffers + char* mem = new char [sendBufferCount * bufferSize]; + smr = regMr(pd.get(), mem, sendBufferCount * bufferSize, ::IBV_ACCESS_LOCAL_WRITE); + for (int i = 0; i<sendBufferCount; ++i) { + // Allocate xmit buffer + Buffer* b = new Buffer(smr->lkey, &mem[i*bufferSize], bufferSize); + buffers.push_front(b); + bufferQueue.push_back(b); + } + } + + Buffer* QueuePair::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + assert(!bufferQueue.empty()); + Buffer* b = bufferQueue.back(); + bufferQueue.pop_back(); return b; } + void QueuePair::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + bufferQueue.push_back(b); + } + + bool QueuePair::bufferAvailable() const { + return !bufferQueue.empty(); + } + void QueuePair::allocateRecvBuffers(int recvBufferCount, int bufferSize) { assert(!rmr); |
