diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-27 17:19:30 +0000 |
| commit | 65ea2f177bd0810590895d89a490af8cea60253b (patch) | |
| tree | 1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06 /cpp/src/qpid/sys/Dispatcher.cpp | |
| parent | 0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff) | |
| download | qpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz | |
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform
code (Socker & Poller), this is not 100% true at present, but should be simple
to finish.
- This is still not the default (use "./configure --disable-apr-netio" to get it)
- Interrupting the broker gives a known error
- Default for number of broker io threads is not correct (needs to be number of CPUs -
it will run slower with too many io threads)
* EventChannel code
- Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-)
* Rearranged the platform Socket implementations a bit for better abstraction
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
| -rw-r--r-- | cpp/src/qpid/sys/Dispatcher.cpp | 59 |
1 files changed, 40 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp index 3a1da13bd0..b8751168c2 100644 --- a/cpp/src/qpid/sys/Dispatcher.cpp +++ b/cpp/src/qpid/sys/Dispatcher.cpp @@ -94,10 +94,11 @@ void DispatchHandle::rewatch() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_W: - case CALLBACK: + case DELAYED_INACTIVE: state = r ? (w ? DELAYED_RW : DELAYED_R) : DELAYED_W; @@ -132,6 +133,7 @@ void DispatchHandle::rewatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_RW: @@ -140,7 +142,7 @@ void DispatchHandle::rewatchRead() { case DELAYED_W: state = DELAYED_RW; break; - case CALLBACK: + case DELAYED_INACTIVE: state = DELAYED_R; break; case ACTIVE_R: @@ -168,6 +170,7 @@ void DispatchHandle::rewatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_W: case DELAYED_RW: @@ -176,7 +179,7 @@ void DispatchHandle::rewatchWrite() { case DELAYED_R: state = DELAYED_RW; break; - case CALLBACK: + case DELAYED_INACTIVE: state = DELAYED_W; break; case INACTIVE: @@ -204,15 +207,16 @@ void DispatchHandle::unwatchRead() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: - state = CALLBACK; + state = DELAYED_INACTIVE; break; case DELAYED_RW: state = DELAYED_W; break; case DELAYED_W: - case CALLBACK: + case DELAYED_INACTIVE: case DELAYED_DELETE: break; case ACTIVE_R: @@ -239,15 +243,16 @@ void DispatchHandle::unwatchWrite() { ScopedLock<Mutex> lock(stateLock); switch(state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_W: - state = CALLBACK; + state = DELAYED_INACTIVE; break; case DELAYED_RW: state = DELAYED_R; break; case DELAYED_R: - case CALLBACK: + case DELAYED_INACTIVE: case DELAYED_DELETE: break; case ACTIVE_W: @@ -270,12 +275,13 @@ void DispatchHandle::unwatch() { ScopedLock<Mutex> lock(stateLock); switch (state) { case IDLE: + case DELAYED_IDLE: break; case DELAYED_R: case DELAYED_W: case DELAYED_RW: - case CALLBACK: - state = CALLBACK; + case DELAYED_INACTIVE: + state = DELAYED_INACTIVE; break; case DELAYED_DELETE: break; @@ -289,32 +295,46 @@ void DispatchHandle::unwatch() { void DispatchHandle::stopWatch() { ScopedLock<Mutex> lock(stateLock); - if ( state == IDLE) { + switch (state) { + case IDLE: + case DELAYED_IDLE: + case DELAYED_DELETE: return; + case DELAYED_R: + case DELAYED_W: + case DELAYED_RW: + case DELAYED_INACTIVE: + state = DELAYED_IDLE; + break; + default: + state = IDLE; + break; } assert(poller); poller->delFd(*this); poller.reset(); - state = IDLE; } // The slightly strange switch structure // is to ensure that the lock is released before // we do the delete void DispatchHandle::doDelete() { + // Ensure that we're no longer watching anything + stopWatch(); + // If we're in the middle of a callback defer the delete { ScopedLock<Mutex> lock(stateLock); switch (state) { - case DELAYED_R: - case DELAYED_W: - case DELAYED_RW: - case CALLBACK: + case DELAYED_IDLE: case DELAYED_DELETE: state = DELAYED_DELETE; return; + case IDLE: + break; default: - break; + // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states + assert(false); } } // If we're not then do it right away @@ -359,7 +379,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { case Poller::DISCONNECTED: { ScopedLock<Mutex> lock(stateLock); - state = CALLBACK; + state = DELAYED_INACTIVE; } if (disconnectedCallback) { disconnectedCallback(*this); @@ -386,10 +406,11 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) { poller->modFd(*this, Poller::INOUT); state = ACTIVE_RW; return; - case CALLBACK: + case DELAYED_INACTIVE: state = INACTIVE; return; - case IDLE: + case DELAYED_IDLE: + state = IDLE; return; default: // This should be impossible |
