summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-06-14 14:50:33 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-06-14 14:50:33 +0000
commite5cdd98d3765bf41a9a6bbcb72470a8077a43a64 (patch)
tree1b9e9356e2582b43e66e8d845f7f2752b325f79d /cpp/src/qpid/sys/rdma
parent227246d4d1052242396fa08682da1ffc40f684a2 (diff)
downloadqpid-python-e5cdd98d3765bf41a9a6bbcb72470a8077a43a64.tar.gz
Combine Rdma::Buffer and ibv_sge needed to send it
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@954496 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp8
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp13
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp4
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp46
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.h29
5 files changed, 49 insertions, 51 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index c95cda7b37..d33c609344 100644
--- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -67,8 +67,8 @@ vector<char> testString;
void write(Rdma::AsynchIO& aio) {
while (aio.writable() && aio.bufferAvailable() && smsgs < target) {
Rdma::Buffer* b = aio.getBuffer();
- std::copy(testString.begin(), testString.end(), b->bytes);
- b->dataCount = msgsize;
+ std::copy(testString.begin(), testString.end(), b->bytes());
+ b->dataCount(msgsize);
aio.queueWrite(b);
++smsgs;
sbytes += msgsize;
@@ -81,7 +81,7 @@ void dataError(Rdma::AsynchIO&) {
void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
++rmsgs;
- rbytes += b->dataCount;
+ rbytes += b->dataCount();
// When all messages have been recvd stop
if (rmsgs < target) {
@@ -99,7 +99,7 @@ void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
// Don't need to keep buffer just adjust the counts
--smsgs;
- sbytes -= b->dataCount;
+ sbytes -= b->dataCount();
// Give buffer back
a.returnBuffer(b);
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 45295d470c..3b49e9759e 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -65,7 +65,6 @@ namespace Rdma {
// Allocate recv buffer
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
- b->dataCount = b->byteCount;
qp->postRecv(b);
}
@@ -74,8 +73,6 @@ namespace Rdma {
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
bufferQueue.push_front(b);
- b->dataCount = 0;
- b->dataStart = 0;
}
}
@@ -410,8 +407,6 @@ namespace Rdma {
}
// At this point the buffer has been consumed so put it back on the recv queue
- b->dataStart = 0;
- b->dataCount = 0;
qp->postRecv(b);
// Received another message
@@ -425,8 +420,8 @@ namespace Rdma {
if (writable()) {
Buffer* ob = getBuffer();
// Have to send something as adapters hate it when you try to transfer 0 bytes
- *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit);
- ob->dataCount = sizeof(uint32_t);
+ *reinterpret_cast< uint32_t* >(ob->bytes()) = htonl(recvCredit);
+ ob->dataCount(sizeof(uint32_t));
int creditSent = recvCredit & ~FlagsMask;
qp->postSend(creditSent | IgnoreData, ob);
@@ -498,16 +493,12 @@ namespace Rdma {
assert(!bufferQueue.empty());
Buffer* b = bufferQueue.front();
bufferQueue.pop_front();
- b->dataCount = 0;
- b->dataStart = 0;
return b;
}
void AsynchIO::returnBuffer(Buffer* b) {
qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
- b->dataCount = 0;
- b->dataStart = 0;
}
ConnectionManager::ConnectionManager(
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index d42784fbaa..97715326d5 100644
--- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -70,8 +70,8 @@ void idle(ConRec* cr, Rdma::AsynchIO& a) {
void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
// Echo data back
Rdma::Buffer* buf = a.getBuffer();
- std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
- buf->dataCount = b->dataCount;
+ std::copy(b->bytes(), b->bytes()+b->dataCount(), buf->bytes());
+ buf->dataCount(b->dataCount());
if (cr->queuedWrites.empty()) {
// If can't write then full will be called and push buffer on back of queue
a.queueWrite(buf);
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 8944be2034..2581aaedcb 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -50,19 +50,20 @@ namespace Rdma {
return count;
}
- Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
- bytes(b),
- byteCount(s),
- dataStart(0),
- dataCount(0),
+ Buffer::Buffer(::ibv_pd* pd, const int32_t s) :
+ bufferSize(s),
mr(CHECK_NULL(::ibv_reg_mr(
- pd, bytes, byteCount,
+ pd, new char[s], s,
::IBV_ACCESS_LOCAL_WRITE)))
- {}
+ {
+ sge.addr = (uintptr_t) mr->addr;
+ sge.length = 0;
+ sge.lkey = mr->lkey;
+ }
Buffer::~Buffer() {
(void) ::ibv_dereg_mr(mr);
- delete [] bytes;
+ delete [] bytes();
}
QueuePairEvent::QueuePairEvent() :
@@ -106,7 +107,7 @@ namespace Rdma {
Buffer* QueuePairEvent::getBuffer() const {
Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
- b->dataCount = wc.byte_len;
+ b->dataCount(wc.byte_len);
return b;
}
@@ -157,7 +158,7 @@ namespace Rdma {
// Create a buffer to use for writing
Buffer* QueuePair::createBuffer(int s) {
- return new Buffer(pd.get(), new char[s], s);
+ return new Buffer(pd.get(), s);
}
// Make channel non-blocking by making
@@ -213,14 +214,11 @@ namespace Rdma {
void QueuePair::postRecv(Buffer* buf) {
::ibv_recv_wr rwr = {};
- ::ibv_sge sge;
-
- sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
- sge.length = buf->dataCount;
- sge.lkey = buf->mr->lkey;
rwr.wr_id = reinterpret_cast<uint64_t>(buf);
- rwr.sg_list = &sge;
+ // We are given the whole buffer
+ buf->dataCount(buf->byteCount());
+ rwr.sg_list = &buf->sge;
rwr.num_sge = 1;
::ibv_recv_wr* badrwr = 0;
@@ -231,16 +229,11 @@ namespace Rdma {
void QueuePair::postSend(Buffer* buf) {
::ibv_send_wr swr = {};
- ::ibv_sge sge;
-
- sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
- sge.length = buf->dataCount;
- sge.lkey = buf->mr->lkey;
swr.wr_id = reinterpret_cast<uint64_t>(buf);
swr.opcode = IBV_WR_SEND;
swr.send_flags = IBV_SEND_SIGNALED;
- swr.sg_list = &sge;
+ swr.sg_list = &buf->sge;
swr.num_sge = 1;
::ibv_send_wr* badswr = 0;
@@ -251,17 +244,12 @@ namespace Rdma {
void QueuePair::postSend(uint32_t imm, Buffer* buf) {
::ibv_send_wr swr = {};
- ::ibv_sge sge;
-
- sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
- sge.length = buf->dataCount;
- sge.lkey = buf->mr->lkey;
- swr.send_flags = IBV_SEND_SIGNALED;
swr.wr_id = reinterpret_cast<uint64_t>(buf);
swr.imm_data = htonl(imm);
swr.opcode = IBV_WR_SEND_WITH_IMM;
- swr.sg_list = &sge;
+ swr.send_flags = IBV_SEND_SIGNALED;
+ swr.sg_list = &buf->sge;
swr.num_sge = 1;
::ibv_send_wr* badswr = 0;
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.h b/cpp/src/qpid/sys/rdma/rdma_wrap.h
index 5803ae5545..54066d1d41 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.h
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.h
@@ -45,19 +45,38 @@ namespace Rdma {
struct Buffer {
friend class QueuePair;
+ friend class QueuePairEvent;
- char* const bytes;
- const int32_t byteCount;
- int32_t dataStart;
- int32_t dataCount;
+ char* bytes() const;
+ int32_t byteCount() const;
+ int32_t dataCount() const;
+ void dataCount(int32_t);
- Buffer(::ibv_pd* pd, char* const b, const int32_t s);
+ Buffer(::ibv_pd* pd, const int32_t s);
~Buffer();
private:
+ const int32_t bufferSize;
::ibv_mr* mr;
+ ::ibv_sge sge;
};
+ inline char* Buffer::bytes() const {
+ return (char*) sge.addr;
+ }
+
+ inline int32_t Buffer::byteCount() const {
+ return bufferSize;
+ }
+
+ inline int32_t Buffer::dataCount() const {
+ return sge.length;
+ }
+
+ inline void Buffer::dataCount(int32_t s) {
+ sge.length = s;
+ }
+
class Connection;
enum QueueDirection {