diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-06-14 14:50:33 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-06-14 14:50:33 +0000 |
| commit | e5cdd98d3765bf41a9a6bbcb72470a8077a43a64 (patch) | |
| tree | 1b9e9356e2582b43e66e8d845f7f2752b325f79d /cpp/src/qpid/sys/rdma | |
| parent | 227246d4d1052242396fa08682da1ffc40f684a2 (diff) | |
| download | qpid-python-e5cdd98d3765bf41a9a6bbcb72470a8077a43a64.tar.gz | |
Combine Rdma::Buffer and ibv_sge needed to send it
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 13 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.cpp | 46 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/rdma_wrap.h | 29 |
5 files changed, 49 insertions, 51 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index c95cda7b37..d33c609344 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -67,8 +67,8 @@ vector<char> testString; void write(Rdma::AsynchIO& aio) { while (aio.writable() && aio.bufferAvailable() && smsgs < target) { Rdma::Buffer* b = aio.getBuffer(); - std::copy(testString.begin(), testString.end(), b->bytes); - b->dataCount = msgsize; + std::copy(testString.begin(), testString.end(), b->bytes()); + b->dataCount(msgsize); aio.queueWrite(b); ++smsgs; sbytes += msgsize; @@ -81,7 +81,7 @@ void dataError(Rdma::AsynchIO&) { void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { ++rmsgs; - rbytes += b->dataCount; + rbytes += b->dataCount(); // When all messages have been recvd stop if (rmsgs < target) { @@ -99,7 +99,7 @@ void full(Rdma::AsynchIO& a, Rdma::Buffer* b) { // Don't need to keep buffer just adjust the counts --smsgs; - sbytes -= b->dataCount; + sbytes -= b->dataCount(); // Give buffer back a.returnBuffer(b); diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 45295d470c..3b49e9759e 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -65,7 +65,6 @@ namespace Rdma { // Allocate recv buffer Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); - b->dataCount = b->byteCount; qp->postRecv(b); } @@ -74,8 +73,6 @@ namespace Rdma { Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); bufferQueue.push_front(b); - b->dataCount = 0; - b->dataStart = 0; } } @@ -410,8 +407,6 @@ namespace Rdma { } // At this point the buffer has been consumed so put it back on the recv queue - b->dataStart = 0; - b->dataCount = 0; qp->postRecv(b); // Received another message @@ -425,8 +420,8 @@ namespace Rdma { if (writable()) { Buffer* ob = getBuffer(); // Have to send something as adapters hate it when you try to transfer 0 bytes - *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); - ob->dataCount = sizeof(uint32_t); + *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(recvCredit); + ob->dataCount(sizeof(uint32_t)); int creditSent = recvCredit & ~FlagsMask; qp->postSend(creditSent | IgnoreData, ob); @@ -498,16 +493,12 @@ namespace Rdma { assert(!bufferQueue.empty()); Buffer* b = bufferQueue.front(); bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; return b; } void AsynchIO::returnBuffer(Buffer* b) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); - b->dataCount = 0; - b->dataStart = 0; } ConnectionManager::ConnectionManager( diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index d42784fbaa..97715326d5 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -70,8 +70,8 @@ void idle(ConRec* cr, Rdma::AsynchIO& a) { void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back Rdma::Buffer* buf = a.getBuffer(); - std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); - buf->dataCount = b->dataCount; + std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes()); + buf->dataCount(b->dataCount()); if (cr->queuedWrites.empty()) { // If can't write then full will be called and push buffer on back of queue a.queueWrite(buf); diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp index 8944be2034..2581aaedcb 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp @@ -50,19 +50,20 @@ namespace Rdma { return count; } - Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) : - bytes(b), - byteCount(s), - dataStart(0), - dataCount(0), + Buffer::Buffer(::ibv_pd* pd, const int32_t s) : + bufferSize(s), mr(CHECK_NULL(::ibv_reg_mr( - pd, bytes, byteCount, + pd, new char[s], s, ::IBV_ACCESS_LOCAL_WRITE))) - {} + { + sge.addr = (uintptr_t) mr->addr; + sge.length = 0; + sge.lkey = mr->lkey; + } Buffer::~Buffer() { (void) ::ibv_dereg_mr(mr); - delete [] bytes; + delete [] bytes(); } QueuePairEvent::QueuePairEvent() : @@ -106,7 +107,7 @@ namespace Rdma { Buffer* QueuePairEvent::getBuffer() const { Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id); - b->dataCount = wc.byte_len; + b->dataCount(wc.byte_len); return b; } @@ -157,7 +158,7 @@ namespace Rdma { // Create a buffer to use for writing Buffer* QueuePair::createBuffer(int s) { - return new Buffer(pd.get(), new char[s], s); + return new Buffer(pd.get(), s); } // Make channel non-blocking by making @@ -213,14 +214,11 @@ namespace Rdma { void QueuePair::postRecv(Buffer* buf) { ::ibv_recv_wr rwr = {}; - ::ibv_sge sge; - - sge.addr = (uintptr_t) buf->bytes+buf->dataStart; - sge.length = buf->dataCount; - sge.lkey = buf->mr->lkey; rwr.wr_id = reinterpret_cast<uint64_t>(buf); - rwr.sg_list = &sge; + // We are given the whole buffer + buf->dataCount(buf->byteCount()); + rwr.sg_list = &buf->sge; rwr.num_sge = 1; ::ibv_recv_wr* badrwr = 0; @@ -231,16 +229,11 @@ namespace Rdma { void QueuePair::postSend(Buffer* buf) { ::ibv_send_wr swr = {}; - ::ibv_sge sge; - - sge.addr = (uintptr_t) buf->bytes+buf->dataStart; - sge.length = buf->dataCount; - sge.lkey = buf->mr->lkey; swr.wr_id = reinterpret_cast<uint64_t>(buf); swr.opcode = IBV_WR_SEND; swr.send_flags = IBV_SEND_SIGNALED; - swr.sg_list = &sge; + swr.sg_list = &buf->sge; swr.num_sge = 1; ::ibv_send_wr* badswr = 0; @@ -251,17 +244,12 @@ namespace Rdma { void QueuePair::postSend(uint32_t imm, Buffer* buf) { ::ibv_send_wr swr = {}; - ::ibv_sge sge; - - sge.addr = (uintptr_t) buf->bytes+buf->dataStart; - sge.length = buf->dataCount; - sge.lkey = buf->mr->lkey; - swr.send_flags = IBV_SEND_SIGNALED; swr.wr_id = reinterpret_cast<uint64_t>(buf); swr.imm_data = htonl(imm); swr.opcode = IBV_WR_SEND_WITH_IMM; - swr.sg_list = &sge; + swr.send_flags = IBV_SEND_SIGNALED; + swr.sg_list = &buf->sge; swr.num_sge = 1; ::ibv_send_wr* badswr = 0; diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h index 5803ae5545..54066d1d41 100644 --- a/cpp/src/qpid/sys/rdma/rdma_wrap.h +++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h @@ -45,19 +45,38 @@ namespace Rdma { struct Buffer { friend class QueuePair; + friend class QueuePairEvent; - char* const bytes; - const int32_t byteCount; - int32_t dataStart; - int32_t dataCount; + char* bytes() const; + int32_t byteCount() const; + int32_t dataCount() const; + void dataCount(int32_t); - Buffer(::ibv_pd* pd, char* const b, const int32_t s); + Buffer(::ibv_pd* pd, const int32_t s); ~Buffer(); private: + const int32_t bufferSize; ::ibv_mr* mr; + ::ibv_sge sge; }; + inline char* Buffer::bytes() const { + return (char*) sge.addr; + } + + inline int32_t Buffer::byteCount() const { + return bufferSize; + } + + inline int32_t Buffer::dataCount() const { + return sge.length; + } + + inline void Buffer::dataCount(int32_t s) { + sge.length = s; + } + class Connection; enum QueueDirection { |
