1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
#include "RdmaIO.h"
#include <iostream>
#include <boost/bind.hpp>
namespace Rdma {
/**
 * Construct an asynchronous I/O object on top of an existing queue pair.
 *
 * @param q   queue pair that all sends and receives go through
 * @param s   size passed to QueuePair::createBuffer for every buffer made here
 * @param rc  invoked from dataEvent() for each completed receive
 * @param ic  invoked when a send completes or when notifyPendingWrite() finds
 *            spare send capacity
 * @param fc  invoked from queueWrite() once the outstanding send count reaches
 *            xmitBufferCount
 * @param ec  invoked from dataEvent() when a completion reports a failure status
 *
 * Both the receive and transmit buffer counts start out as DEFAULT_WR_ENTRIES.
 * The queue pair is switched to nonblocking mode, recv/send notification is
 * requested, and recvBufferCount receive buffers are posted before returning.
 */
AsynchIO::AsynchIO(
    QueuePair::intrusive_ptr q,
    int s,
    ReadCallback rc,
    IdleCallback ic,
    FullCallback fc,
    ErrorCallback ec
) :
    qp(q),
    dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
    bufferSize(s),
    recvBufferCount(DEFAULT_WR_ENTRIES),
    xmitBufferCount(DEFAULT_WR_ENTRIES),
    outstandingWrites(0),
    readCallback(rc),
    idleCallback(ic),
    fullCallback(fc),
    errorCallback(ec)
{
    qp->nonblocking();
    qp->notifyRecv();
    qp->notifySend();

    // Hand the receive buffers to the queue pair up front, before any
    // further use of this object.
    int posted = 0;
    while (posted < recvBufferCount) {
        Buffer* recvBuffer = qp->createBuffer(bufferSize);
        // Track the buffer in `buffers` straight away so it is recorded even
        // if a later step fails.
        buffers.push_front(recvBuffer);
        // NOTE(review): presumably this exposes the buffer's full capacity
        // to the receive - confirm against Buffer/QueuePair::postRecv.
        recvBuffer->dataCount = recvBuffer->byteCount;
        qp->postRecv(recvBuffer);
        ++posted;
    }
}
// Destructor: performs no explicit cleanup of its own.
AsynchIO::~AsynchIO() {
// The buffers ptr_deque automatically deletes all the buffers we've allocated,
// so nothing has to be freed by hand here.
}
// Start watching this object's data handle on the supplied poller.
// The handle was bound to AsynchIO::dataEvent in the constructor.
// NOTE(review): presumably the poller then drives dataEvent() whenever the
// queue pair signals activity - confirm against DispatchHandle::startWatch.
void AsynchIO::start(Poller::shared_ptr poller) {
dataHandle.startWatch(poller);
}
/**
 * Post a buffer for sending and count it as an outstanding write.
 *
 * @param buff buffer handed to QueuePair::postSend
 *
 * TODO: write buffer overrun is not prevented here; the full callback only
 * advises the caller when to stop writing.
 */
void AsynchIO::queueWrite(Buffer* buff) {
    qp->postSend(buff);
    ++outstandingWrites;

    // Tell the owner once every transmit slot is accounted for.
    const bool sendSlotsExhausted = (outstandingWrites >= xmitBufferCount);
    if (sendSlotsExhausted) {
        fullCallback(*this);
    }
}
/**
 * Ask for the idle callback to be run, if there is room for another write.
 *
 * Nothing happens while the number of outstanding writes has reached
 * xmitBufferCount.
 */
void AsynchIO::notifyPendingWrite() {
    if (outstandingWrites >= xmitBufferCount) {
        return; // no spare send capacity right now
    }
    idleCallback(*this);
}
// Currently a no-op: the body is empty, so nothing is queued or closed here.
void AsynchIO::queueWriteClose() {
}
/**
 * Obtain a buffer for the caller to fill.
 *
 * A buffer previously returned to bufferQueue is reused when one is
 * available; otherwise a new one of bufferSize is created and recorded in
 * `buffers`. The whole operation runs under bufferQueueLock.
 *
 * @return a buffer whose dataCount is 0 (and, for a reused buffer, whose
 *         dataStart has been reset to 0)
 */
Buffer* AsynchIO::getBuffer() {
    qpid::sys::ScopedLock<qpid::sys::Mutex> guard(bufferQueueLock);

    if (!bufferQueue.empty()) {
        // Reuse a buffer handed back by a completed send.
        Buffer* recycled = bufferQueue.front();
        bufferQueue.pop_front();
        recycled->dataCount = 0;
        recycled->dataStart = 0;
        return recycled;
    }

    // Nothing to reuse: make a new buffer and record it in `buffers`.
    Buffer* created = qp->createBuffer(bufferSize);
    buffers.push_front(created);
    created->dataCount = 0;
    return created;
}
/**
 * Handler bound to dataHandle: drains completion events from the queue pair.
 *
 * If no channel event is pending the call returns at once. Otherwise
 * send/recv notification is re-requested and events are processed until
 * none remain, or until one reports a status other than IBV_WC_SUCCESS
 * (errorCallback is then invoked and processing stops).
 *
 * - Receive completion: readCallback gets the buffer, which is then
 *   reposted for receiving.
 * - Any other completion: the buffer goes back onto bufferQueue, the
 *   outstanding write count drops by one and idleCallback is invoked.
 */
void AsynchIO::dataEvent(DispatchHandle&) {
    QueuePair::intrusive_ptr signalled = qp->getNextChannelEvent();
    if (!signalled)
        return; // no channel event pending
    assert(signalled == qp);

    // Re-enable notification for the queue before draining it.
    qp->notifySend();
    qp->notifyRecv();

    for (;;) {
        QueuePairEvent completion(qp->getNextEvent());
        if (!completion)
            break; // no more events

        if (completion.getEventStatus() != IBV_WC_SUCCESS) {
            errorCallback(*this);
            break;
        }

        // Only the direction is consulted; the work-completion opcode is
        // not inspected here.
        Buffer* buf = completion.getBuffer();
        const QueueDirection direction = completion.getDirection();

        if (direction != RECV) {
            // Send completed: make the buffer available to getBuffer() again.
            {
                qpid::sys::ScopedLock<qpid::sys::Mutex> guard(bufferQueueLock);
                bufferQueue.push_front(buf);
            }
            --outstandingWrites;
            // TODO: maybe don't call idle unless we're low on write buffers
            idleCallback(*this);
            continue;
        }

        readCallback(*this, buf);
        // The callback has consumed the data, so the buffer can be posted
        // for receiving again.
        qp->postRecv(buf);
    }
}
// Construct a listener for incoming RDMA connections.
//   src  - local address stored in src_addr; it is bound later, in start()
//   cc   - invoked on RDMA_CM_EVENT_ESTABLISHED
//   errc - invoked on RDMA_CM_EVENT_CONNECT_ERROR
//   dc   - invoked on RDMA_CM_EVENT_DISCONNECTED
//   crc  - consulted on RDMA_CM_EVENT_CONNECT_REQUEST when set; its result
//          decides between accept and reject
// A fresh Connection is made, its handle is bound to connectionEvent, and the
// connection is put into nonblocking mode.
Listener::Listener(
const sockaddr& src,
ConnectedCallback cc,
ErrorCallback errc,
DisconnectedCallback dc,
ConnectionRequestCallback crc
) :
src_addr(src),
ci(Connection::make()),
handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0),
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
connectionRequestCallback(crc)
{
ci->nonblocking();
}
// Bind the connection to the stored source address, start listening on it,
// and then start watching the connection handle on the supplied poller.
void Listener::start(Poller::shared_ptr poller) {
ci->bind(src_addr);
ci->listen();
handle.startWatch(poller);
}
/**
 * Handler bound to the listening connection's handle: processes one
 * connection-manager event.
 *
 * - CONNECT_REQUEST: asks connectionRequestCallback (when one is set)
 *   whether to take the connection, then accepts or rejects it. With no
 *   callback set the connection is accepted.
 * - ESTABLISHED / DISCONNECTED / CONNECT_ERROR: forwarded to the matching
 *   callback.
 * - Anything else: a warning is written to std::cerr.
 */
