summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaServer.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-09-11 06:16:19 +0000
commit2e05b7082f5e387fc686925e5ac006485e4686db (patch)
treeb0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/sys/rdma/RdmaServer.cpp
parent468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff)
downloadqpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaServer.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaServer.cpp45
1 files changed, 24 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
index dee2d17eed..594578a265 100644
--- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp
@@ -42,12 +42,10 @@ using qpid::sys::Dispatcher;
struct ConRec {
Rdma::Connection::intrusive_ptr connection;
Rdma::AsynchIO* data;
- bool writable;
queue<Rdma::Buffer*> queuedWrites;
ConRec(Rdma::Connection::intrusive_ptr c) :
- connection(c),
- writable(true)
+ connection(c)
{}
};
@@ -55,50 +53,53 @@ void dataError(Rdma::AsynchIO&) {
cout << "Data error:\n";
}
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+ // Need to make sure full is not called as it would reorder messages
+ while (!cr->queuedWrites.empty() && a.writable()) {
+ Rdma::Buffer* buf = cr->queuedWrites.front();
+ cr->queuedWrites.pop();
+ a.queueWrite(buf);
+ }
+}
+
void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
// Echo data back
Rdma::Buffer* buf = a.getBuffer();
std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
buf->dataCount = b->dataCount;
- if (cr->queuedWrites.empty() && cr->writable) {
+ if (cr->queuedWrites.empty()) {
+ // If can't write then full will be called and push buffer on back of queue
a.queueWrite(buf);
} else {
cr->queuedWrites.push(buf);
+ // Try to empty queue
+ idle(cr, a);
}
}
-void full(ConRec* cr, Rdma::AsynchIO&) {
- cr->writable = false;
-}
-
-void idle(ConRec* cr, Rdma::AsynchIO& a) {
- cr->writable = true;
- while (!cr->queuedWrites.empty() && cr->writable) {
- Rdma::Buffer* buf = cr->queuedWrites.front();
- cr->queuedWrites.pop();
- a.queueWrite(buf);
- }
+void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) {
+ cr->queuedWrites.push(buf);
}
void disconnected(Rdma::Connection::intrusive_ptr& ci) {
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
- delete cr->data;
+ cr->data->queueWriteClose();
delete cr;
cout << "Disconnected: " << cr << "\n";
}
-void connectionError(Rdma::Connection::intrusive_ptr& ci) {
+void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) {
ConRec* cr = ci->getContext<ConRec>();
cr->connection->disconnect();
if (cr) {
- delete cr->data;
+ cr->data->queueWriteClose();
delete cr;
}
cout << "Connection error: " << cr << "\n";
}
-bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) {
cout << "Incoming connection: ";
// For fun reject alternate connection attempts
@@ -109,10 +110,11 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
if (x) {
ConRec* cr = new ConRec(ci);
Rdma::AsynchIO* aio =
- new Rdma::AsynchIO(ci->getQueuePair(), 8000,
+ new Rdma::AsynchIO(ci->getQueuePair(),
+ cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES,
boost::bind(data, cr, _1, _2),
boost::bind(idle, cr, _1),
- boost::bind(full, cr, _1),
+ boost::bind(full, cr, _1, _2),
dataError);
ci->addContext(cr);
cr->data = aio;
@@ -149,6 +151,7 @@ int main(int argc, char* argv[]) {
Dispatcher d(p);
Rdma::Listener a((const sockaddr&)(sin),
+ Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(connected, p, _1),
connectionError,
disconnected,