diff options
Diffstat (limited to 'cpp/src/qpid/cluster/Quorum_cman.cpp')
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.cpp | 66 |
1 files changed, 62 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp index 32ed5c1d91..277adaf7b1 100644 --- a/cpp/src/qpid/cluster/Quorum_cman.cpp +++ b/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -18,28 +18,86 @@ * under the License. * */ + #include "qpid/cluster/Quorum_cman.h" +#include "qpid/cluster/Cluster.h" #include "qpid/log/Statement.h" #include "qpid/Options.h" #include "qpid/sys/Time.h" +#include "qpid/sys/posix/PrivatePosix.h" namespace qpid { namespace cluster { -Quorum::Quorum() : enable(false), cman(0) {} +namespace { + +boost::function<void()> errorFn; -Quorum::~Quorum() { if (cman) cman_finish(cman); } +void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) { + if (reason == CMAN_REASON_STATECHANGE && arg == 0) { + QPID_LOG(critical, "Lost contact with cluster quorum."); + if (errorFn) errorFn(); + cman_stop_notification(handle); + } +} +} + +Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) { + errorFn = err; +} + +Quorum::~Quorum() { + dispatchHandle.reset(); + if (cman) cman_finish(cman); +} -void Quorum::init() { +void Quorum::start(boost::shared_ptr<sys::Poller> p) { + poller = p; enable = true; + QPID_LOG(debug, "Connecting to quorum service."); cman = cman_init(0); if (cman == 0) throw ErrnoException("Can't connect to cman service"); if (!cman_is_quorate(cman)) { QPID_LOG(notice, "Waiting for cluster quorum."); while(!cman_is_quorate(cman)) sys::sleep(5); } + int err = cman_start_notification(cman, cmanCallbackFn); + if (err != 0) throw ErrnoException("Can't register for cman notifications"); + watch(getFd()); } -bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; } +void Quorum::watch(int fd) { + cmanFd = fd; + dispatchHandle.reset( + new sys::DispatchHandleRef( + sys::PosixIOHandle(cmanFd), + boost::bind(&Quorum::dispatch, this, _1), // read + 0, // write + boost::bind(&Quorum::disconnect, this, _1) // disconnect + )); + dispatchHandle->startWatch(poller); +} + +int Quorum::getFd() { + int fd = cman_get_fd(cman); + if (fd == 0) throw ErrnoException("Can't get cman file descriptor"); + return fd; +} + +void Quorum::dispatch(sys::DispatchHandle&) { + try { + cman_dispatch(cman, CMAN_DISPATCH_ALL); + int fd = getFd(); + if (fd != cmanFd) watch(fd); + } catch (const std::exception& e) { + QPID_LOG(critical, "Error in quorum dispatch: " << e.what()); + errorFn(); + } +} + +void Quorum::disconnect(sys::DispatchHandle&) { + QPID_LOG(critical, "Disconnected from quorum service"); + errorFn(); +} }} // namespace qpid::cluster |
