diff options
| author | Alan Conway <aconway@apache.org> | 2008-11-18 19:55:59 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-11-18 19:55:59 +0000 |
| commit | 970fba7f2422eab256273a610135be33bd37f7d6 (patch) | |
| tree | 2e1e66e3bd717c2adc02b9787bb55a0c20679381 /cpp/src/qpid | |
| parent | b355d0e5b46739c74e9cbef449d8fc50646e4db2 (diff) | |
| download | qpid-python-970fba7f2422eab256273a610135be33bd37f7d6.tar.gz | |
Optional cluster integration with cman quorum service.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@718693 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/OutputInterceptor.cpp | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum.h | 32 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.cpp | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum_cman.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/cluster/Quorum_null.h | 39 |
7 files changed, 195 insertions, 1 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f6022aa5b8..b2650ffa7f 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -86,6 +86,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : + isQuorate(isQuorateImpl), broker(b), poller(b.getPoller()), cpg(*this), @@ -591,6 +592,17 @@ broker::Broker& Cluster::getBroker() const { return broker; // Immutable, no need to lock. } +/** Default implementation for isQuorateImpl when there is no quorum service. */ +bool Cluster::isQuorateImpl() { return true; } + +void Cluster::checkQuorum() { + if (!isQuorate()) { + QPID_LOG(critical, *this << " disconnected from cluster quorum, shutting down"); + leave(); + throw Exception(QPID_MSG(*this << " disconnected from cluster quorum.")); + } +} + void Cluster::setClusterId(const Uuid& uuid) { clusterId = uuid; if (mgmtObject) diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index d8b9c958d8..aff3f18c6d 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -92,6 +92,9 @@ class Cluster : private Cpg::Handler, public management::Manageable { MemberId getId() const; broker::Broker& getBroker() const; + boost::function<bool ()> isQuorate; + void checkQuorum(); + private: typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; @@ -173,7 +176,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { void dumpOutDone(Lock&); void setClusterId(const framing::Uuid&); - + static bool isQuorateImpl(); + mutable sys::Monitor lock; broker::Broker& broker; diff --git a/cpp/src/qpid/cluster/OutputInterceptor.cpp b/cpp/src/qpid/cluster/OutputInterceptor.cpp index effd2c5bff..68afb9bda0 100644 --- a/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -37,6 +37,7 @@ OutputInterceptor::OutputInterceptor(cluster::Connection& p, sys::ConnectionOutp {} void OutputInterceptor::send(framing::AMQFrame& f) { + parent.getCluster().checkQuorum(); Locker l(lock); next->send(f); if (!parent.isCatchUp()) diff --git a/cpp/src/qpid/cluster/Quorum.h b/cpp/src/qpid/cluster/Quorum.h new file mode 100644 index 0000000000..f07b58dfa6 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum.h @@ -0,0 +1,32 @@ +#ifndef QPID_CLUSTER_QUORUM_H +#define QPID_CLUSTER_QUORUM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "config.h" + +#if HAVE_LIBCMAN +#include "Quorum_cman.h" +#else +#include "Quorum_null.h" +#endif + +#endif /*!QPID_CLUSTER_QUORUM_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_cman.cpp b/cpp/src/qpid/cluster/Quorum_cman.cpp new file mode 100644 index 0000000000..0d4656b536 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_cman.cpp @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Quorum.h" +#include "qpid/log/Statement.h" +#include "qpid/Options.h" +#include "qpid/sys/Time.h" + +namespace qpid { +namespace cluster { + +Quorum::Quorum() : enable(false), cman(0) {} + +Quorum::~Quorum() { if (cman) cman_finish(cman); } + +void Quorum::addOption(Options& opts) { + opts.addOptions()("cluster-cman", optValue(enable), "Enable integration with CMAN Cluster Manager"); +} + +void Quorum::init() { + if (enable) { + cman = cman_init(0); + if (cman == 0) throw ErrnoException("Can't connect to cman service"); + // FIXME aconway 2008-11-13: configure max wait. + for (int retry = 0; !cman_is_quorate(cman) && retry < 30; retry++) { + QPID_LOG(notice, "Waiting for cluster quorum: " << sys::strError(errno)); + sys::sleep(1); + } + if (!cman_is_quorate(cman)) + throw ErrnoException("Timed out waiting for cluster quorum"); + } +} + +bool Quorum::isQuorate() { return cman_is_quorate(cman); } + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/Quorum_cman.h b/cpp/src/qpid/cluster/Quorum_cman.h new file mode 100644 index 0000000000..bf02f697b0 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_cman.h @@ -0,0 +1,53 @@ +#ifndef QPID_CLUSTER_QUORUM_CMAN_H +#define QPID_CLUSTER_QUORUM_CMAN_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +extern "C" { +#include <libcman.h> +} + +namespace qpid { + +class Options; + +namespace cluster { + +class Quorum { + public: + Quorum(); + ~Quorum(); + void addOption(Options& opts); + void init(); + bool isQuorate(); + + private: + bool enable; + cman_handle_t cman; +}; + + +}} // namespace qpid::cluster + + // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_QUORUM_CMAN_H*/ diff --git a/cpp/src/qpid/cluster/Quorum_null.h b/cpp/src/qpid/cluster/Quorum_null.h new file mode 100644 index 0000000000..96374a5e88 --- /dev/null +++ b/cpp/src/qpid/cluster/Quorum_null.h @@ -0,0 +1,39 @@ +#ifndef QPID_CLUSTER_QUORUM_NULL_H +#define QPID_CLUSTER_QUORUM_NULL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +namespace cluster { + +/** Null implementation of quorum. */ + +class Quorum { + public: + void init(); + bool isQuorate() { return true; } + void addOption(Options& opts) {} +}; + +#endif + + +#endif /*!QPID_CLUSTER_QUORUM_NULL_H*/ |
