diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
| commit | 2e05b7082f5e387fc686925e5ac006485e4686db (patch) | |
| tree | b0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/sys/rdma/RdmaClient.cpp | |
| parent | 468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff) | |
| download | qpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz | |
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaClient.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaClient.cpp | 60 |
1 files changed, 32 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp index 1a24cb9c80..0d3dd83131 100644 --- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp @@ -50,8 +50,6 @@ int64_t smsgs = 0; int64_t sbytes = 0; int64_t rmsgs = 0; int64_t rbytes = 0; -int64_t cmsgs = 0; -int writable = true; int target = 1000000; int msgsize = 200; @@ -62,17 +60,15 @@ Duration fullTestDuration(TIME_INFINITE); vector<char> testString; void write(Rdma::AsynchIO& aio) { - if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) { - while (writable) { - if (smsgs >= target) - return; - Rdma::Buffer* b = aio.getBuffer(); - std::copy(testString.begin(), testString.end(), b->bytes); - b->dataCount = msgsize; - aio.queueWrite(b); - ++smsgs; - sbytes += b->byteCount; - } + while (aio.writable()) { + if (smsgs >= target) + return; + Rdma::Buffer* b = aio.getBuffer(); + std::copy(testString.begin(), testString.end(), b->bytes); + b->dataCount = msgsize; + aio.queueWrite(b); + ++smsgs; + sbytes += msgsize; } } @@ -82,39 +78,46 @@ void dataError(Rdma::AsynchIO&) { void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) { ++rmsgs; - rbytes += b->byteCount; + rbytes += b->dataCount; // When all messages have been recvd stop if (rmsgs < target) { write(aio); } else { fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now())); - if (cmsgs >= target) + if (aio.incompletedWrites() == 0) p->shutdown(); } } -void full(Rdma::AsynchIO&) { - writable = false; +void full(Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Warn as we shouldn't get here anymore + cerr << "!"; + + // Don't need to keep buffer just adjust the counts + --smsgs; + sbytes -= b->dataCount; + + // Give buffer back + a.returnBuffer(b); } void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) { - writable = true; - ++cmsgs; if (smsgs < target) { write(aio); } else { sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now())); - if (rmsgs >= target && cmsgs >= target) + if (rmsgs >= target && aio.incompletedWrites() == 0) p->shutdown(); } } -void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { cout << "Connected\n"; Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair(); - Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize, + Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES, boost::bind(&data, poller, _1, _2), boost::bind(&idle, poller, _1), &full, @@ -131,12 +134,12 @@ void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) p->shutdown(); } -void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { +void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) { cout << "Connection error\n"; p->shutdown(); } -void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) { +void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) { cout << "Connection rejected\n"; p->shutdown(); } @@ -164,7 +167,7 @@ int main(int argc, char* argv[]) { // Make a random message of that size testString.resize(msgsize); for (int i = 0; i < msgsize; ++i) { - testString[i] = 32 + rand() & 0x3f; + testString[i] = 32 + (rand() & 0x3f); } try { @@ -173,10 +176,11 @@ int main(int argc, char* argv[]) { Rdma::Connector c( *res->ai_addr, - boost::bind(&connected, p, _1), - boost::bind(&connectionError, p, _1), + Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES), + boost::bind(&connected, p, _1, _2), + boost::bind(&connectionError, p, _1, _2), boost::bind(&disconnected, p, _1), - boost::bind(&rejected, p, _1)); + boost::bind(&rejected, p, _1, _2)); c.start(p); d.run(); |
