diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-04-28 04:41:46 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-28 04:41:46 +0000 |
| commit | 9f153bc328112ed2ee25a801eff1f6a277c7bb19 (patch) | |
| tree | acd13eebcfe1a3ee196ab229741ce6a20e9eb27c /cpp/src/qpid/sys/rdma/RdmaServer.cpp | |
| parent | a301f95243dd1cd367a0a8d041c1168b8adc1e86 (diff) | |
| download | qpid-python-9f153bc328112ed2ee25a801eff1f6a277c7bb19.tar.gz | |
Work In Progress:
Added initial rdma code including test server and client
Turn off rdma support by default but autoconf should now detect whether
necessary rdma/ibverbs libs and headers are present
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652053 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaServer.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaServer.cpp | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaServer.cpp b/cpp/src/qpid/sys/rdma/RdmaServer.cpp new file mode 100644 index 0000000000..488fe28658 --- /dev/null +++ b/cpp/src/qpid/sys/rdma/RdmaServer.cpp @@ -0,0 +1,142 @@ +#include "RdmaIO.h" + +#include <arpa/inet.h> + +#include <vector> +#include <queue> +#include <string> +#include <iostream> + +#include <boost/bind.hpp> + +using std::vector; +using std::queue; +using std::string; +using std::cout; +using std::cerr; + +using qpid::sys::Poller; +using qpid::sys::Dispatcher; + +// All the accepted connections +struct ConRec { + Rdma::Connection::intrusive_ptr connection; + Rdma::AsynchIO* data; + int outstandingWrites; + queue<Rdma::Buffer*> queuedWrites; + + ConRec(Rdma::Connection::intrusive_ptr c) : + connection(c), + outstandingWrites(0) + {} +}; + +void dataError(Rdma::AsynchIO&) { + cout << "Data error:\n"; +} + +void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) { + // Echo data back + Rdma::Buffer* buf = a.getBuffer(); + std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes); + buf->dataCount = b->dataCount; + if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + a.queueWrite(buf); + ++(cr->outstandingWrites); + } else { + cr->queuedWrites.push(buf); + } +} + +void idle(ConRec* cr, Rdma::AsynchIO& a) { + --(cr->outstandingWrites); + //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4) + while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) { + Rdma::Buffer* buf = cr->queuedWrites.front(); + cr->queuedWrites.pop(); + a.queueWrite(buf); + ++(cr->outstandingWrites); + } +} + +void disconnected(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + delete cr->data; + delete cr; + cout << "Disconnected: " << cr << "\n"; +} + +void connectionError(Rdma::Connection::intrusive_ptr& ci) { + ConRec* cr = ci->getContext<ConRec>(); + cr->connection->disconnect(); + if (cr) { + delete cr->data; + delete cr; + } + cout << "Connection error: " << cr << "\n"; +} + +bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) { + cout << "Incoming connection: "; + + // For fun reject alternate connection attempts + static bool x = false; + x ^= 1; + + // Must create aio here so as to prepost buffers *before* we accept connection + if (x) { + ConRec* cr = new ConRec(ci); + Rdma::AsynchIO* aio = + new Rdma::AsynchIO(ci->getQueuePair(), 8000, + boost::bind(data, cr, _1, _2), + boost::bind(idle, cr, _1), + dataError); + ci->addContext(cr); + cr->data = aio; + cout << "Accept=>" << cr << "\n"; + } else { + cout << "Reject\n"; + } + + return x; +} + +void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) { + static int cnt = 0; + ConRec* cr = ci->getContext<ConRec>(); + cout << "Connected: " << cr << "(" << ++cnt << ")\n"; + + cr->data->start(poller); +} + +int main(int argc, char* argv[]) { + vector<string> args(&argv[0], &argv[argc]); + + ::sockaddr_in sin; + + int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str()); + cout << "Listening on port: " << port << "\n"; + + sin.sin_family = AF_INET; + sin.sin_port = htons(port); + sin.sin_addr.s_addr = INADDR_ANY; + + try { + boost::shared_ptr<Poller> p(new Poller()); + Dispatcher d(p); + + Rdma::Listener a((const sockaddr&)(sin), + boost::bind(connected, p, _1), + connectionError, + disconnected, + connectionRequest); + + + a.start(p); + d.run(); + } catch (Rdma::Exception& e) { + int err = e.getError(); + cerr << "Error: " << e.what() << "(" << err << ")\n"; + } +} |
