summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/rdma/RdmaIO.h
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-05-18 21:41:25 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-05-18 21:41:25 +0000
commit27920d16e70f590b49548877a2129a1d2162d985 (patch)
treee3b90b82b63c89ee03317d129192299074fae15f /cpp/src/qpid/sys/rdma/RdmaIO.h
parentbbf31a9b3113ad6d37ed24d2ce767dd5f830afa3 (diff)
downloadqpid-python-27920d16e70f590b49548877a2129a1d2162d985.tar.gz
Fix RDMA for upstream changes which now require notification on shutdown
differently from before git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@945904 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/rdma/RdmaIO.h')
-rw-r--r--cpp/src/qpid/sys/rdma/RdmaIO.h27
1 files changed, 14 insertions, 13 deletions
diff --git a/cpp/src/qpid/sys/rdma/RdmaIO.h b/cpp/src/qpid/sys/rdma/RdmaIO.h
index 711685031c..0b86461465 100644
--- a/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -43,8 +43,9 @@ namespace Rdma {
{
typedef boost::function1<void, AsynchIO&> ErrorCallback;
typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
- typedef boost::function1<void, AsynchIO&> IdleCallback;
- typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+ typedef boost::function1<void, AsynchIO&> IdleCallback;
+ typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
+ typedef boost::function1<void, AsynchIO&> NotifyCallback;
QueuePair::intrusive_ptr qp;
qpid::sys::DispatchHandleRef dataHandle;
@@ -54,9 +55,8 @@ namespace Rdma {
int recvBufferCount;
int xmitBufferCount;
int outstandingWrites;
- bool closed; // TODO: Perhaps (probably) this state can be merged with the following...
- bool deleting; // TODO: Perhaps (probably) this state can be merged with the following...
- enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
+ bool draining; // TODO: Perhaps (probably) this state can be merged with the following...
+ enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DRAINED, SHUTDOWN };
qpid::sys::AtomicValue<State> state;
//qpid::sys::Mutex stateLock;
std::deque<Buffer*> bufferQueue;
@@ -67,6 +67,7 @@ namespace Rdma {
IdleCallback idleCallback;
FullCallback fullCallback;
ErrorCallback errorCallback;
+ NotifyCallback notifyCallback;
public:
// TODO: Instead of specifying a buffer size specify the amount of memory the AsynchIO class can use
@@ -82,22 +83,20 @@ namespace Rdma {
FullCallback fc,
ErrorCallback ec
);
+ ~AsynchIO();
void start(qpid::sys::Poller::shared_ptr poller);
bool writable() const;
bool bufferAvailable() const;
void queueWrite(Buffer* buff);
void notifyPendingWrite();
- void queueWriteClose();
- void deferDelete();
+ void drainWriteQueue(NotifyCallback);
+ void stop(NotifyCallback);
int incompletedWrites() const;
Buffer* getBuffer();
void returnBuffer(Buffer*);
private:
- // Don't let anyone else delete us to make sure there can't be outstanding callbacks
- ~AsynchIO();
-
// Constants for the peer-peer command messages
// These are sent in the high bits if the imm data of an rdma message
// The low bits are used to send the credit
@@ -107,10 +106,12 @@ namespace Rdma {
void dataEvent(qpid::sys::DispatchHandle& handle);
void processCompletions();
void doWriteCallback();
+ void doStoppedCallback();
+ void doDrainedCallback();
};
inline bool AsynchIO::writable() const {
- return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
+ return (!draining && outstandingWrites < xmitBufferCount && xmitCredit > 0);
}
inline int AsynchIO::incompletedWrites() const {
@@ -146,7 +147,7 @@ namespace Rdma {
class ConnectionManager {
Connection::intrusive_ptr ci;
- qpid::sys::DispatchHandle handle;
+ qpid::sys::DispatchHandleRef handle;
protected:
ErrorCallback errorCallback;
@@ -160,7 +161,7 @@ namespace Rdma {
virtual ~ConnectionManager();
- void start(qpid::sys::Poller::shared_ptr polle, const qpid::sys::SocketAddress& addrr);
+ void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr);
private:
void event(qpid::sys::DispatchHandle& handle);