void Listener::connectionEvent(DispatchHandle&) {
    ConnectionEvent event(ci->getNextEvent());
    if (!event)
        return; // for whatever reason there was no event

    // Important documentation omission: the new rdma_cm_id you get from
    // CONNECT_REQUEST has the same context info as its parent listening
    // rdma_cm_id.
    ::rdma_cm_event_type kind = event.getEventType();
    Rdma::Connection::intrusive_ptr peer = event.getConnection();

    if (kind == RDMA_CM_EVENT_CONNECT_REQUEST) {
        // Take the connection parameters (and private data) from the event.
        ::rdma_conn_param params = event.getConnectionParam();

        // TODO: pass private data to callback (and accept new private data
        // for accept somehow)
        bool accepted = true;
        if (connectionRequestCallback)
            accepted = connectionRequestCallback(peer);

        if (!accepted) {
            peer->reject();
        } else {
            peer->accept(params);
        }
    } else if (kind == RDMA_CM_EVENT_ESTABLISHED) {
        connectedCallback(peer);
    } else if (kind == RDMA_CM_EVENT_DISCONNECTED) {
        disconnectedCallback(peer);
    } else if (kind == RDMA_CM_EVENT_CONNECT_ERROR) {
        errorCallback(peer);
    } else {
        std::cerr << "Warning: unexpected response to listen - " << kind << "\n";
    }
}
// Construct an outgoing RDMA connector.
//   dst  - remote address stored in dst_addr; it is resolved later, in start()
//   cc   - invoked on RDMA_CM_EVENT_ESTABLISHED
//   errc - invoked on address, route, connect and unreachable errors
//   dc   - invoked on RDMA_CM_EVENT_DISCONNECTED
//   rc   - invoked on RDMA_CM_EVENT_REJECTED
// A fresh Connection is made, its handle is bound to connectionEvent, and the
// connection is put into nonblocking mode.
Connector::Connector(
const sockaddr& dst,
ConnectedCallback cc,
ErrorCallback errc,
DisconnectedCallback dc,
RejectedCallback rc
) :
dst_addr(dst),
ci(Connection::make()),
handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0),
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
rejectedCallback(rc)
{
ci->nonblocking();
}
// Begin address resolution for the stored destination address, then start
// watching the connection handle on the supplied poller. The later steps
// (route resolution, connect) are taken in connectionEvent().
void Connector::start(Poller::shared_ptr poller) {
ci->resolve_addr(dst_addr);
handle.startWatch(poller);
}
/**
 * Handler bound to the connector's handle: processes one connection-manager
 * event and moves the connection attempt to its next step.
 *
 * Progress events:  ADDR_RESOLVED  -> resolve the route
 *                   ROUTE_RESOLVED -> connect
 *                   ESTABLISHED    -> connectedCallback
 * Terminal events:  REJECTED       -> rejectedCallback
 *                   DISCONNECTED   -> disconnectedCallback
 *                   ADDR_ERROR, ROUTE_ERROR, CONNECT_ERROR, UNREACHABLE
 *                                  -> errorCallback
 * Any other event type produces a warning on std::cerr.
 */
