summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaClient.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
commit2e05b7082f5e387fc686925e5ac006485e4686db (patch)
treeb0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/sys/rdma/RdmaClient.cpp
parent468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff)
downloadqpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaClient.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaClient.cpp60
1 files changed, 32 insertions, 28 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaClient.cpp b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
index 1a24cb9c80..0d3dd83131 100644
--- a/cpp/src/qpid/sys/rdma/RdmaClient.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaClient.cpp
@@ -50,8 +50,6 @@ int64_t smsgs = 0;
int64_t sbytes = 0;
int64_t rmsgs = 0;
int64_t rbytes = 0;
-int64_t cmsgs = 0;
-int writable = true;
int target = 1000000;
int msgsize = 200;
@@ -62,17 +60,15 @@ Duration fullTestDuration(TIME_INFINITE);
vector<char> testString;
void write(Rdma::AsynchIO& aio) {
- if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
- while (writable) {
- if (smsgs >= target)
- return;
- Rdma::Buffer* b = aio.getBuffer();
- std::copy(testString.begin(), testString.end(), b->bytes);
- b->dataCount = msgsize;
- aio.queueWrite(b);
- ++smsgs;
- sbytes += b->byteCount;
- }
+ while (aio.writable()) {
+ if (smsgs >= target)
+ return;
+ Rdma::Buffer* b = aio.getBuffer();
+ std::copy(testString.begin(), testString.end(), b->bytes);
+ b->dataCount = msgsize;
+ aio.queueWrite(b);
+ ++smsgs;
+ sbytes += msgsize;
}
}
@@ -82,39 +78,46 @@ void dataError(Rdma::AsynchIO&) {
void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
++rmsgs;
- rbytes += b->byteCount;
+ rbytes += b->dataCount;
// When all messages have been recvd stop
if (rmsgs < target) {
write(aio);
} else {
fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
- if (cmsgs >= target)
+ if (aio.incompletedWrites() == 0)
p->shutdown();
}
}
-void full(Rdma::AsynchIO&) {
- writable = false;
+void full(Rdma::AsynchIO& a, Rdma::Buffer* b) {
+ // Warn as we shouldn't get here anymore
+ cerr << "!";
+
+ // Don't need to keep buffer just adjust the counts
+ --smsgs;
+ sbytes -= b->dataCount;
+
+ // Give buffer back
+ a.returnBuffer(b);
}
void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
- writable = true;
- ++cmsgs;
if (smsgs < target) {
write(aio);
} else {
sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
- if (rmsgs >= target && cmsgs >= target)
+ if (rmsgs >= target && aio.incompletedWrites() == 0)
p->shutdown();
}
}
-void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
cout << "Connected\n";
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
- Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
+ Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit , Rdma::DEFAULT_WR_ENTRIES,
boost::bind(&data, poller, _1, _2),
boost::bind(&idle, poller, _1),
&full,
@@ -131,12 +134,12 @@ void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&)
p->shutdown();
}
-void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ErrorType) {
cout << "Connection error\n";
p->shutdown();
}
-void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&, const Rdma::ConnectionParams&) {
cout << "Connection rejected\n";
p->shutdown();
}
@@ -164,7 +167,7 @@ int main(int argc, char* argv[]) {
// Make a random message of that size
testString.resize(msgsize);
for (int i = 0; i < msgsize; ++i) {
- testString[i] = 32 + rand() & 0x3f;
+ testString[i] = 32 + (rand() & 0x3f);
}
try {
@@ -173,10 +176,11 @@ int main(int argc, char* argv[]) {
Rdma::Connector c(
*res->ai_addr,
- boost::bind(&connected, p, _1),
- boost::bind(&connectionError, p, _1),
+ Rdma::ConnectionParams(msgsize, Rdma::DEFAULT_WR_ENTRIES),
+ boost::bind(&connected, p, _1, _2),
+ boost::bind(&connectionError, p, _1, _2),
boost::bind(&disconnected, p, _1),
- boost::bind(&rejected, p, _1));
+ boost::bind(&rejected, p, _1, _2));
c.start(p);
d.run();