1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#include "rdma_wrap.h"
namespace Rdma {
// Default connection parameters.
//
// The struct is initialized positionally, so the order of the values below must follow
// the member order of ::rdma_conn_param; the trailing comment on each line names the
// member the value is meant for. Any members of ::rdma_conn_param that come after
// rnr_retry_count are not listed here and are therefore zero-initialized by the
// aggregate initialization rules.
// NOTE(review): the numeric choices (4/4/5/7) are not explained anywhere in this file —
// confirm against the librdmacm documentation before changing them.
const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
0, // .private_data (0: none supplied)
0, // .private_data_len (0, consistent with no private data above)
4, // .responder_resources
4, // .initiator_depth
0, // .flow_control
5, // .retry_count
7 // .rnr_retry_count
};
// Return the connection parameters carried by the wrapped event.
//
// For the event types tested below the event's param.conn member is returned as is;
// for every other event type an all-zero ::rdma_conn_param is returned instead.
::rdma_conn_param ConnectionEvent::getConnectionParam() const {
    // The librdmacm documentation is thin on this point, but reading the library source
    // suggests that param.conn is valid for each of the event types listed here.
    const int kind = event->event;
    const bool carriesConnParam =
        kind == RDMA_CM_EVENT_CONNECT_REQUEST ||
        kind == RDMA_CM_EVENT_ESTABLISHED ||
        kind == RDMA_CM_EVENT_REJECTED ||
        kind == RDMA_CM_EVENT_DISCONNECTED ||
        kind == RDMA_CM_EVENT_CONNECT_ERROR;

    if (carriesConnParam) {
        return event->param.conn;
    }

    // No connection parameters for this kind of event: hand back a zeroed struct
    ::rdma_conn_param zeroed = {};
    return zeroed;
}
// Construct a queue pair for the connection id `i`.
//
// The initializer list creates a protection domain, a completion channel and two
// completion queues (send and receive, both attached to that one channel) from the
// id's verbs context. The body then creates the queue pair itself on `i` and stores
// back pointers to this object in both completion queues and in the queue pair.
QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
    qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
    pd(allocPd(i->verbs)),
    cchannel(mkCChannel(i->verbs)),
    scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
    rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, 0, cchannel.get())),
    outstandingSendEvents(0),
    outstandingRecvEvents(0)
{
    // The IOHandle's descriptor is the completion channel's descriptor
    impl->fd = cchannel->fd;

    // Point both completion queues back at this QueuePair so that it can be
    // recovered from a cq later on
    rcq->cq_context = this;
    scq->cq_context = this;

    // TODO: make a default struct for this
    ::ibv_qp_init_attr initAttr = {};
    initAttr.qp_type = IBV_QPT_RC;
    initAttr.send_cq = scq.get();
    initAttr.recv_cq = rcq.get();
    initAttr.cap.max_send_wr = DEFAULT_WR_ENTRIES;
    initAttr.cap.max_recv_wr = DEFAULT_WR_ENTRIES;
    initAttr.cap.max_send_sge = 4;
    initAttr.cap.max_recv_sge = 4;

    CHECK(::rdma_create_qp(i.get(), pd.get(), &initAttr));

    // rdma_create_qp() leaves the new queue pair in i->qp; hold it with destroyQp
    // as the deleter
    qp = boost::shared_ptr< ::ibv_qp >(i->qp, destroyQp);

    // Same back pointer trick for the queue pair itself
    qp->qp_context = this;
}
// Tear down the queue pair wrapper: acknowledge whatever completion events are still
// counted as outstanding on each completion queue, then clear the queue pair's back
// pointer to this object.
QueuePair::~QueuePair() {
    // Send side first, then receive side
    if (outstandingSendEvents > 0) {
        ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
    }
    if (outstandingRecvEvents > 0) {
        ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
    }

    // Someone else may still be holding the qp, so make sure it no longer
    // refers to this (about to be destroyed) object
    qp->qp_context = 0;
}
// Post `buf` to the queue pair as a single-element receive work request.
//
// The scatter/gather entry covers buf->dataCount bytes starting buf->dataStart bytes
// into buf->bytes, and the Buffer pointer itself is stored as the work request id.
// Throws std::logic_error if the library reports a bad work request; errors returned
// by ibv_post_recv() itself are handled by CHECK_IBV.
void QueuePair::postRecv(Buffer* buf) {
    // Describe the buffer's memory region
    ::ibv_sge region;
    region.addr = reinterpret_cast<uintptr_t>(buf->bytes) + buf->dataStart;
    region.length = buf->dataCount;
    region.lkey = buf->mr->lkey;

    // Zero-initialized request referring to that single region
    ::ibv_recv_wr request = {};
    request.wr_id = reinterpret_cast<uint64_t>(buf);
    request.sg_list = &region;
    request.num_sge = 1;

    ::ibv_recv_wr* rejected = 0;
    CHECK_IBV(::ibv_post_recv(qp.get(), &request, &rejected));
    if (rejected) {
        throw std::logic_error("ibv_post_recv(): Bad rwr");
    }
}
// Post `buf` to the queue pair as a single-element, signaled send work request.
//
// The scatter/gather entry covers buf->dataCount bytes starting buf->dataStart bytes
// into buf->bytes, and the Buffer pointer itself is stored as the work request id.
// Throws std::logic_error if the library reports a bad work request; errors returned
// by ibv_post_send() itself are handled by CHECK_IBV.
void QueuePair::postSend(Buffer* buf) {
    // Describe the buffer's memory region
    ::ibv_sge region;
    region.addr = reinterpret_cast<uintptr_t>(buf->bytes) + buf->dataStart;
    region.length = buf->dataCount;
    region.lkey = buf->mr->lkey;

    // Zero-initialized request referring to that single region
    ::ibv_send_wr request = {};
    request.wr_id = reinterpret_cast<uint64_t>(buf);
    request.sg_list = &region;
    request.num_sge = 1;
    request.opcode = IBV_WR_SEND;
    request.send_flags = IBV_SEND_SIGNALED;

    ::ibv_send_wr* rejected = 0;
    CHECK_IBV(::ibv_post_send(qp.get(), &request, &rejected));
    if (rejected) {
        throw std::logic_error("ibv_post_send(): Bad swr");
    }
}
void QueuePair::postSend(uint32_t imm, Buffer* buf) {
::ibv_send_wr swr = {};
::ibv_sge sge;
sge.addr = (uintptr_t) buf->bytes+buf->dataStart;
sge.length = buf->dataCount;
sge.lkey = buf->mr->lkey;
swr.send_flags = IBV_SEND_SIGNALED;
swr.wr_id = reinterpret_cast<uint64_t>(buf);
swr.imm_data = htonl(imm);
swr.opcode = IBV_WR_SEND_WITH_IMM;
swr.sg_list = &sge;
swr.num_sge = 1;
::ibv_send_wr* badswr = 0;
CHECK_IBV(::ibv_post_send(qp.get(), &swr, &badswr));
if (badswr)
throw std::logic_error("ibv_post_send(): Bad swr");
}
}
std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
# define CHECK_TYPE(t) case t: o << #t; break;
switch(t) {
CHECK_TYPE(RDMA_CM_EVENT_ADDR_RESOLVED)
CHECK_TYPE(RDMA_CM_EVENT_ADDR_ERROR)
CHECK_TYPE(RDMA_CM_EVENT_ROUTE_RESOLVED)
CHECK_TYPE(RDMA_CM_EVENT_ROUTE_ERROR)
CHECK_TYPE(RDMA_CM_EVENT_CONNECT_REQUEST)
CHECK_TYPE(RDMA_CM_EVENT_CONNECT_RESPONSE)
CHECK_TYPE(RDMA_CM_EVENT_CONNECT_ERROR)
CHECK_TYPE(RDMA_CM_EVENT_UNREACHABLE)
CHECK_TYPE(RDMA_CM_EVENT_REJECTED)
CHECK_TYPE(RDMA_CM_EVENT_ESTABLISHED)
CHECK_TYPE(RDMA_CM_EVENT_DISCONNECTED)
CHECK_TYPE(RDMA_CM_EVENT_DEVICE_REMOVAL)
CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_JOIN)
CHECK_TYPE(RDMA_CM_EVENT_MULTICAST_ERROR)
}
# undef CHECK_TYPE
return o;
}
|