summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaIO.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2008-04-29 22:46:23 +0000
committerAndrew Stitcher <astitcher@apache.org>2008-04-29 22:46:23 +0000
commitc86a77f2ce6150ce8fc0770604d92502acd996b8 (patch)
tree77e5057ee8f96ffe07eb5abfac2d4c06c21823d6 /cpp/src/qpid/sys/rdma/RdmaIO.cpp
parent0c8f372f71409444dd9f3bc38c481c1ec6ba4827 (diff)
downloadqpid-python-c86a77f2ce6150ce8fc0770604d92502acd996b8.tar.gz
More RDMA Work in Progress
Changes to client buffering Buffering improvement to server Removed unused state machine from RdmaIO code Move the write throttling due to limited write buffers into the RdmaIO code git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652180 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.cpp126
1 files changed, 23 insertions, 103 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index 31d109ea4d..755d6f17c4 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -9,14 +9,18 @@ namespace Rdma {
int s,
ReadCallback rc,
IdleCallback ic,
+ FullCallback fc,
ErrorCallback ec
) :
qp(q),
dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
bufferSize(s),
recvBufferCount(DEFAULT_WR_ENTRIES),
+ xmitBufferCount(DEFAULT_WR_ENTRIES),
+ outstandingWrites(0),
readCallback(rc),
idleCallback(ic),
+ fullCallback(fc),
errorCallback(ec)
{
qp->nonblocking();
@@ -40,20 +44,28 @@ namespace Rdma {
dataHandle.startWatch(poller);
}
- void AsynchIO::queueReadBuffer(Buffer*) {
- }
-
+ // TODO: Currently we don't prevent write buffer overrun we just advise
+ // when to stop writing.
void AsynchIO::queueWrite(Buffer* buff) {
qp->postSend(buff);
+ ++outstandingWrites;
+ if (outstandingWrites >= xmitBufferCount) {
+ fullCallback(*this);
+ }
}
void AsynchIO::notifyPendingWrite() {
+ // Just perform the idle callback (if possible)
+ if (outstandingWrites < xmitBufferCount) {
+ idleCallback(*this);
+ }
}
void AsynchIO::queueWriteClose() {
}
Buffer* AsynchIO::getBuffer() {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
if (bufferQueue.empty()) {
Buffer* b = qp->createBuffer(bufferSize);
buffers.push_front(b);
@@ -103,7 +115,12 @@ namespace Rdma {
// At this point the buffer has been consumed so put it back on the recv queue
qp->postRecv(b);
} else {
+ {
+ qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
bufferQueue.push_front(b);
+ }
+ --outstandingWrites;
+ // TODO: maybe don't call idle unless we're low on write buffers
idleCallback(*this);
}
} while (true);
@@ -122,8 +139,7 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- connectionRequestCallback(crc),
- state(IDLE)
+ connectionRequestCallback(crc)
{
ci->nonblocking();
}
@@ -131,7 +147,6 @@ namespace Rdma {
void Listener::start(Poller::shared_ptr poller) {
ci->bind(src_addr);
ci->listen();
- state = LISTENING;
handle.startWatch(poller);
}
@@ -194,15 +209,13 @@ namespace Rdma {
connectedCallback(cc),
errorCallback(errc),
disconnectedCallback(dc),
- rejectedCallback(rc),
- state(IDLE)
+ rejectedCallback(rc)
{
ci->nonblocking();
}
void Connector::start(Poller::shared_ptr poller) {
ci->resolve_addr(dst_addr);
- state = RESOLVE_ADDR;
handle.startWatch(poller);
}
@@ -214,138 +227,45 @@ namespace Rdma {
return;
::rdma_cm_event_type eventType = e.getEventType();
-#if 1
switch (eventType) {
case RDMA_CM_EVENT_ADDR_RESOLVED:
// RESOLVE_ADDR
- state = RESOLVE_ROUTE;
ci->resolve_route();
break;
case RDMA_CM_EVENT_ADDR_ERROR:
// RESOLVE_ADDR
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
// RESOLVE_ROUTE:
- state = CONNECTING;
ci->connect();
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
// RESOLVE_ROUTE:
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_UNREACHABLE:
// CONNECTING
- state = ERROR;
errorCallback(ci);
break;
case RDMA_CM_EVENT_REJECTED:
// CONNECTING
- state = REJECTED;
rejectedCallback(ci);
break;
case RDMA_CM_EVENT_ESTABLISHED:
// CONNECTING
- state = ESTABLISHED;
connectedCallback(ci);
break;
case RDMA_CM_EVENT_DISCONNECTED:
// ESTABLISHED
- state = DISCONNECTED;
disconnectedCallback(ci);
break;
default:
- std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
- state = ERROR;
- }
-#else
- switch (state) {
- case IDLE:
- std::cerr << "Warning: event in IDLE state\n";
- break;
- case RESOLVE_ADDR:
- switch (eventType) {
- case RDMA_CM_EVENT_ADDR_RESOLVED:
- state = RESOLVE_ROUTE;
- ci->resolve_route();
- break;
- case RDMA_CM_EVENT_ADDR_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
- }
- break;
- case RESOLVE_ROUTE:
- switch (eventType) {
- case RDMA_CM_EVENT_ROUTE_RESOLVED:
- state = CONNECTING;
- ci->connect();
- break;
- case RDMA_CM_EVENT_ROUTE_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
- }
- break;
- case CONNECTING:
- switch (eventType) {
- case RDMA_CM_EVENT_CONNECT_RESPONSE:
- std::cerr << "connect_response\n";
- break;
- case RDMA_CM_EVENT_CONNECT_ERROR:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_UNREACHABLE:
- state = ERROR;
- errorCallback(ci);
- break;
- case RDMA_CM_EVENT_REJECTED:
- state = REJECTED;
- rejectedCallback(ci);
- break;
- case RDMA_CM_EVENT_ESTABLISHED:
- state = ESTABLISHED;
- connectedCallback(ci);
- break;
- default:
- state = ERROR;
- std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
- }
- break;
- case ESTABLISHED:
- switch (eventType) {
- case RDMA_CM_EVENT_DISCONNECTED:
- disconnectedCallback(ci);
- break;
- default:
- std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
- }
- break;
- case REJECTED:
- std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
- break;
- case ERROR:
- std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
- break;
- case LISTENING:
- case ACCEPTING:
- std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
- break;
+ std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
}
-#endif
}
}