diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2008-09-11 06:16:19 +0000 |
| commit | 2e05b7082f5e387fc686925e5ac006485e4686db (patch) | |
| tree | b0a43e45da7cc24b65407ce6f7254e21b3fcde78 /cpp/src/qpid/sys/rdma/RdmaIO.cpp | |
| parent | 468b4b6ddaa3d96bb743cdbd27ded651eea31847 (diff) | |
| download | qpid-python-2e05b7082f5e387fc686925e5ac006485e4686db.tar.gz | |
Implementation of AMQP over RDMA protocols (Infiniband)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@694143 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 449 |
1 files changed, 370 insertions, 79 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index e7a5e7d5cb..dd4fbefcaf 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -20,13 +20,21 @@ */ #include "RdmaIO.h" +#include "qpid/log/Statement.h" + + #include <iostream> #include <boost/bind.hpp> +using qpid::sys::DispatchHandle; +using qpid::sys::Poller; + namespace Rdma { AsynchIO::AsynchIO( QueuePair::intrusive_ptr q, - int s, + int size, + int xCredit, + int rCount, ReadCallback rc, IdleCallback ic, FullCallback fc, @@ -34,10 +42,15 @@ namespace Rdma { ) : qp(q), dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0), - bufferSize(s), - recvBufferCount(DEFAULT_WR_ENTRIES), - xmitBufferCount(DEFAULT_WR_ENTRIES), + bufferSize(size), + recvCredit(0), + xmitCredit(xCredit), + recvBufferCount(rCount), + xmitBufferCount(xCredit), outstandingWrites(0), + closed(false), + deleting(false), + state(IDLE), readCallback(rc), idleCallback(ic), fullCallback(fc), @@ -57,72 +70,232 @@ namespace Rdma { } AsynchIO::~AsynchIO() { + // Warn if we are deleting whilst there are still unreclaimed write buffers + if ( outstandingWrites>0 ) + QPID_LOG(error, "RDMA: qp=" << qp << ": Deleting queue before all write buffers finished"); + + // Turn off callbacks (before doing the deletes) + dataHandle.stopWatch(); + // The buffers ptr_deque automatically deletes all the buffers we've allocated + // TODO: It might turn out to be more efficient in high connection loads to reuse the + // buffers rather than having to reregister them all the time (this would be straightforward if all + // connections have the same buffer size and harder otherwise) } void AsynchIO::start(Poller::shared_ptr poller) { dataHandle.startWatch(poller); } - // TODO: Currently we don't prevent write buffer overrun we just advise - // when to stop writing. 
- void AsynchIO::queueWrite(Buffer* buff) { - qp->postSend(buff); - ++outstandingWrites; - if (outstandingWrites >= xmitBufferCount) { - fullCallback(*this); + // Mark for deletion/Delete this object when we have no outstanding writes + void AsynchIO::deferDelete() { + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + if (outstandingWrites > 0 || state != IDLE) { + deleting = true; + return; + } + state = DELETED; // Stop any read callback before the dataHandle.stopWatch() in the destructor } + delete this; } - void AsynchIO::notifyPendingWrite() { - // Just perform the idle callback (if possible) - if (outstandingWrites < xmitBufferCount) { - idleCallback(*this); + void AsynchIO::queueWrite(Buffer* buff) { + // Make sure we don't overrun our available buffers + // either at our end or the known available at the peers end + if (writable()) { + // TODO: We might want to batch up sending credit + if (recvCredit > 0) { + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent, buff); + recvCredit -= creditSent; + } else { + qp->postSend(buff); + } + ++outstandingWrites; + --xmitCredit; + } else { + if (fullCallback) { + fullCallback(*this, buff); + } else { + QPID_LOG(error, "RDMA: qp=" << qp << ": Write queue full, but no callback, throwing buffer away"); + returnBuffer(buff); + } } } + // Mark now closed (so we don't accept any more writes or make any idle callbacks) void AsynchIO::queueWriteClose() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + closed = true; } - Buffer* AsynchIO::getBuffer() { - qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); - if (bufferQueue.empty()) { - Buffer* b = qp->createBuffer(bufferSize); - buffers.push_front(b); - b->dataCount = 0; - return b; - } else { - Buffer* b = bufferQueue.front(); - bufferQueue.pop_front(); - b->dataCount = 0; - b->dataStart = 0; - return b; + void AsynchIO::notifyPendingWrite() { + // As notifyPendingWrite can be called on an arbitrary thread it must check whether 
we are processing or not. + // If we are then we just return as we know that we will eventually do the idle callback anyway. + // + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // We can get here in any state (as the caller could be in any thread) + switch (state) { + case NOTIFY_WRITE: + case PENDING_NOTIFY: + // We only need to note a pending notify if we're already doing a notify as data processing + // is always followed by write notification processing + state = PENDING_NOTIFY; + return; + case PENDING_DATA: + return; + case DATA: + // Only need to return here as data processing will do the idleCallback itself anyway + return; + case IDLE: + state = NOTIFY_WRITE; + break; + case DELETED: + assert(state!=DELETED); + } } + + doWriteCallback(); + // Keep track of what we need to do so that we can release the lock + enum {COMPLETION, NOTIFY} action; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // If there was pending data whilst we were doing this, process it now + switch (state) { + case NOTIFY_WRITE: + state = IDLE; + return; + case PENDING_DATA: + action = COMPLETION; + break; + case PENDING_NOTIFY: + action = NOTIFY; + break; + default: + assert(state!=IDLE && state!=DATA && state!=DELETED); + return; + } + // Using NOTIFY_WRITE for both cases is a bit strange, but we're making sure we get the + // correct result if we reenter notifyPendingWrite(), in which case we want to + // end up in PENDING_NOTIFY (entering dataEvent doesn't matter as it only checks + // not IDLE) + state = NOTIFY_WRITE; + } + do { + // Note we only get here if we were in the PENDING_DATA or PENDING_NOTIFY state + // so that we do need to process completions or notifications now + switch (action) { + case COMPLETION: + processCompletions(); + case NOTIFY: + doWriteCallback(); + break; + } + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + switch (state) { + case NOTIFY_WRITE: + state = IDLE; + goto exit; + case PENDING_DATA: + action = 
COMPLETION; + break; + case PENDING_NOTIFY: + action = NOTIFY; + break; + default: + assert(state!=IDLE && state!=DATA && state!=DELETED); + return; + } + state = NOTIFY_WRITE; + } + } while (true); + exit: + // If we just processed completions we might need to delete ourselves + if (action == COMPLETION && deleting && outstandingWrites == 0) { + delete this; + } + } + + void AsynchIO::dataEvent(qpid::sys::DispatchHandle&) { + // Keep track of writable notifications + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + // We're already processing a notification + switch (state) { + case IDLE: + break; + default: + state = PENDING_DATA; + return; + } + // Can't get here in DATA state as that would violate the serialisation rules + assert( state==IDLE ); + state = DATA; + } + + processCompletions(); + + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + assert( state==DATA ); + state = NOTIFY_WRITE; + } + + do { + doWriteCallback(); + + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(stateLock); + if ( state==NOTIFY_WRITE ) { + state = IDLE; + break; + } + // Can't get DATA/PENDING_DATA here as dataEvent cannot be reentered + assert( state==PENDING_NOTIFY ); + state = NOTIFY_WRITE; + } + } while (true); + + // We might need to delete ourselves + if (deleting && outstandingWrites == 0) { + delete this; + } } - void AsynchIO::dataEvent(DispatchHandle&) { + void AsynchIO::processCompletions() { QueuePair::intrusive_ptr q = qp->getNextChannelEvent(); + // Re-enable notification for queue: + // This needs to happen before we could do anything that could generate more work completion + // events (ie the callbacks etc. in the following). 
+ // This can't make us reenter this code as the handle attached to the completion queue will still be + // disabled by the poller until we leave this code + qp->notifyRecv(); + qp->notifySend(); + + int recvEvents = 0; + int sendEvents = 0; + // If no event do nothing if (!q) return; assert(q == qp); - // Re-enable notification for queue - qp->notifySend(); - qp->notifyRecv(); - // Repeat until no more events do { QueuePairEvent e(qp->getNextEvent()); if (!e) - return; + break; ::ibv_wc_status status = e.getEventStatus(); if (status != IBV_WC_SUCCESS) { errorCallback(*this); + // TODO: Probably need to flush queues at this point return; } @@ -131,46 +304,144 @@ namespace Rdma { Buffer* b = e.getBuffer(); QueueDirection dir = e.getDirection(); if (dir == RECV) { - readCallback(*this, b); + ++recvEvents; + + // Get our xmitCredit if it was sent + bool dataPresent = true; + if (e.immPresent() ) { + xmitCredit += (e.getImm() & ~FlagsMask); + dataPresent = ((e.getImm() & IgnoreData) == 0); + } + + // if there was no data sent then the message was only to update our credit + if ( dataPresent ) { + readCallback(*this, b); + } + // At this point the buffer has been consumed so put it back on the recv queue + b->dataStart = 0; + b->dataCount = 0; qp->postRecv(b); + + // Received another message + ++recvCredit; + + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) + if (recvCredit > recvBufferCount/2) { + // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message + // but this is a little unlikely, as to get in this state we have to have received messages without sending any + // for a while so it's likely we've received a credit update from the far side. 
+ if (writable()) { + Buffer* ob = getBuffer(); + // Have to send something as adapters hate it when you try to transfer 0 bytes + *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); + ob->dataCount = sizeof(uint32_t); + + int creditSent = recvCredit & ~FlagsMask; + qp->postSend(creditSent | IgnoreData, ob); + recvCredit -= creditSent; + ++outstandingWrites; + --xmitCredit; + } else { + QPID_LOG(warning, "RDMA: qp=" << qp << ": Unable to send unsolicited credit"); + } + } } else { + ++sendEvents; { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); } --outstandingWrites; - // TODO: maybe don't call idle unless we're low on write buffers - idleCallback(*this); } } while (true); + + // Not sure if this is expected or not + if (recvEvents == 0 && sendEvents == 0) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Got channel event with no recv/send completions"); + } + } + + void AsynchIO::doWriteCallback() { + // TODO: maybe don't call idle unless we're low on write buffers + // Keep on calling the idle routine as long as we are writable and we got something to write last call + while (writable()) { + int xc = xmitCredit; + idleCallback(*this); + // Check whether we actually wrote anything + if (xmitCredit == xc) { + QPID_LOG(debug, "RDMA: qp=" << qp << ": Called for data, but got none: xmitCredit=" << xmitCredit); + return; + } + } + } + + Buffer* AsynchIO::getBuffer() { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + if (bufferQueue.empty()) { + Buffer* b = qp->createBuffer(bufferSize); + buffers.push_front(b); + return b; + } else { + Buffer* b = bufferQueue.front(); + bufferQueue.pop_front(); + b->dataCount = 0; + b->dataStart = 0; + return b; + } + + } + + void AsynchIO::returnBuffer(Buffer* b) { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); + bufferQueue.push_front(b); + b->dataCount = 0; + b->dataStart = 0; } + ConnectionManager::ConnectionManager( + ErrorCallback errc, + 
DisconnectedCallback dc + ) : + ci(Connection::make()), + handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0), + errorCallback(errc), + disconnectedCallback(dc) + { + ci->nonblocking(); + } + + void ConnectionManager::start(Poller::shared_ptr poller) { + startConnection(ci); + handle.startWatch(poller); + } + + void ConnectionManager::event(DispatchHandle&) { + connectionEvent(ci); + } + Listener::Listener( const sockaddr& src, - ConnectedCallback cc, + const ConnectionParams& cp, + EstablishedCallback ec, ErrorCallback errc, DisconnectedCallback dc, ConnectionRequestCallback crc ) : + ConnectionManager(errc, dc), src_addr(src), - ci(Connection::make()), - handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - connectionRequestCallback(crc) + checkConnectionParams(cp), + connectionRequestCallback(crc), + establishedCallback(ec) { - ci->nonblocking(); } - void Listener::start(Poller::shared_ptr poller) { + void Listener::startConnection(Connection::intrusive_ptr ci) { ci->bind(src_addr); ci->listen(); - handle.startWatch(poller); } - void Listener::connectionEvent(DispatchHandle&) { + void Listener::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -181,65 +452,75 @@ namespace Rdma { // you get from CONNECT_REQUEST has the same context info // as its parent listening rdma_cm_id ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_CONNECT_REQUEST: { - bool accept = true; - // Extract connection parameters and private data from event - ::rdma_conn_param conn_param = e.getConnectionParam(); + // Make sure peer has sent params we can use + if (!conn_param.private_data || conn_param.private_data_len < 
sizeof(ConnectionParams)) { + id->reject(); + break; + } + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + // Reject if requested msg size is bigger than we allow + if (cp.maxRecvBufferSize > checkConnectionParams.maxRecvBufferSize) { + id->reject(&checkConnectionParams); + break; + } + + bool accept = true; if (connectionRequestCallback) - //TODO: pass private data to callback (and accept new private data for accept somehow) - accept = connectionRequestCallback(id); + accept = connectionRequestCallback(id, cp); + if (accept) { // Accept connection - id->accept(conn_param); + cp.initialXmitCredit = checkConnectionParams.initialXmitCredit; + id->accept(conn_param, &cp); } else { - //Reject connection + // Reject connection id->reject(); } - break; } case RDMA_CM_EVENT_ESTABLISHED: - connectedCallback(id); + establishedCallback(id); break; case RDMA_CM_EVENT_DISCONNECTED: disconnectedCallback(id); break; case RDMA_CM_EVENT_CONNECT_ERROR: - errorCallback(id); + errorCallback(id, CONNECT_ERROR); break; default: - std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; + // Unexpected response + errorCallback(id, UNKNOWN); + //std::cerr << "Warning: unexpected response to listen - " << eventType << "\n"; } } Connector::Connector( const sockaddr& dst, + const ConnectionParams& cp, ConnectedCallback cc, ErrorCallback errc, DisconnectedCallback dc, RejectedCallback rc ) : + ConnectionManager(errc, dc), dst_addr(dst), - ci(Connection::make()), - handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0), - connectedCallback(cc), - errorCallback(errc), - disconnectedCallback(dc), - rejectedCallback(rc) + connectionParams(cp), + rejectedCallback(rc), + connectedCallback(cc) { - ci->nonblocking(); } - void Connector::start(Poller::shared_ptr poller) { + void Connector::startConnection(Connection::intrusive_ptr ci) { ci->resolve_addr(dst_addr); - handle.startWatch(poller); } - void 
Connector::connectionEvent(DispatchHandle&) { + void Connector::connectionEvent(Connection::intrusive_ptr ci) { ConnectionEvent e(ci->getNextEvent()); // If (for whatever reason) there was no event do nothing @@ -247,6 +528,8 @@ namespace Rdma { return; ::rdma_cm_event_type eventType = e.getEventType(); + ::rdma_conn_param conn_param = e.getConnectionParam(); + Rdma::Connection::intrusive_ptr id = e.getConnection(); switch (eventType) { case RDMA_CM_EVENT_ADDR_RESOLVED: // RESOLVE_ADDR @@ -254,38 +537,46 @@ namespace Rdma { break; case RDMA_CM_EVENT_ADDR_ERROR: // RESOLVE_ADDR - errorCallback(ci); + errorCallback(ci, ADDR_ERROR); break; case RDMA_CM_EVENT_ROUTE_RESOLVED: // RESOLVE_ROUTE: - ci->connect(); + ci->connect(&connectionParams); break; case RDMA_CM_EVENT_ROUTE_ERROR: // RESOLVE_ROUTE: - errorCallback(ci); + errorCallback(ci, ROUTE_ERROR); break; case RDMA_CM_EVENT_CONNECT_ERROR: // CONNECTING - errorCallback(ci); + errorCallback(ci, CONNECT_ERROR); break; case RDMA_CM_EVENT_UNREACHABLE: // CONNECTING - errorCallback(ci); + errorCallback(ci, UNREACHABLE); break; - case RDMA_CM_EVENT_REJECTED: + case RDMA_CM_EVENT_REJECTED: { // CONNECTING - rejectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + rejectedCallback(ci, cp); break; - case RDMA_CM_EVENT_ESTABLISHED: + } + case RDMA_CM_EVENT_ESTABLISHED: { // CONNECTING - connectedCallback(ci); + // Extract private data from event + assert(conn_param.private_data && conn_param.private_data_len >= sizeof(ConnectionParams)); + ConnectionParams cp = *static_cast<const ConnectionParams*>(conn_param.private_data); + connectedCallback(ci, cp); break; + } case RDMA_CM_EVENT_DISCONNECTED: // ESTABLISHED disconnectedCallback(ci); break; default: - std::cerr << "Warning: unexpected event in connect: " << eventType << "\n"; + 
QPID_LOG(warning, "RDMA: Unexpected event in connect: " << eventType); } } } |
