diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2009-01-06 23:42:18 +0000 |
| commit | 9a933ae9011d343a75929136269fe45c6b863a17 (patch) | |
| tree | 29ebd71241d810af6e0f20d7e5694cba1607486f /cpp/src/qpid/sys/DispatchHandle.cpp | |
| parent | 820071d5a9959a2923269751ddcff2ed085b239a (diff) | |
| download | qpid-python-9a933ae9011d343a75929136269fe45c6b863a17.tar.gz | |
Work on the low level IO code:
* Introduce code so that you can interrupt waiting for a handle and receive
a callback that is correctly serialised with the IO callbacks for that
handle
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732177 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 41 |
1 files changed, 32 insertions, 9 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp index 4722fc0b8b..cbdee7eda6 100644 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ b/cpp/src/qpid/sys/DispatchHandle.cpp @@ -270,22 +270,30 @@ void DispatchHandle::stopWatch() { case IDLE: case DELAYED_IDLE: case DELAYED_DELETE: - return; + return; case DELAYED_R: case DELAYED_W: case DELAYED_RW: case DELAYED_INACTIVE: - state = DELAYED_IDLE; - break; + state = DELAYED_IDLE; + break; default: - state = IDLE; - break; + state = IDLE; + break; } assert(poller); poller->delFd(*this); poller.reset(); } +void DispatchHandle::call(Callback iCb) { + assert(iCb); + ScopedLock<Mutex> lock(stateLock); + interruptedCallbacks.push(iCb); + + (void) poller->interrupt(*this); +} + // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete @@ -302,9 +310,9 @@ void DispatchHandle::doDelete() { state = DELAYED_DELETE; return; case IDLE: - break; + break; default: - // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states assert(false); } } @@ -368,14 +376,29 @@ void DispatchHandle::processEvent(Poller::EventType type) { disconnectedCallback(*this); } break; + case Poller::INTERRUPTED: + { + ScopedLock<Mutex> lock(stateLock); + assert(interruptedCallbacks.size() > 0); + // We'll actually do the interrupt below + } + break; default: assert(false); } - // If any of the callbacks re-enabled reading/writing then actually - // do it now { ScopedLock<Mutex> lock(stateLock); + // If we've got a pending interrupt do it now + while (interruptedCallbacks.size() > 0) { + Callback cb = interruptedCallbacks.front(); + assert(cb); + cb(*this); + interruptedCallbacks.pop(); + } + + // If any of the callbacks re-enabled reading/writing then actually + // do it now switch (state) { case DELAYED_R: poller->modFd(*this, Poller::INPUT); |
