diff options
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r-- | cpp/src/qpid/sys/DispatchHandle.cpp | 352 |
1 files changed, 0 insertions, 352 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp deleted file mode 100644 index 5d6fc4e72f..0000000000 --- a/cpp/src/qpid/sys/DispatchHandle.cpp +++ /dev/null @@ -1,352 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/DispatchHandle.h" -#include "qpid/log/Statement.h" - -#include <algorithm> - -#include <boost/cast.hpp> - -#include <assert.h> - -namespace qpid { -namespace sys { - -DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) : - PollerHandle(h), - readableCallback(rCb), - writableCallback(wCb), - disconnectedCallback(dCb), - state(IDLE) -{ -} - - -DispatchHandle::~DispatchHandle() { -} - -void DispatchHandle::startWatch(Poller::shared_ptr poller0) { - bool r = readableCallback; - bool w = writableCallback; - - ScopedLock<Mutex> lock(stateLock); - assert(state == IDLE); - - poller = poller0; - poller->registerHandle(*this); - state = WAITING; - Poller::Direction dir = r ? - ( w ? Poller::INOUT : Poller::INPUT ) : - ( w ? Poller::OUTPUT : Poller::NONE ); - poller->monitorHandle(*this, dir); -} - -void DispatchHandle::rewatch() { - bool r = readableCallback; - bool w = writableCallback; - if (!r && !w) { - return; - } - Poller::Direction dir = r ? - ( w ? Poller::INOUT : Poller::INPUT ) : - ( w ? Poller::OUTPUT : Poller::NONE ); - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->monitorHandle(*this, dir); -} - -void DispatchHandle::rewatchRead() { - if (!readableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->monitorHandle(*this, Poller::INPUT); -} - -void DispatchHandle::rewatchWrite() { - if (!writableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->monitorHandle(*this, Poller::OUTPUT); -} - -void DispatchHandle::unwatchRead() { - if (!readableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->unmonitorHandle(*this, Poller::INPUT); -} - -void DispatchHandle::unwatchWrite() { - if (!writableCallback) { - return; - } - - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->unmonitorHandle(*this, Poller::OUTPUT); -} - -void DispatchHandle::unwatch() { - ScopedLock<Mutex> lock(stateLock); - switch(state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - break; - } - assert(poller); - poller->unmonitorHandle(*this, Poller::INOUT); -} - -void DispatchHandle::stopWatch() { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case IDLE: - assert(state != IDLE); - return; - case STOPPING: - assert(state != STOPPING); - return; - case CALLING: - state = STOPPING; - break; - case WAITING: - state = IDLE; - break; - case DELETING: - return; - } - assert(poller); - poller->unregisterHandle(*this); - poller.reset(); -} - -// If we are in the IDLE/STOPPING state we can't do the callback as we've -// not/no longer got the fd registered in any poller -void DispatchHandle::call(Callback iCb) { - assert(iCb); - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case IDLE: - case STOPPING: - case DELETING: - return; - default: - interruptedCallbacks.push(iCb); - assert(poller); - (void) poller->interrupt(*this); - } -} - -// The slightly strange switch structure -// is to ensure that the lock is released before -// we do the delete -void DispatchHandle::doDelete() { - { - ScopedLock<Mutex> lock(stateLock); - // Ensure that we're no longer watching anything - switch (state) { - case IDLE: - state = DELETING; - break; - case STOPPING: - state = DELETING; - return; - case WAITING: - state = DELETING; - assert(poller); - (void) poller->interrupt(*this); - poller->unregisterHandle(*this); - return; - case CALLING: - state = DELETING; - assert(poller); - poller->unregisterHandle(*this); - return; - case DELETING: - return; - } - } - // If we're IDLE we can do this right away - delete this; -} - -void DispatchHandle::processEvent(Poller::EventType type) { - - // Phase I - { - ScopedLock<Mutex> lock(stateLock); - - switch(state) { - case IDLE: - // Can get here if a non connection thread stops watching - // whilst we were stuck in the above lock - return; - case WAITING: - state = CALLING; - break; - case CALLING: - assert(state!=CALLING); - return; - case STOPPING: - assert(state!=STOPPING); - return; - case DELETING: - // Need to make sure we clean up any pending callbacks in this case - std::swap(callbacks, interruptedCallbacks); - goto saybyebye; - } - - std::swap(callbacks, interruptedCallbacks); - } - - // Do callbacks - whilst we are doing the callbacks we are prevented from processing - // the same handle until we re-enable it. To avoid rentering the callbacks for a single - // handle re-enabling in the callbacks is actually deferred until they are complete. - try { - switch (type) { - case Poller::READABLE: - readableCallback(*this); - break; - case Poller::WRITABLE: - writableCallback(*this); - break; - case Poller::READ_WRITABLE: - readableCallback(*this); - writableCallback(*this); - break; - case Poller::DISCONNECTED: - if (disconnectedCallback) { - disconnectedCallback(*this); - } - break; - case Poller::INTERRUPTED: - { - // We'll actually do the interrupt below - } - break; - default: - assert(false); - } - - // If we have any callbacks do them now - - // (because we use a copy from before the previous callbacks we won't - // do anything yet that was just added) - while (callbacks.size() > 0) { - { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case DELETING: - goto finishcallbacks; - default: - break; - } - } - Callback cb = callbacks.front(); - assert(cb); - cb(*this); - callbacks.pop(); - } - } catch (std::exception& e) { - // One of the callbacks threw an exception - that's not allowed - QPID_LOG(error, "Caught exception in state: " << state << " with event: " << type << ": " << e.what()); - // It would be nice to clean up and delete ourselves here, but we can't - } - -finishcallbacks: - { - ScopedLock<Mutex> lock(stateLock); - switch (state) { - case IDLE: - assert(state!=IDLE); - return; - case STOPPING: - state = IDLE; - return; - case WAITING: - assert(state!=WAITING); - return; - case CALLING: - state = WAITING; - return; - case DELETING: - break; - } - } - -saybyebye: - delete this; -} - -}} |