summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/Dispatcher.cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-07-27 17:19:30 +0000
commit65ea2f177bd0810590895d89a490af8cea60253b (patch)
tree1a1432d706ac5f43dc8cdd5fdb0d7b5566dd5d06 /cpp/src/qpid/sys/Dispatcher.cpp
parent0a7f3f5dde40e59e90588e4ab7ba2ba99651c0f4 (diff)
downloadqpid-python-65ea2f177bd0810590895d89a490af8cea60253b.tar.gz
* Asynchronous network IO subsystem
- This is now implemented such that it very nearly only depends on the platform code (Socker & Poller), this is not 100% true at present, but should be simple to finish. - This is still not the default (use "./configure --disable-apr-netio" to get it) - Interrupting the broker gives a known error - Default for number of broker io threads is not correct (needs to be number of CPUs - it will run slower with too many io threads) * EventChannel code - Deleted all EventChannel code as it's entirely superceded by this new shiny code ;-) * Rearranged the platform Socket implementations a bit for better abstraction git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/sys/Dispatcher.cpp')
-rw-r--r--cpp/src/qpid/sys/Dispatcher.cpp59
1 files changed, 40 insertions, 19 deletions
diff --git a/cpp/src/qpid/sys/Dispatcher.cpp b/cpp/src/qpid/sys/Dispatcher.cpp
index 3a1da13bd0..b8751168c2 100644
--- a/cpp/src/qpid/sys/Dispatcher.cpp
+++ b/cpp/src/qpid/sys/Dispatcher.cpp
@@ -94,10 +94,11 @@ void DispatchHandle::rewatch() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = r ?
(w ? DELAYED_RW : DELAYED_R) :
DELAYED_W;
@@ -132,6 +133,7 @@ void DispatchHandle::rewatchRead() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_RW:
@@ -140,7 +142,7 @@ void DispatchHandle::rewatchRead() {
case DELAYED_W:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_R;
break;
case ACTIVE_R:
@@ -168,6 +170,7 @@ void DispatchHandle::rewatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
case DELAYED_RW:
@@ -176,7 +179,7 @@ void DispatchHandle::rewatchWrite() {
case DELAYED_R:
state = DELAYED_RW;
break;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = DELAYED_W;
break;
case INACTIVE:
@@ -204,15 +207,16 @@ void DispatchHandle::unwatchRead() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_W;
break;
case DELAYED_W:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_R:
@@ -239,15 +243,16 @@ void DispatchHandle::unwatchWrite() {
ScopedLock<Mutex> lock(stateLock);
switch(state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_W:
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
break;
case DELAYED_RW:
state = DELAYED_R;
break;
case DELAYED_R:
- case CALLBACK:
+ case DELAYED_INACTIVE:
case DELAYED_DELETE:
break;
case ACTIVE_W:
@@ -270,12 +275,13 @@ void DispatchHandle::unwatch() {
ScopedLock<Mutex> lock(stateLock);
switch (state) {
case IDLE:
+ case DELAYED_IDLE:
break;
case DELAYED_R:
case DELAYED_W:
case DELAYED_RW:
- case CALLBACK:
- state = CALLBACK;
+ case DELAYED_INACTIVE:
+ state = DELAYED_INACTIVE;
break;
case DELAYED_DELETE:
break;
@@ -289,32 +295,46 @@ void DispatchHandle::unwatch() {
void DispatchHandle::stopWatch() {
ScopedLock<Mutex> lock(stateLock);
- if ( state == IDLE) {
+ switch (state) {
+ case IDLE:
+ case DELAYED_IDLE:
+ case DELAYED_DELETE:
return;
+ case DELAYED_R:
+ case DELAYED_W:
+ case DELAYED_RW:
+ case DELAYED_INACTIVE:
+ state = DELAYED_IDLE;
+ break;
+ default:
+ state = IDLE;
+ break;
}
assert(poller);
poller->delFd(*this);
poller.reset();
- state = IDLE;
}
// The slightly strange switch structure
// is to ensure that the lock is released before
// we do the delete
void DispatchHandle::doDelete() {
+ // Ensure that we're no longer watching anything
+ stopWatch();
+
// If we're in the middle of a callback defer the delete
{
ScopedLock<Mutex> lock(stateLock);
switch (state) {
- case DELAYED_R:
- case DELAYED_W:
- case DELAYED_RW:
- case CALLBACK:
+ case DELAYED_IDLE:
case DELAYED_DELETE:
state = DELAYED_DELETE;
return;
+ case IDLE:
+ break;
default:
- break;
+ // Can only get out of stopWatch() in DELAYED_IDLE/DELAYED_DELETE/IDLE states
+ assert(false);
}
}
// If we're not then do it right away
@@ -359,7 +379,7 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
case Poller::DISCONNECTED:
{
ScopedLock<Mutex> lock(stateLock);
- state = CALLBACK;
+ state = DELAYED_INACTIVE;
}
if (disconnectedCallback) {
disconnectedCallback(*this);
@@ -386,10 +406,11 @@ void DispatchHandle::dispatchCallbacks(Poller::EventType type) {
poller->modFd(*this, Poller::INOUT);
state = ACTIVE_RW;
return;
- case CALLBACK:
+ case DELAYED_INACTIVE:
state = INACTIVE;
return;
- case IDLE:
+ case DELAYED_IDLE:
+ state = IDLE;
return;
default:
// This should be impossible