diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-04-29 22:46:23 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-04-29 22:46:23 +0000 |
| commit | c86a77f2ce6150ce8fc0770604d92502acd996b8 (patch) | |
| tree | 77e5057ee8f96ffe07eb5abfac2d4c06c21823d6 /cpp/src/qpid/sys/rdma/RdmaIO.cpp | |
| parent | 0c8f372f71409444dd9f3bc38c481c1ec6ba4827 (diff) | |
| download | qpid-python-c86a77f2ce6150ce8fc0770604d92502acd996b8.tar.gz | |
More RDMA Work in Progress
Changes to client buffering
Buffering improvement to server
Removed unused state machine from RdmaIO code
Move the write throttling due to limited write buffers into the RdmaIO code
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@652180 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 126 |
1 files changed, 23 insertions, 103 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 31d109ea4d..755d6f17c4 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -9,14 +9,18 @@ namespace Rdma { int s, ReadCallback rc, IdleCallback ic, + FullCallback fc, ErrorCallback ec ) : qp(q), dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), bufferSize(s), recvBufferCount(DEFAULT_WR_ENTRIES), + xmitBufferCount(DEFAULT_WR_ENTRIES), + outstandingWrites(0), readCallback(rc), idleCallback(ic), + fullCallback(fc), errorCallback(ec) { qp->nonblocking(); @@ -40,20 +44,28 @@ namespace Rdma { dataHandle.startWatch(poller); } - void AsynchIO::queueReadBuffer(Buffer*) { - } - + // TODO: Currently we don't prevent write buffer overrun we just advise + // when to stop writing. void AsynchIO::queueWrite(Buffer* buff) { qp->postSend(buff); + ++outstandingWrites; + if (outstandingWrites >= xmitBufferCount) { + fullCallback(*this); + } } void AsynchIO::notifyPendingWrite() { + // Just perform the idle callback (if possible) + if (outstandingWrites < xmitBufferCount) { + idleCallback(*this); + } } void AsynchIO::queueWriteClose() { } Buffer* AsynchIO::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); if (bufferQueue.empty()) { Buffer* b = qp->createBuffer(bufferSize); buffers.push_front(b); @@ -103,7 +115,12 @@ namespace Rdma { // At this point the buffer has been consumed so put it back on the recv queue qp->postRecv(b); } else { + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); + } + --outstandingWrites; + // TODO: maybe don't call idle unless we're low on write buffers idleCallback(*this); } } while (true); @@ -122,8 +139,7 @@ namespace Rdma { connectedCallback(cc), errorCallback(errc), disconnectedCallback(dc), - connectionRequestCallback(crc), - state(IDLE) + connectionRequestCallback(crc) { ci->nonblocking(); } @@ -131,7 +147,6 @@ namespace Rdma { void Listener::start(Poller::shared_ptr poller) { ci->bind(src_addr); ci->listen(); - state = LISTENING; handle.startWatch(poller); } @@ -194,15 +209,13 @@ namespace Rdma { connectedCallback(cc), errorCallback(errc), disconnectedCallback(dc), - rejectedCallback(rc), - state(IDLE) + rejectedCallback(rc) { ci->nonblocking(); } void Connector::start(Poller::shared_ptr poller) { ci->resolve_addr(dst_addr); - state = RESOLVE_ADDR; handle.startWatch(poller); } @@ -214,138 +227,45 @@ namespace Rdma { return; ::rdma_cm_event_type eventType = e.getEventType(); -#if 1 switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR - state = RESOLVE_ROUTE; ci->resolve_route(); break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: - state = CONNECTING; ci->connect(); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING - state = ERROR; errorCallback(ci); break; case RDMA_CM_EVENT_REJECTED: // CONNECTING - state = REJECTED; rejectedCallback(ci); break; case RDMA_CM_EVENT_ESTABLISHED: // CONNECTING - state = ESTABLISHED; connectedCallback(ci); break; case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED - state = DISCONNECTED; disconnectedCallback(ci); break; default: - std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n"; - state = ERROR; - } -#else - switch (state) { - case IDLE: - std::cerr << "Warning: event in IDLE state\n"; - break; - case RESOLVE_ADDR: - switch (eventType) { - case RDMA_CM_EVENT_ADDR_RESOLVED: - state = RESOLVE_ROUTE; - ci->resolve_route(); - break; - case RDMA_CM_EVENT_ADDR_ERROR: - state = ERROR; - errorCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n"; - } - break; - case RESOLVE_ROUTE: - switch (eventType) { - case RDMA_CM_EVENT_ROUTE_RESOLVED: - state = CONNECTING; - ci->connect(); - break; - case RDMA_CM_EVENT_ROUTE_ERROR: - state = ERROR; - errorCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n"; - } - break; - case CONNECTING: - switch (eventType) { - case RDMA_CM_EVENT_CONNECT_RESPONSE: - std::cerr << "connect_response\n"; - break; - case RDMA_CM_EVENT_CONNECT_ERROR: - state = ERROR; - errorCallback(ci); - break; - case RDMA_CM_EVENT_UNREACHABLE: - state = ERROR; - errorCallback(ci); - break; - case RDMA_CM_EVENT_REJECTED: - state = REJECTED; - rejectedCallback(ci); - break; - case RDMA_CM_EVENT_ESTABLISHED: - state = ESTABLISHED; - connectedCallback(ci); - break; - default: - state = ERROR; - std::cerr << "Warning: unexpected response to connect - " << eventType << "\n"; - } - break; - case ESTABLISHED: - switch (eventType) { - case RDMA_CM_EVENT_DISCONNECTED: - disconnectedCallback(ci); - break; - default: - std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n"; - } - break; - case REJECTED: - std::cerr << "Warning: event in REJECTED state - " << eventType << "\n"; - break; - case ERROR: - std::cerr << "Warning: event in ERROR state - " << eventType << "\n"; - break; - case LISTENING: - case ACCEPTING: - std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n"; - break; + std::cerr << "Warning: unexpected event in connect: " << eventType << "\n"; } -#endif } } |
