summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-11-18 19:55:59 +0000
committerAlan Conway <aconway@apache.org>2008-11-18 19:55:59 +0000
commit970fba7f2422eab256273a610135be33bd37f7d6 (patch)
tree2e1e66e3bd717c2adc02b9787bb55a0c20679381 /cpp/src/qpid
parentb355d0e5b46739c74e9cbef449d8fc50646e4db2 (diff)
downloadqpid-python-970fba7f2422eab256273a610135be33bd37f7d6.tar.gz
Optional cluster integration with cman quorum service.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@718693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp12
-rw-r--r--cpp/src/qpid/cluster/Cluster.h6
-rw-r--r--cpp/src/qpid/cluster/OutputInterceptor.cpp1
-rw-r--r--cpp/src/qpid/cluster/Quorum.h32
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.cpp53
-rw-r--r--cpp/src/qpid/cluster/Quorum_cman.h53
-rw-r--r--cpp/src/qpid/cluster/Quorum_null.h39
7 files changed, 195 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f6022aa5b8..b2650ffa7f 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -86,6 +86,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
+ isQuorate(isQuorateImpl),
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -591,6 +592,17 @@ broker::Broker& Cluster::getBroker() const {
return broker; // Immutable, no need to lock.
}
+/** Default implementation for isQuorateImpl when there is no quorum service. */
+bool Cluster::isQuorateImpl() { return true; }
+
+void Cluster::checkQuorum() {
+ if (!isQuorate()) {
+ QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down");
+ leave();
+ throw Exception(QPID_MSG(*this << " disconnected from cluster quorum."));
+ }
+}
+
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
if (mgmtObject)
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index d8b9c958d8..aff3f18c6d 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -92,6 +92,9 @@ class Cluster : private Cpg::Handler, public management::Manageable {
MemberId getId() const;
broker::Broker& getBroker() const;
+ boost::function<bool ()> isQuorate;
+ void checkQuorum();
+
private:
typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
@@ -173,7 +176,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void dumpOutDone(Lock&);
void setClusterId(const framing::Uuid&);
-
+ static bool isQuorateImpl();
+
mutable sys::Monitor lock;
broker::Broker& broker;
diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp
index effd2c5bff..68afb9bda0 100644
--- a/cpp/src/qpid/cluster/OutputInterceptor.cpp
+++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp
@@ -37,6 +37,7 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp
{}
void OutputInterceptor::send(framing::AMQFrame& f) {
+ parent.getCluster().checkQuorum();
Locker l(lock);
next->send(f);
if (!parent.isCatchUp())
diff --git a/cpp/src/qpid/cluster/Quorum.h b/cpp/src/qpid/cluster/Quorum.h
new file mode 100644
index 0000000000..f07b58dfa6
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum.h
@@ -0,0 +1,32 @@
+#ifndef QPID_CLUSTER_QUORUM_H
+#define QPID_CLUSTER_QUORUM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "config.h"
+
+#if HAVE_LIBCMAN
+#include "Quorum_cman.h"
+#else
+#include "Quorum_null.h"
+#endif
+
+#endif /*!QPID_CLUSTER_QUORUM_H*/
diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp
new file mode 100644
index 0000000000..0d4656b536
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_cman.cpp
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Quorum.h"
+#include "qpid/log/Statement.h"
+#include "qpid/Options.h"
+#include "qpid/sys/Time.h"
+
+namespace qpid {
+namespace cluster {
+
+Quorum::Quorum() : enable(false), cman(0) {}
+
+Quorum::~Quorum() { if (cman) cman_finish(cman); }
+
+void Quorum::addOption(Options& opts) {
+ opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager");
+}
+
+void Quorum::init() {
+ if (enable) {
+ cman = cman_init(0);
+ if (cman == 0) throw ErrnoException("Can't connect to cman service");
+ // FIXME aconway 2008-11-13: configure max wait.
+ for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) {
+ QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno));
+ sys::sleep(1);
+ }
+ if (!cman_is_quorate(cman))
+ throw ErrnoException("Timed out waiting for cluster quorum");
+ }
+}
+
+bool Quorum::isQuorate() { return cman_is_quorate(cman); }
+
+}} // namespace qpid::cluster
diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h
new file mode 100644
index 0000000000..bf02f697b0
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_cman.h
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_QUORUM_CMAN_H
+#define QPID_CLUSTER_QUORUM_CMAN_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+extern "C" {
+#include <libcman.h>
+}
+
+namespace qpid {
+
+class Options;
+
+namespace cluster {
+
+class Quorum {
+ public:
+ Quorum();
+ ~Quorum();
+ void addOption(Options& opts);
+ void init();
+ bool isQuorate();
+
+ private:
+ bool enable;
+ cman_handle_t cman;
+};
+
+
+}} // namespace qpid::cluster
+
+ // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/
diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h
new file mode 100644
index 0000000000..96374a5e88
--- /dev/null
+++ b/cpp/src/qpid/cluster/Quorum_null.h
@@ -0,0 +1,39 @@
+#ifndef QPID_CLUSTER_QUORUM_NULL_H
+#define QPID_CLUSTER_QUORUM_NULL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace cluster {
+
+/** Null implementation of quorum. */
+
+class Quorum {
+ public:
+ void init();
+ bool isQuorate() { return true; }
+ void addOption(Options& opts) {}
+};
+
+#endif
+
+
+#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/