void Connector::connectionEvent(DispatchHandle&) {
    ConnectionEvent event(ci->getNextEvent());
    if (!event)
        return; // for whatever reason there was no event

    ::rdma_cm_event_type kind = event.getEventType();
    switch (kind) {
    // --- forward progress -------------------------------------------------
    case RDMA_CM_EVENT_ADDR_RESOLVED:   // while resolving the address
        ci->resolve_route();
        break;
    case RDMA_CM_EVENT_ROUTE_RESOLVED:  // while resolving the route
        ci->connect();
        break;
    case RDMA_CM_EVENT_ESTABLISHED:     // while connecting
        connectedCallback(ci);
        break;

    // --- refusals and teardown --------------------------------------------
    case RDMA_CM_EVENT_REJECTED:        // while connecting
        rejectedCallback(ci);
        break;
    case RDMA_CM_EVENT_DISCONNECTED:    // once established
        disconnectedCallback(ci);
        break;

    // --- failures: all reported the same way ------------------------------
    case RDMA_CM_EVENT_ADDR_ERROR:      // while resolving the address
    case RDMA_CM_EVENT_ROUTE_ERROR:     // while resolving the route
    case RDMA_CM_EVENT_CONNECT_ERROR:   // while connecting
    case RDMA_CM_EVENT_UNREACHABLE:     // while connecting
        errorCallback(ci);
        break;

    default:
        std::cerr << "Warning: unexpected event in connect: " << kind << "\n";
    }
}
}
|