diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:41:25 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2010-05-18 21:41:25 +0000 |
| commit | 27920d16e70f590b49548877a2129a1d2162d985 (patch) | |
| tree | e3b90b82b63c89ee03317d129192299074fae15f /cpp/src/qpid/sys/rdma/RdmaIO.h | |
| parent | bbf31a9b3113ad6d37ed24d2ce767dd5f830afa3 (diff) | |
| download | qpid-python-27920d16e70f590b49548877a2129a1d2162d985.tar.gz | |
Fix RDMA for upstream changes which now require notification on shutdown
differently from before
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.h')
| -rw-r--r-- | cpp/src/qpid/sys/rdma/RdmaIO.h | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h index 711685031c..0b86461465 100644 --- a/cpp/src/qpid/sys/rdma/RdmaIO.h +++ b/cpp/src/qpid/sys/rdma/RdmaIO.h @@ -43,8 +43,9 @@ namespace Rdma { { typedef boost::function1<void, AsynchIO&> ErrorCallback; typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback; - typedef boost::function1<void, AsynchIO&> IdleCallback; - typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; + typedef boost::function1<void, AsynchIO&> IdleCallback; + typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback; + typedef boost::function1<void, AsynchIO&> NotifyCallback; QueuePair::intrusive_ptr qp; qpid::sys::DispatchHandleRef dataHandle; @@ -54,9 +55,8 @@ namespace Rdma { int recvBufferCount; int xmitBufferCount; int outstandingWrites; - bool closed; // TODO: Perhaps (probably) this state can be merged with the following... - bool deleting; // TODO: Perhaps (probably) this state can be merged with the following... - enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED }; + bool draining; // TODO: Perhaps (probably) this state can be merged with the following... + enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN }; qpid::sys::AtomicValue<State> state; //qpid::sys::Mutex stateLock; std::deque<Buffer*> bufferQueue; @@ -67,6 +67,7 @@ namespace Rdma { IdleCallback idleCallback; FullCallback fullCallback; ErrorCallback errorCallback; + NotifyCallback notifyCallback; public: // TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use @@ -82,22 +83,20 @@ namespace Rdma { FullCallback fc, ErrorCallback ec ); + ~AsynchIO(); void start(qpid::sys::Poller::shared_ptr poller); bool writable() const; bool bufferAvailable() const; void queueWrite(Buffer* buff); void notifyPendingWrite(); - void queueWriteClose(); - void deferDelete(); + void drainWriteQueue(NotifyCallback); + void stop(NotifyCallback); int incompletedWrites() const; Buffer* getBuffer(); void returnBuffer(Buffer*); private: - // Don't let anyone else delete us to make sure there can't be outstanding callbacks - ~AsynchIO(); - // Constants for the peer-peer command messages // These are sent in the high bits if the imm data of an rdma message // The low bits are used to send the credit @@ -107,10 +106,12 @@ namespace Rdma { void dataEvent(qpid::sys::DispatchHandle& handle); void processCompletions(); void doWriteCallback(); + void doStoppedCallback(); + void doDrainedCallback(); }; inline bool AsynchIO::writable() const { - return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0); + return (!draining && outstandingWrites < xmitBufferCount && xmitCredit > 0); } inline int AsynchIO::incompletedWrites() const { @@ -146,7 +147,7 @@ namespace Rdma { class ConnectionManager { Connection::intrusive_ptr ci; - qpid::sys::DispatchHandle handle; + qpid::sys::DispatchHandleRef handle; protected: ErrorCallback errorCallback; @@ -160,7 +161,7 @@ namespace Rdma { virtual ~ConnectionManager(); - void start(qpid::sys::Poller::shared_ptr polle, const qpid::sys::SocketAddress& addrr); + void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr); private: void event(qpid::sys::DispatchHandle& handle); |
