From c86a77f2ce6150ce8fc0770604d92502acd996b8 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 29 Apr 2008 22:46:23 +0000 Subject: More RDMA Work in Progress Changes to client buffering Buffering improvement to server Removed unused state machine from RdmaIO code Move the write throttling due to limited write buffers into the RdmaIO code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652180 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaServer.cpp | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) (limited to 'cpp/src/qpid/sys/rdma/RdmaServer.cpp') diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp index 488fe28658..f7f739d6c2 100644 --- a/cpp/src/qpid/sys/rdma/RdmaServer.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -22,12 +22,12 @@ using qpid::sys::Dispatcher; struct ConRec { Rdma::Connection::intrusive_ptr connection; Rdma::AsynchIO* data; - int outstandingWrites; + bool writable; queue queuedWrites; ConRec(Rdma::Connection::intrusive_ptr c) : connection(c), - outstandingWrites(0) + writable(true) {} }; @@ -40,23 +40,24 @@ void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { Rdma::Buffer* buf = a.getBuffer(); std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); buf->dataCount = b->dataCount; - if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + if (cr->queuedWrites.empty() && cr->writable) { a.queueWrite(buf); - ++(cr->outstandingWrites); } else { cr->queuedWrites.push(buf); } } +void full(ConRec* cr, Rdma::AsynchIO&) { + cr->writable = false; +} + void idle(ConRec* cr, Rdma::AsynchIO& a) { - --(cr->outstandingWrites); - //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4) - while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { - Rdma::Buffer* buf = cr->queuedWrites.front(); - cr->queuedWrites.pop(); - a.queueWrite(buf); - ++(cr->outstandingWrites); - } + cr->writable = true; + while (!cr->queuedWrites.empty() && cr->writable) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + } } void disconnected(Rdma::Connection::intrusive_ptr& ci) { @@ -82,7 +83,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { // For fun reject alternate connection attempts static bool x = false; - x ^= 1; + x = true; // Must create aio here so as to prepost buffers *before* we accept connection if (x) { @@ -91,6 +92,7 @@ bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { new Rdma::AsynchIO(ci->getQueuePair(), 8000, boost::bind(data, cr, _1, _2), boost::bind(idle, cr, _1), + boost::bind(full, cr, _1), dataError); ci->addContext(cr); cr->data = aio; -- cgit v1.2.1