diff options
Diffstat (limited to 'cpp/src/qpid/sys/rdma')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.cpp | 20 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 12 |
2 files changed, 16 insertions, 16 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/cpp/src/qpid/sys/rdma/RdmaIO.cpp index 77e766dd79..0ca4c5e259 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.cpp +++ b/cpp/src/qpid/sys/rdma/RdmaIO.cpp @@ -68,7 +68,7 @@ namespace Rdma { b->dataCount = b->byteCount; qp->postRecv(b); } - + for (int i = 0; i<xmitBufferCount; ++i) { // Allocate xmit buffer Buffer* b = qp->createBuffer(bufferSize); @@ -86,7 +86,7 @@ namespace Rdma { // Turn off callbacks (before doing the deletes) dataHandle.stopWatch(); - + // The buffers ptr_deque automatically deletes all the buffers we've allocated // TODO: It might turn out to be more efficient in high connection loads to reuse the // buffers rather than having to reregister them all the time (this would be straightforward if all @@ -189,7 +189,7 @@ namespace Rdma { if (doReturn) { return; } - + doWriteCallback(); // Keep track of what we need to do so that we can release the lock @@ -317,7 +317,7 @@ namespace Rdma { // disabled by the poller until we leave this code qp->notifyRecv(); qp->notifySend(); - + int recvEvents = 0; int sendEvents = 0; @@ -353,7 +353,7 @@ namespace Rdma { xmitCredit += (e.getImm() & ~FlagsMask); dataPresent = ((e.getImm() & IgnoreData) == 0); } - + // if there was no data sent then the message was only to update our credit if ( dataPresent ) { readCallback(*this, b); @@ -366,7 +366,7 @@ namespace Rdma { // Received another message ++recvCredit; - + // Send recvCredit if it is large enough (it will have got this large because we've not sent anything recently) if (recvCredit > recvBufferCount/2) { // TODO: This should use RDMA write with imm as there might not ever be a buffer to receive this message @@ -377,7 +377,7 @@ namespace Rdma { // Have to send something as adapters hate it when you try to transfer 0 bytes *reinterpret_cast< uint32_t* >(ob->bytes) = htonl(recvCredit); ob->dataCount = sizeof(uint32_t); - + int creditSent = recvCredit & ~FlagsMask; qp->postSend(creditSent | IgnoreData, ob); recvCredit -= creditSent; @@ -426,7 +426,7 @@ namespace Rdma { b->dataStart = 0; return b; } - + void AsynchIO::returnBuffer(Buffer* b) { qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock); bufferQueue.push_front(b); @@ -445,7 +445,7 @@ namespace Rdma { { ci->nonblocking(); } - + void ConnectionManager::start(Poller::shared_ptr poller) { startConnection(ci); handle.startWatch(poller); @@ -454,7 +454,7 @@ namespace Rdma { void ConnectionManager::event(DispatchHandle&) { connectionEvent(ci); } - + Listener::Listener( const sockaddr& src, const ConnectionParams& cp, diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 577c22d053..410ea42884 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -111,7 +111,7 @@ namespace Rdma { inline bool AsynchIO::writable() const { return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0); } - + inline int AsynchIO::incompletedWrites() const { return outstandingWrites; } @@ -125,7 +125,7 @@ namespace Rdma { struct ConnectionParams { int maxRecvBufferSize; int initialXmitCredit ; - + ConnectionParams(int s, int c) : maxRecvBufferSize(s), initialXmitCredit(c) @@ -142,7 +142,7 @@ namespace Rdma { typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback; - + class ConnectionManager { Connection::intrusive_ptr ci; qpid::sys::DispatchHandle handle; @@ -150,13 +150,13 @@ namespace Rdma { protected: ErrorCallback errorCallback; DisconnectedCallback disconnectedCallback; - + public: ConnectionManager( ErrorCallback errc, DisconnectedCallback dc ); - + virtual ~ConnectionManager() {} void start(qpid::sys::Poller::shared_ptr poller); @@ -167,7 +167,7 @@ namespace Rdma { virtual void startConnection(Connection::intrusive_ptr ci) = 0; virtual void connectionEvent(Connection::intrusive_ptr ci) = 0; }; - + typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback; typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback; |
