From 9f153bc328112ed2ee25a801eff1f6a277c7bb19 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Mon, 28 Apr 2008 04:41:46 +0000 Subject: Work In Progress: Added initial rdma code including test server and client Turn off rdma support by default but autoconf should now detect whether necessary rdma/ibverbs libs and headers are present git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652053 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/sys/rdma/RdmaServer.cpp | 142 +++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 cpp/src/qpid/sys/rdma/RdmaServer.cpp (limited to 'cpp/src/qpid/sys/rdma/RdmaServer.cpp') diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp new file mode 100644 index 0000000000..488fe28658 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -0,0 +1,142 @@ +#include "RdmaIO.h" + +#include + +#include +#include +#include +#include + +#include + +using std::vector; +using std::queue; +using std::string; +using std::cout; +using std::cerr; + +using qpid::sys::Poller; +using qpid::sys::Dispatcher; + +// All the accepted connections +struct ConRec { + Rdma::Connection::intrusive_ptr connection; + Rdma::AsynchIO* data; + int outstandingWrites; + queue queuedWrites; + + ConRec(Rdma::Connection::intrusive_ptr c) : + connection(c), + outstandingWrites(0) + {} +}; + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Echo data back + Rdma::Buffer* buf = a.getBuffer(); + std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); + buf->dataCount = b->dataCount; + if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + a.queueWrite(buf); + ++(cr->outstandingWrites); + } else { + cr->queuedWrites.push(buf); + } +} + +void idle(ConRec* cr, Rdma::AsynchIO& a) { + --(cr->outstandingWrites); + //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4) + while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + ++(cr->outstandingWrites); + } +} + +void disconnected(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext(); + cr->connection->disconnect(); + delete cr->data; + delete cr; + cout << "Disconnected: " << cr << "\n"; +} + +void connectionError(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext(); + cr->connection->disconnect(); + if (cr) { + delete cr->data; + delete cr; + } + cout << "Connection error: " << cr << "\n"; +} + +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { + cout << "Incoming connection: "; + + // For fun reject alternate connection attempts + static bool x = false; + x ^= 1; + + // Must create aio here so as to prepost buffers *before* we accept connection + if (x) { + ConRec* cr = new ConRec(ci); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), 8000, + boost::bind(data, cr, _1, _2), + boost::bind(idle, cr, _1), + dataError); + ci->addContext(cr); + cr->data = aio; + cout << "Accept=>" << cr << "\n"; + } else { + cout << "Reject\n"; + } + + return x; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + static int cnt = 0; + ConRec* cr = ci->getContext(); + cout << "Connected: " << cr << "(" << ++cnt << ")\n"; + + cr->data->start(poller); +} + +int main(int argc, char* argv[]) { + vector args(&argv[0], &argv[argc]); + + ::sockaddr_in sin; + + int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str()); + cout << "Listening on port: " << port << "\n"; + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = INADDR_ANY; + + try { + boost::shared_ptr p(new Poller()); + Dispatcher d(p); + + Rdma::Listener a((const sockaddr&)(sin), + boost::bind(connected, p, _1), + connectionError, + disconnected, + connectionRequest); + + + a.start(p); + d.run(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } +} -- cgit v1.2.1