summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys/DispatchHandle.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys/DispatchHandle.cpp')
-rw-r--r--cpp/src/qpid/sys/DispatchHandle.cpp352
1 files changed, 0 insertions, 352 deletions
diff --git a/cpp/src/qpid/sys/DispatchHandle.cpp b/cpp/src/qpid/sys/DispatchHandle.cpp
deleted file mode 100644
index 5d6fc4e72f..0000000000
--- a/cpp/src/qpid/sys/DispatchHandle.cpp
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/sys/DispatchHandle.h"
-#include "qpid/log/Statement.h"
-
-#include <algorithm>
-
-#include <boost/cast.hpp>
-
-#include <assert.h>
-
-namespace qpid {
-namespace sys {
-
-DispatchHandle::DispatchHandle(const IOHandle& h, Callback rCb, Callback wCb, Callback dCb) :
- PollerHandle(h),
- readableCallback(rCb),
- writableCallback(wCb),
- disconnectedCallback(dCb),
- state(IDLE)
-{
-}
-
-
-DispatchHandle::~DispatchHandle() {
-}
-
-void DispatchHandle::startWatch(Poller::shared_ptr poller0) {
- bool r = readableCallback;
- bool w = writableCallback;
-
- ScopedLock<Mutex> lock(stateLock);
- assert(state == IDLE);
-
- poller = poller0;
- poller->registerHandle(*this);
- state = WAITING;
- Poller::Direction dir = r ?
- ( w ? Poller::INOUT : Poller::INPUT ) :
- ( w ? Poller::OUTPUT : Poller::NONE );
- poller->monitorHandle(*this, dir);
-}
-
-void DispatchHandle::rewatch() {
- bool r = readableCallback;
- bool w = writableCallback;
- if (!r && !w) {
- return;
- }
- Poller::Direction dir = r ?
- ( w ? Poller::INOUT : Poller::INPUT ) :
- ( w ? Poller::OUTPUT : Poller::NONE );
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->monitorHandle(*this, dir);
-}
-
-void DispatchHandle::rewatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->monitorHandle(*this, Poller::INPUT);
-}
-
-void DispatchHandle::rewatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->monitorHandle(*this, Poller::OUTPUT);
-}
-
-void DispatchHandle::unwatchRead() {
- if (!readableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->unmonitorHandle(*this, Poller::INPUT);
-}
-
-void DispatchHandle::unwatchWrite() {
- if (!writableCallback) {
- return;
- }
-
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->unmonitorHandle(*this, Poller::OUTPUT);
-}
-
-void DispatchHandle::unwatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch(state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- break;
- }
- assert(poller);
- poller->unmonitorHandle(*this, Poller::INOUT);
-}
-
-void DispatchHandle::stopWatch() {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- assert(state != IDLE);
- return;
- case STOPPING:
- assert(state != STOPPING);
- return;
- case CALLING:
- state = STOPPING;
- break;
- case WAITING:
- state = IDLE;
- break;
- case DELETING:
- return;
- }
- assert(poller);
- poller->unregisterHandle(*this);
- poller.reset();
-}
-
-// If we are in the IDLE/STOPPING state we can't do the callback as we've
-// not/no longer got the fd registered in any poller
-void DispatchHandle::call(Callback iCb) {
- assert(iCb);
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- case STOPPING:
- case DELETING:
- return;
- default:
- interruptedCallbacks.push(iCb);
- assert(poller);
- (void) poller->interrupt(*this);
- }
-}
-
-// The slightly strange switch structure
-// is to ensure that the lock is released before
-// we do the delete
-void DispatchHandle::doDelete() {
- {
- ScopedLock<Mutex> lock(stateLock);
- // Ensure that we're no longer watching anything
- switch (state) {
- case IDLE:
- state = DELETING;
- break;
- case STOPPING:
- state = DELETING;
- return;
- case WAITING:
- state = DELETING;
- assert(poller);
- (void) poller->interrupt(*this);
- poller->unregisterHandle(*this);
- return;
- case CALLING:
- state = DELETING;
- assert(poller);
- poller->unregisterHandle(*this);
- return;
- case DELETING:
- return;
- }
- }
- // If we're IDLE we can do this right away
- delete this;
-}
-
-void DispatchHandle::processEvent(Poller::EventType type) {
-
- // Phase I
- {
- ScopedLock<Mutex> lock(stateLock);
-
- switch(state) {
- case IDLE:
- // Can get here if a non connection thread stops watching
- // whilst we were stuck in the above lock
- return;
- case WAITING:
- state = CALLING;
- break;
- case CALLING:
- assert(state!=CALLING);
- return;
- case STOPPING:
- assert(state!=STOPPING);
- return;
- case DELETING:
- // Need to make sure we clean up any pending callbacks in this case
- std::swap(callbacks, interruptedCallbacks);
- goto saybyebye;
- }
-
- std::swap(callbacks, interruptedCallbacks);
- }
-
- // Do callbacks - whilst we are doing the callbacks we are prevented from processing
- // the same handle until we re-enable it. To avoid rentering the callbacks for a single
- // handle re-enabling in the callbacks is actually deferred until they are complete.
- try {
- switch (type) {
- case Poller::READABLE:
- readableCallback(*this);
- break;
- case Poller::WRITABLE:
- writableCallback(*this);
- break;
- case Poller::READ_WRITABLE:
- readableCallback(*this);
- writableCallback(*this);
- break;
- case Poller::DISCONNECTED:
- if (disconnectedCallback) {
- disconnectedCallback(*this);
- }
- break;
- case Poller::INTERRUPTED:
- {
- // We'll actually do the interrupt below
- }
- break;
- default:
- assert(false);
- }
-
- // If we have any callbacks do them now -
- // (because we use a copy from before the previous callbacks we won't
- // do anything yet that was just added)
- while (callbacks.size() > 0) {
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case DELETING:
- goto finishcallbacks;
- default:
- break;
- }
- }
- Callback cb = callbacks.front();
- assert(cb);
- cb(*this);
- callbacks.pop();
- }
- } catch (std::exception& e) {
- // One of the callbacks threw an exception - that's not allowed
- QPID_LOG(error, "Caught exception in state: " << state << " with event: " << type << ": " << e.what());
- // It would be nice to clean up and delete ourselves here, but we can't
- }
-
-finishcallbacks:
- {
- ScopedLock<Mutex> lock(stateLock);
- switch (state) {
- case IDLE:
- assert(state!=IDLE);
- return;
- case STOPPING:
- state = IDLE;
- return;
- case WAITING:
- assert(state!=WAITING);
- return;
- case CALLING:
- state = WAITING;
- return;
- case DELETING:
- break;
- }
- }
-
-saybyebye:
- delete this;
-}
-
-}}