summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/rdma/rdma_wrap.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/rdma_wrap.cpp364
1 files changed, 349 insertions, 15 deletions
diff --git a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
index 53e31ca766..8944be2034 100644
--- a/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
+++ b/cpp/src/qpid/sys/rdma/rdma_wrap.cpp
@@ -21,6 +21,17 @@
#include "qpid/sys/rdma/rdma_wrap.h"
+#include "qpid/sys/rdma/rdma_factories.h"
+#include "qpid/sys/rdma/rdma_exception.h"
+
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+#include <netdb.h>
+
+#include <iostream>
+#include <stdexcept>
+
namespace Rdma {
const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
0, // .private_data
@@ -39,20 +50,64 @@ namespace Rdma {
return count;
}
- ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
- // It's badly documented, but it seems from the librdma source code that all the following
- // event types have a valid param.conn
- switch (event->event) {
- case RDMA_CM_EVENT_CONNECT_REQUEST:
- case RDMA_CM_EVENT_ESTABLISHED:
- case RDMA_CM_EVENT_REJECTED:
- case RDMA_CM_EVENT_DISCONNECTED:
- case RDMA_CM_EVENT_CONNECT_ERROR:
- return event->param.conn;
- default:
- ::rdma_conn_param p = {};
- return p;
- }
+ Buffer::Buffer(::ibv_pd* pd, char* const b, const int32_t s) :
+ bytes(b),
+ byteCount(s),
+ dataStart(0),
+ dataCount(0),
+ mr(CHECK_NULL(::ibv_reg_mr(
+ pd, bytes, byteCount,
+ ::IBV_ACCESS_LOCAL_WRITE)))
+ {}
+
+ Buffer::~Buffer() {
+ (void) ::ibv_dereg_mr(mr);
+ delete [] bytes;
+ }
+
+ QueuePairEvent::QueuePairEvent() :
+ dir(NONE)
+ {}
+
+ QueuePairEvent::QueuePairEvent(
+ const ::ibv_wc& w,
+ boost::shared_ptr< ::ibv_cq > c,
+ QueueDirection d) :
+ cq(c),
+ wc(w),
+ dir(d)
+ {
+ assert(dir != NONE);
+ }
+
+ QueuePairEvent::operator bool() const {
+ return dir != NONE;
+ }
+
+ bool QueuePairEvent::immPresent() const {
+ return wc.wc_flags & IBV_WC_WITH_IMM;
+ }
+
+ uint32_t QueuePairEvent::getImm() const {
+ return ntohl(wc.imm_data);
+ }
+
+ QueueDirection QueuePairEvent::getDirection() const {
+ return dir;
+ }
+
+ ::ibv_wc_opcode QueuePairEvent::getEventType() const {
+ return wc.opcode;
+ }
+
+ ::ibv_wc_status QueuePairEvent::getEventStatus() const {
+ return wc.status;
+ }
+
+ Buffer* QueuePairEvent::getBuffer() const {
+ Buffer* b = reinterpret_cast<Buffer*>(wc.wr_id);
+ b->dataCount = wc.byte_len;
+ return b;
}
QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
@@ -84,7 +139,7 @@ namespace Rdma {
qp_attr.qp_type = IBV_QPT_RC;
CHECK(::rdma_create_qp(i.get(), pd.get(), &qp_attr));
- qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);
+ qp = mkQp(i->qp);
// Set the qp context to this so we can find ourselves again
qp->qp_context = this;
@@ -100,6 +155,62 @@ namespace Rdma {
qp->qp_context = 0;
}
+ // Create a buffer to use for writing
+ Buffer* QueuePair::createBuffer(int s) {
+ return new Buffer(pd.get(), new char[s], s);
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void QueuePair::nonblocking() {
+ ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ QueuePair::intrusive_ptr QueuePair::getNextChannelEvent() {
+ // First find out which cq has the event
+ ::ibv_cq* cq;
+ void* ctx;
+ int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+ if (rc == -1 && errno == EAGAIN)
+ return 0;
+ CHECK(rc);
+
+ // Batch acknowledge the event
+ if (cq == scq.get()) {
+ if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingSendEvents);
+ outstandingSendEvents = 0;
+ }
+ } else if (cq == rcq.get()) {
+ if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+ ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+ outstandingRecvEvents = 0;
+ }
+ }
+
+ return static_cast<QueuePair*>(ctx);
+ }
+
+ QueuePairEvent QueuePair::getNextEvent() {
+ ::ibv_wc w;
+ if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, scq, SEND);
+ else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+ return QueuePairEvent(w, rcq, RECV);
+ else
+ return QueuePairEvent();
+ }
+
+ void QueuePair::notifyRecv() {
+ CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0));
+ }
+
+ void QueuePair::notifySend() {
+ CHECK_IBV(ibv_req_notify_cq(scq.get(), 0));
+ }
+
void QueuePair::postRecv(Buffer* buf) {
::ibv_recv_wr rwr = {};
::ibv_sge sge;
@@ -158,6 +269,229 @@ namespace Rdma {
if (badswr)
throw std::logic_error("ibv_post_send(): Bad swr");
}
+
+ ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) :
+ id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ?
+ Connection::find(e->id) : new Connection(e->id)),
+ listen_id(Connection::find(e->listen_id)),
+ event(mkEvent(e))
+ {}
+
+ ConnectionEvent::operator bool() const {
+ return event;
+ }
+
+ ::rdma_cm_event_type ConnectionEvent::getEventType() const {
+ return event->event;
+ }
+
+ ::rdma_conn_param ConnectionEvent::getConnectionParam() const {
+ // It's badly documented, but it seems from the librdma source code that all the following
+ // event types have a valid param.conn
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ case RDMA_CM_EVENT_ESTABLISHED:
+ case RDMA_CM_EVENT_REJECTED:
+ case RDMA_CM_EVENT_DISCONNECTED:
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ return event->param.conn;
+ default:
+ ::rdma_conn_param p = {};
+ return p;
+ }
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getConnection () const {
+ return id;
+ }
+
+ boost::intrusive_ptr<Connection> ConnectionEvent::getListenId() const {
+ return listen_id;
+ }
+
+ // Wrap the passed in rdma_cm_id with a Connection
+ // this basically happens only on connection request
+ Connection::Connection(::rdma_cm_id* i) :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ id(mkId(i)),
+ context(0)
+ {
+ impl->fd = id->channel->fd;
+
+ // Just overwrite the previous context as it will
+ // have come from the listening connection
+ if (i)
+ i->context = this;
+ }
+
+ Connection::Connection() :
+ qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+ channel(mkEChannel()),
+ id(mkId(channel.get(), this, RDMA_PS_TCP)),
+ context(0)
+ {
+ impl->fd = channel->fd;
+ }
+
+ Connection::~Connection() {
+ // Reset the id context in case someone else has it
+ id->context = 0;
+ }
+
+ void Connection::ensureQueuePair() {
+ assert(id.get());
+
+ // Only allocate a queue pair if there isn't one already
+ if (qp)
+ return;
+
+ qp = new QueuePair(id);
+ }
+
+ Connection::intrusive_ptr Connection::make() {
+ return new Connection();
+ }
+
+ Connection::intrusive_ptr Connection::find(::rdma_cm_id* i) {
+ if (!i)
+ return 0;
+ Connection* id = static_cast< Connection* >(i->context);
+ if (!id)
+ throw std::logic_error("Couldn't find existing Connection");
+ return id;
+ }
+
+ // Make channel non-blocking by making
+ // associated fd nonblocking
+ void Connection::nonblocking() {
+ assert(id.get());
+ ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK);
+ }
+
+ // If we get EAGAIN because the channel has been set non blocking
+ // and we'd have to wait then return an empty event
+ ConnectionEvent Connection::getNextEvent() {
+ assert(id.get());
+ ::rdma_cm_event* e;
+ int rc = ::rdma_get_cm_event(id->channel, &e);
+ if (GETERR(rc) == EAGAIN)
+ return ConnectionEvent();
+ CHECK(rc);
+ return ConnectionEvent(e);
+ }
+
+ void Connection::bind(const qpid::sys::SocketAddress& src_addr) const {
+ assert(id.get());
+ CHECK(::rdma_bind_addr(id.get(), getAddrInfo(src_addr).ai_addr));
+ }
+
+ void Connection::listen(int backlog) const {
+ assert(id.get());
+ CHECK(::rdma_listen(id.get(), backlog));
+ }
+
+ void Connection::resolve_addr(
+ const qpid::sys::SocketAddress& dst_addr,
+ int timeout_ms) const
+ {
+ assert(id.get());
+ CHECK(::rdma_resolve_addr(id.get(), 0, getAddrInfo(dst_addr).ai_addr, timeout_ms));
+ }
+
+ void Connection::resolve_route(int timeout_ms) const {
+ assert(id.get());
+ CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+ }
+
+ void Connection::disconnect() const {
+ assert(id.get());
+ int rc = ::rdma_disconnect(id.get());
+ // iWarp doesn't let you disconnect a disconnected connection
+ // but Infiniband can do so it's okay to call rdma_disconnect()
+ // in response to a disconnect event, but we may get an error
+ if (GETERR(rc) == EINVAL)
+ return;
+ CHECK(rc);
+ }
+
+ // TODO: Currently you can only connect with the default connection parameters
+ void Connection::connect(const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can connect
+ ensureQueuePair();
+
+ ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_connect(id.get(), &p));
+ }
+
+ void Connection::connect() {
+ connect(0, 0);
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param, const void* data, size_t len) {
+ assert(id.get());
+ // Need to have a queue pair before we can accept
+ ensureQueuePair();
+
+ ::rdma_conn_param p = param;
+ p.private_data = data;
+ p.private_data_len = len;
+ CHECK(::rdma_accept(id.get(), &p));
+ }
+
+ void Connection::accept(const ::rdma_conn_param& param) {
+ accept(param, 0, 0);
+ }
+
+ void Connection::reject(const void* data, size_t len) const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), data, len));
+ }
+
+ void Connection::reject() const {
+ assert(id.get());
+ CHECK(::rdma_reject(id.get(), 0, 0));
+ }
+
+ QueuePair::intrusive_ptr Connection::getQueuePair() {
+ assert(id.get());
+
+ ensureQueuePair();
+
+ return qp;
+ }
+
+ std::string Connection::getLocalName() const {
+ ::sockaddr* addr = ::rdma_get_local_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
+
+ std::string Connection::getPeerName() const {
+ ::sockaddr* addr = ::rdma_get_peer_addr(id.get());
+ char hostName[NI_MAXHOST];
+ char portName[NI_MAXSERV];
+ CHECK_IBV(::getnameinfo(
+ addr, sizeof(::sockaddr_storage),
+ hostName, sizeof(hostName),
+ portName, sizeof(portName),
+ NI_NUMERICHOST | NI_NUMERICSERV));
+ std::string r(hostName);
+ r += ":";
+ r += portName;
+ return r;
+ }
}
std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {