#ifndef RDMA_WRAP_H #define RDMA_WRAP_H #include "rdma_factories.h" #include #include "qpid/RefCounted.h" #include "qpid/sys/IOHandle.h" #include "qpid/sys/posix/PrivatePosix.h" #include #include #include #include #include #include #include namespace Rdma { const int DEFAULT_TIMEOUT = 2000; // 2 secs const int DEFAULT_BACKLOG = 100; const int DEFAULT_CQ_ENTRIES = 256; const int DEFAULT_WR_ENTRIES = 64; const ::rdma_conn_param DEFAULT_CONNECT_PARAM = { 0, // .private_data 0, // .private_data_len 4, // .responder_resources 4, // .initiator_depth 0, // .flow_control 5, // .retry_count 7 // .rnr_retry_count }; struct Buffer { friend class QueuePair; char* const bytes; const int32_t byteCount; int32_t dataStart; int32_t dataCount; Buffer(::ibv_pd* pd, char* const b, const int32_t s) : bytes(b), byteCount(s), dataStart(0), dataCount(0), mr(CHECK_NULL(::ibv_reg_mr( pd, bytes, byteCount, ::IBV_ACCESS_LOCAL_WRITE))) {} ~Buffer() { (void) ::ibv_dereg_mr(mr); delete [] bytes; } private: ::ibv_mr* mr; }; class Connection; enum QueueDirection { NONE, SEND, RECV }; class QueuePairEvent { boost::shared_ptr< ::ibv_cq > cq; ::ibv_wc wc; QueueDirection dir; friend class QueuePair; QueuePairEvent() : dir(NONE) {} QueuePairEvent( const ::ibv_wc& w, boost::shared_ptr< ::ibv_cq > c, QueueDirection d) : cq(c), wc(w), dir(d) { assert(dir != NONE); } public: operator bool() const { return dir != NONE; } QueueDirection getDirection() const { return dir; } ::ibv_wc_opcode getEventType() const { return wc.opcode; } ::ibv_wc_status getEventStatus() const { return wc.status; } Buffer* getBuffer() const { Buffer* b = reinterpret_cast(wc.wr_id); b->dataCount = wc.byte_len; return b; } }; // Wrapper for a queue pair - this has the functionality for // putting buffers on the receive queue and for sending buffers // to the other end of the connection. // // Currently QueuePairs are contained inside Connections and have no // separate lifetime class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted { boost::shared_ptr< ::ibv_pd > pd; boost::shared_ptr< ::ibv_comp_channel > cchannel; boost::shared_ptr< ::ibv_cq > scq; boost::shared_ptr< ::ibv_cq > rcq; boost::shared_ptr< ::rdma_cm_id > id; int outstandingSendEvents; int outstandingRecvEvents; friend class Connection; QueuePair(boost::shared_ptr< ::rdma_cm_id > id); ~QueuePair(); public: typedef boost::intrusive_ptr intrusive_ptr; // Create a buffer to use for writing Buffer* createBuffer(int s) { return new Buffer(pd.get(), new char[s], s); } // Make channel non-blocking by making // associated fd nonblocking void nonblocking() { ::fcntl(cchannel->fd, F_SETFL, O_NONBLOCK); } // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event QueuePair::intrusive_ptr getNextChannelEvent() { // First find out which cq has the event ::ibv_cq* cq; void* ctx; int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx); if (rc == -1 && errno == EAGAIN) return 0; CHECK(rc); // Batch acknowledge the event if (cq == scq.get()) { if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) { ::ibv_ack_cq_events(cq, outstandingSendEvents); outstandingSendEvents = 0; } } else if (cq == rcq.get()) { if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) { ::ibv_ack_cq_events(cq, outstandingRecvEvents); outstandingRecvEvents = 0; } } return static_cast(ctx); } QueuePairEvent getNextEvent() { ::ibv_wc w; if (::ibv_poll_cq(scq.get(), 1, &w) == 1) return QueuePairEvent(w, scq, SEND); else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1) return QueuePairEvent(w, rcq, RECV); else return QueuePairEvent(); } void postRecv(Buffer* buf); void postSend(Buffer* buf); void notifyRecv(); void notifySend(); }; class ConnectionEvent { friend class Connection; // The order of the members is important as we have to acknowledge // the event before destroying the ids on destruction boost::intrusive_ptr id; boost::intrusive_ptr listen_id; boost::shared_ptr< ::rdma_cm_event > event; ConnectionEvent() {} ConnectionEvent(::rdma_cm_event* e); // Default copy, assignment and destructor ok public: operator bool() const { return event; } ::rdma_cm_event_type getEventType() const { return event->event; } ::rdma_conn_param getConnectionParam() const { if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST) { return event->param.conn; } else { ::rdma_conn_param p = {}; return p; } } boost::intrusive_ptr getConnection () const { return id; } boost::intrusive_ptr getListenId() const { return listen_id; } }; // For the moment this is a fairly simple wrapper for rdma_cm_id. // // NB: It allocates a protection domain (pd) per connection which means that // registered buffers can't be shared between different connections // (this can only happen between connections on the same controller in any case, // so needs careful management if used) class Connection : public qpid::sys::IOHandle, public qpid::RefCounted { boost::shared_ptr< ::rdma_event_channel > channel; boost::shared_ptr< ::rdma_cm_id > id; QueuePair::intrusive_ptr qp; void* context; friend class ConnectionEvent; friend class QueuePair; // Wrap the passed in rdma_cm_id with a Connection // this basically happens only on connection request Connection(::rdma_cm_id* i) : qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), id(i, destroyId), context(0) { impl->fd = id->channel->fd; // Just overwrite the previous context as it will // have come from the listening connection if (i) i->context = this; } Connection() : qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), channel(mkEChannel()), id(mkId(channel.get(), this, RDMA_PS_TCP)), context(0) { impl->fd = channel->fd; } // Default destructor fine void ensureQueuePair() { assert(id.get()); // Only allocate a queue pair if there isn't one already if (qp) return; qp = new QueuePair(id); } public: typedef boost::intrusive_ptr intrusive_ptr; static intrusive_ptr make() { return new Connection(); } static intrusive_ptr find(::rdma_cm_id* i) { if (!i) return 0; Connection* id = static_cast< Connection* >(i->context); if (!id) throw std::logic_error("Couldn't find existing Connection"); return id; } template void addContext(T* c) { // Don't allow replacing context if (!context) context = c; } template T* getContext() { return static_cast(context); } // Make channel non-blocking by making // associated fd nonblocking void nonblocking() { assert(id.get()); ::fcntl(id->channel->fd, F_SETFL, O_NONBLOCK); } // If we get EAGAIN because the channel has been set non blocking // and we'd have to wait then return an empty event ConnectionEvent getNextEvent() { assert(id.get()); ::rdma_cm_event* e; int rc = ::rdma_get_cm_event(id->channel, &e); if (rc == -1 && errno == EAGAIN) return ConnectionEvent(); CHECK(rc); return ConnectionEvent(e); } void bind(sockaddr& src_addr) const { assert(id.get()); CHECK(::rdma_bind_addr(id.get(), &src_addr)); } void listen(int backlog = DEFAULT_BACKLOG) const { assert(id.get()); CHECK(::rdma_listen(id.get(), backlog)); } void resolve_addr( sockaddr& dst_addr, sockaddr* src_addr = 0, int timeout_ms = DEFAULT_TIMEOUT) const { assert(id.get()); CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms)); } void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const { assert(id.get()); CHECK(::rdma_resolve_route(id.get(), timeout_ms)); } void disconnect() const { assert(id.get()); CHECK(::rdma_disconnect(id.get())); } // TODO: Currently you can only connect with the default connection parameters void connect() { assert(id.get()); // Need to have a queue pair before we can connect ensureQueuePair(); ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; CHECK(::rdma_connect(id.get(), &p)); } template void connect(const T* data) { assert(id.get()); // Need to have a queue pair before we can connect ensureQueuePair(); ::rdma_conn_param p = DEFAULT_CONNECT_PARAM; p.private_data = data; p.private_data_len = sizeof(T); CHECK(::rdma_connect(id.get(), &p)); } // TODO: Not sure how to default accept params - they come from the connection request // event template void accept(const ::rdma_conn_param& param, const T* data) { assert(id.get()); // Need to have a queue pair before we can accept ensureQueuePair(); ::rdma_conn_param p = param; p.private_data = data; p.private_data_len = sizeof(T); CHECK(::rdma_accept(id.get(), &p)); } void accept(const ::rdma_conn_param& param) { assert(id.get()); // Need to have a queue pair before we can accept ensureQueuePair(); ::rdma_conn_param p = param; p.private_data = 0; p.private_data_len = 0; CHECK(::rdma_accept(id.get(), &p)); } template void reject(const T* data) const { assert(id.get()); CHECK(::rdma_reject(id.get(), data, sizeof(T))); } void reject() const { assert(id.get()); CHECK(::rdma_reject(id.get(), 0, 0)); } QueuePair::intrusive_ptr getQueuePair() { assert(id.get()); ensureQueuePair(); return qp; } }; inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) : qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate), pd(allocPd(i->verbs)), cchannel(mkCChannel(i->verbs)), scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())), id(i), outstandingSendEvents(0), outstandingRecvEvents(0) { impl->fd = cchannel->fd; // Set cq context to this QueuePair object so we can find // ourselves again scq->cq_context = this; rcq->cq_context = this; ::ibv_qp_init_attr qp_attr = {}; // TODO: make a default struct for this qp_attr.cap.max_send_wr = DEFAULT_WR_ENTRIES; qp_attr.cap.max_send_sge = 4; qp_attr.cap.max_recv_wr = DEFAULT_WR_ENTRIES; qp_attr.cap.max_recv_sge = 4; qp_attr.send_cq = scq.get(); qp_attr.recv_cq = rcq.get(); qp_attr.qp_type = IBV_QPT_RC; CHECK(::rdma_create_qp(id.get(), pd.get(), &qp_attr)); // Set the qp context to this so we can find ourselves again id->qp->qp_context = this; } inline QueuePair::~QueuePair() { if (outstandingSendEvents > 0) ::ibv_ack_cq_events(scq.get(), outstandingSendEvents); if (outstandingRecvEvents > 0) ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents); ::rdma_destroy_qp(id.get()); } inline void QueuePair::notifyRecv() { CHECK_IBV(ibv_req_notify_cq(rcq.get(), 0)); } inline void QueuePair::notifySend() { CHECK_IBV(ibv_req_notify_cq(scq.get(), 0)); } inline void QueuePair::postRecv(Buffer* buf) { ::ibv_recv_wr rwr = {}; ::ibv_sge sge; sge.addr = (uintptr_t) buf->bytes+buf->dataStart; sge.length = buf->dataCount; sge.lkey = buf->mr->lkey; rwr.wr_id = reinterpret_cast(buf); rwr.sg_list = &sge; rwr.num_sge = 1; ::ibv_recv_wr* badrwr = 0; CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr)); if (badrwr) throw std::logic_error("ibv_post_recv(): Bad rwr"); } inline void QueuePair::postSend(Buffer* buf) { ::ibv_send_wr swr = {}; ::ibv_sge sge; sge.addr = (uintptr_t) buf->bytes+buf->dataStart; sge.length = buf->dataCount; sge.lkey = buf->mr->lkey; swr.wr_id = reinterpret_cast(buf); swr.opcode = IBV_WR_SEND; swr.send_flags = IBV_SEND_SIGNALED; swr.sg_list = &sge; swr.num_sge = 1; ::ibv_send_wr* badswr = 0; CHECK_IBV(::ibv_post_send(id->qp, &swr, &badswr)); if (badswr) throw std::logic_error("ibv_post_send(): Bad swr"); } inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) : id((e->event != RDMA_CM_EVENT_CONNECT_REQUEST) ? Connection::find(e->id) : new Connection(e->id)), listen_id(Connection::find(e->listen_id)), event(e, acker) {} } inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) { # define CHECK_TYPE(t) case t: o << #t; break; switch(t) { CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED) CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR) CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED) CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR) CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST) CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE) CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR) CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE) CHECK_TYPE(RDMA_CM_EVENT_REJECTED) CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED) CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED) CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL) CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN) CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR) } # undef CHECK_TYPE return o; } #endif // RDMA_WRAP_H