summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Quorum_cman.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-08-06 17:41:18 +0000
committerAlan Conway <aconway@apache.org>2009-08-06 17:41:18 +0000
commit1af1e0114b71a366dc78a47c3d63833f1a0b4c8a (patch)
tree85488c593a8d165641d45035d16a78ae402fbdea /cpp/src/qpid/cluster/Quorum_cman.cpp
parent7ffe1a63bfdbf071ca4e3b1f31e66182caa90873 (diff)
downloadqpid-python-1af1e0114b71a366dc78a47c3d63833f1a0b4c8a.tar.gz
Fix cman integration to exit immediately on loss of quorum.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@801740 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/cluster/Quorum_cman.cpp')
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.cpp66
1 files changed, 62 insertions, 4 deletions
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp
index 32ed5c1d91..277adaf7b1 100644
--- a/cpp/src/qpid/cluster/Quorum_cman.cpp
+++ b/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -18,28 +18,86 @@
* under the License.
*
*/
+
#include "qpid/cluster/Quorum_cman.h"
+#include "qpid/cluster/Cluster.h"
#include "qpid/log/Statement.h"
#include "qpid/Options.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/posix/PrivatePosix.h"
namespace qpid {
namespace cluster {
-Quorum::Quorum() : enable(false), cman(0) {}
+namespace {
+
+boost::function<void()> errorFn;
-Quorum::~Quorum() { if (cman) cman_finish(cman); }
+void cmanCallbackFn(cman_handle_t handle, void */*privdata*/, int reason, int arg) {
+ if (reason == CMAN_REASON_STATECHANGE && arg == 0) {
+ QPID_LOG(critical, "Lost contact with cluster quorum.");
+ if (errorFn) errorFn();
+ cman_stop_notification(handle);
+ }
+}
+}
+
+Quorum::Quorum(boost::function<void()> err) : enable(false), cman(0), cmanFd(0) {
+ errorFn = err;
+}
+
+Quorum::~Quorum() {
+ dispatchHandle.reset();
+ if (cman) cman_finish(cman);
+}
-void Quorum::init() {
+void Quorum::start(boost::shared_ptr<sys::Poller> p) {
+ poller = p;
enable = true;
+ QPID_LOG(debug, "Connecting to quorum service.");
cman = cman_init(0);
if (cman == 0) throw ErrnoException("Can't connect to cman service");
if (!cman_is_quorate(cman)) {
QPID_LOG(notice, "Waiting for cluster quorum.");
while(!cman_is_quorate(cman)) sys::sleep(5);
}
+ int err = cman_start_notification(cman, cmanCallbackFn);
+ if (err != 0) throw ErrnoException("Can't register for cman notifications");
+ watch(getFd());
}
-bool Quorum::isQuorate() { return enable ? cman_is_quorate(cman) : true; }
+void Quorum::watch(int fd) {
+ cmanFd = fd;
+ dispatchHandle.reset(
+ new sys::DispatchHandleRef(
+ sys::PosixIOHandle(cmanFd),
+ boost::bind(&Quorum::dispatch, this, _1), // read
+ 0, // write
+ boost::bind(&Quorum::disconnect, this, _1) // disconnect
+ ));
+ dispatchHandle->startWatch(poller);
+}
+
+int Quorum::getFd() {
+ int fd = cman_get_fd(cman);
+ if (fd == 0) throw ErrnoException("Can't get cman file descriptor");
+ return fd;
+}
+
+void Quorum::dispatch(sys::DispatchHandle&) {
+ try {
+ cman_dispatch(cman, CMAN_DISPATCH_ALL);
+ int fd = getFd();
+ if (fd != cmanFd) watch(fd);
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, "Error in quorum dispatch: " << e.what());
+ errorFn();
+ }
+}
+
+void Quorum::disconnect(sys::DispatchHandle&) {
+ QPID_LOG(critical, "Disconnected from quorum service");
+ errorFn();
+}
}} // namespace qpid::cluster