diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
| commit | 2e05b7082f5e387fc686925e5ac006485e4686db (patch) | |
| tree | b0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/sys/rdma/RdmaServer.cpp | |
| parent | 468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff) | |
| download | qpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz | |
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaServer.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index dee2d17eed..594578a265 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -42,12 +42,10 @@ using qpid::sys::Dispatcher; struct ConRec { Rdma::Connection::intrusive_ptr connection; Rdma::AsynchIO* data; - bool writable; queue<Rdma::Buffer*> queuedWrites; ConRec(Rdma::Connection::intrusive_ptr c) : - connection(c), - writable(true) + connection(c) {} }; @@ -55,50 +53,53 @@ void dataError(Rdma::AsynchIO&) { cout << "Data error:\n"; } +void idle(ConRec* cr, Rdma::AsynchIO& a) { + // Need to make sure full is not called as it would reorder messages + while (!cr->queuedWrites.empty() && a.writable()) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + } +} + void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { // Echo data back Rdma::Buffer* buf = a.getBuffer(); std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); buf->dataCount = b->dataCount; - if (cr->queuedWrites.empty() && cr->writable) { + if (cr->queuedWrites.empty()) { + // If can't write then full will be called and push buffer on back of queue a.queueWrite(buf); } else { cr->queuedWrites.push(buf); + // Try to empty queue + idle(cr, a); } } -void full(ConRec* cr, Rdma::AsynchIO&) { - cr->writable = false; -} - -void idle(ConRec* cr, Rdma::AsynchIO& a) { - cr->writable = true; - while (!cr->queuedWrites.empty() && cr->writable) { - Rdma::Buffer* buf = cr->queuedWrites.front(); - cr->queuedWrites.pop(); - a.queueWrite(buf); - } +void full(ConRec* cr, Rdma::AsynchIO&, Rdma::Buffer* buf) { + cr->queuedWrites.push(buf); } void disconnected(Rdma::Connection::intrusive_ptr& ci) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); - delete cr->data; + cr->data->queueWriteClose(); delete cr; cout << "Disconnected: " << cr << "\n"; } -void connectionError(Rdma::Connection::intrusive_ptr& ci) { +void connectionError(Rdma::Connection::intrusive_ptr& ci, Rdma::ErrorType) { ConRec* cr = ci->getContext<ConRec>(); cr->connection->disconnect(); if (cr) { - delete cr->data; + cr->data->queueWriteClose(); delete cr; } cout << "Connection error: " << cr << "\n"; } -bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci, const Rdma::ConnectionParams& cp) { cout << "Incoming connection: "; // For fun reject alternate connection attempts @@ -109,10 +110,11 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { if (x) { ConRec* cr = new ConRec(ci); Rdma::AsynchIO* aio = - new Rdma::AsynchIO(ci->getQueuePair(), 8000, + new Rdma::AsynchIO(ci->getQueuePair(), + cp.maxRecvBufferSize, cp.initialXmitCredit, Rdma::DEFAULT_WR_ENTRIES, boost::bind(data, cr, _1, _2), boost::bind(idle, cr, _1), - boost::bind(full, cr, _1), + boost::bind(full, cr, _1, _2), dataError); ci->addContext(cr); cr->data = aio; @@ -149,6 +151,7 @@ int main(int argc, char* argv[]) { Dispatcher d(p); Rdma::Listener a((const sockaddr&)(sin), + Rdma::ConnectionParams(16384, Rdma::DEFAULT_WR_ENTRIES), boost::bind(connected, p, _1), connectionError, disconnected, |
