diff options
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaClient.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 7c2d9de505..afff96b72f 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -30,8 +30,8 @@ int64_t smsgs = 0; int64_t sbytes = 0; int64_t rmsgs = 0; int64_t rbytes = 0; - -int outstandingwrites = 0; +int64_t cmsgs = 0; +int writable = true; int target = 1000000; int msgsize = 200; @@ -42,17 +42,18 @@ Duration fullTestDuration(TIME_INFINITE); vector<char> testString; void write(Rdma::AsynchIO& aio) { - //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { - while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) { + if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { + while (writable) { + if (smsgs >= target) + return; Rdma::Buffer* b = aio.getBuffer(); std::copy(testString.begin(), testString.end(), b->bytes); b->dataCount = msgsize; aio.queueWrite(b); - ++outstandingwrites; ++smsgs; sbytes += b->byteCount; } - //} + } } void dataError(Rdma::AsynchIO&) { @@ -66,24 +67,27 @@ void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { // When all messages have been recvd stop if (rmsgs < target) { write(aio); - return; + } else { + fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); + if (cmsgs >= target) + p->shutdown(); } +} - fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); - if (outstandingwrites == 0) - p->shutdown(); +void full(Rdma::AsynchIO&) { + writable = false; } void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { - --outstandingwrites; + writable = true; + ++cmsgs; if (smsgs < target) { write(aio); - return; + } else { + sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); + if (rmsgs >= target && cmsgs >= target) + p->shutdown(); } - - sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); - if (smsgs >= target && rmsgs >= target && outstandingwrites == 0) - p->shutdown(); } void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { @@ -93,6 +97,7 @@ void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize, boost::bind(&data, poller, _1, _2), boost::bind(&idle, poller, _1), + &full, dataError); startTime = AbsTime::now(); |
