summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaClient.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaClient.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp37
1 files changed, 21 insertions, 16 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 7c2d9de505..afff96b72f 100644
--- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -30,8 +30,8 @@ int64_t smsgs = 0;
int64_t sbytes = 0;
int64_t rmsgs = 0;
int64_t rbytes = 0;
-
-int outstandingwrites = 0;
+int64_t cmsgs = 0;
+int writable = true;
int target = 1000000;
int msgsize = 200;
@@ -42,17 +42,18 @@ Duration fullTestDuration(TIME_INFINITE);
vector<char> testString;
void write(Rdma::AsynchIO& aio) {
- //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
- while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) {
+ if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
+ while (writable) {
+ if (smsgs >= target)
+ return;
Rdma::Buffer* b = aio.getBuffer();
std::copy(testString.begin(), testString.end(), b->bytes);
b->dataCount = msgsize;
aio.queueWrite(b);
- ++outstandingwrites;
++smsgs;
sbytes += b->byteCount;
}
- //}
+ }
}
void dataError(Rdma::AsynchIO&) {
@@ -66,24 +67,27 @@ void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
// When all messages have been recvd stop
if (rmsgs < target) {
write(aio);
- return;
+ } else {
+ fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+ if (cmsgs >= target)
+ p->shutdown();
}
+}
- fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
- if (outstandingwrites == 0)
- p->shutdown();
+void full(Rdma::AsynchIO&) {
+ writable = false;
}
void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
- --outstandingwrites;
+ writable = true;
+ ++cmsgs;
if (smsgs < target) {
write(aio);
- return;
+ } else {
+ sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+ if (rmsgs >= target && cmsgs >= target)
+ p->shutdown();
}
-
- sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
- if (smsgs >= target && rmsgs >= target && outstandingwrites == 0)
- p->shutdown();
}
void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
@@ -93,6 +97,7 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
boost::bind(&data, poller, _1, _2),
boost::bind(&idle, poller, _1),
+ &full,
dataError);
startTime = AbsTime::now();