diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:18 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-08 15:24:18 +0000 |
| commit | ba3b6c53f4072744aecbac429c8eab66631d84c6 (patch) | |
| tree | 9786da704b6f20b2f8c47e05ad1fe119d02069d3 /qpid/cpp/src | |
| parent | bb13e5e60b83bc44d436d4fdf41cb56af4da7c81 (diff) | |
| download | qpid-python-ba3b6c53f4072744aecbac429c8eab66631d84c6.tar.gz | |
QPID-3603: HA primary sends membership updates to backup brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/ha.mk | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerInfo.cpp | 44 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerInfo.h | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 34 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp | 46 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ConnectionExcluder.h | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 89 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.h | 65 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 15 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 49 |
15 files changed, 353 insertions, 62 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index ae7b803b87..cab7d8c42b 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -39,6 +39,8 @@ ha_la_SOURCES = \ qpid/ha/HaPlugin.cpp \ qpid/ha/LogPrefix.cpp \ qpid/ha/LogPrefix.h \ + qpid/ha/Membership.cpp \ + qpid/ha/Membership.h \ qpid/ha/Primary.cpp \ qpid/ha/Primary.h \ qpid/ha/QueueReplicator.cpp \ diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 4b8a9081a1..158bdd927d 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -75,7 +75,7 @@ void Backup::initialize(const Url& brokers) { sys::Mutex::ScopedLock l(lock); Url url = linkUrl(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - framing::Uuid uuid(true); + types::Uuid uuid(true); // Declare the link std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 2422bcd3e2..2673646646 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -20,8 +20,11 @@ */ #include "BrokerInfo.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include <iostream> @@ -35,23 +38,46 @@ std::string PORT="port"; std::string STATUS="status"; } -using framing::Uuid; +using types::Uuid; +using types::Variant; using framing::FieldTable; FieldTable BrokerInfo::asFieldTable() const { + Variant::Map m = asMap(); FieldTable ft; - ft.setString(SYSTEM_ID, systemId.str()); - ft.setString(HOST_NAME, hostName); - ft.setInt(PORT, port); - ft.setInt(STATUS, status); + amqp_0_10::translate(m, ft); return ft; } +Variant::Map BrokerInfo::asMap() const { + Variant::Map m; + m[SYSTEM_ID] = systemId; + m[HOST_NAME] = hostName; + m[PORT] = port; + m[STATUS] = status; + return m; +} + void BrokerInfo::assign(const FieldTable& ft) { - systemId = Uuid(ft.getAsString(SYSTEM_ID)); - hostName = ft.getAsString(HOST_NAME); - port = ft.getAsInt(PORT); - status = BrokerStatus(ft.getAsInt(STATUS)); + Variant::Map m; + amqp_0_10::translate(ft, m); + assign(m); +} + +namespace { +const Variant& get(const Variant::Map& m, const std::string& k) { + Variant::Map::const_iterator i = m.find(k); + if (i == m.end()) throw Exception( + QPID_MSG("Missing field '" << k << "' in broker information")); + return i->second; +} +} + +void BrokerInfo::assign(const Variant::Map& m) { + systemId = get(m, SYSTEM_ID).asUuid(); + hostName = get(m, HOST_NAME).asString(); + port = get(m, PORT).asUint16(); + status = BrokerStatus(get(m, STATUS).asUint8()); } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index 7ccbd056c3..b0864e0402 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -24,8 +24,9 @@ #include "Enum.h" #include "qpid/Url.h" -#include "qpid/framing/Uuid.h" #include "qpid/framing/FieldTable.h" +#include "qpid/types/Uuid.h" +#include "qpid/types/Variant.h" #include <string> #include <iosfwd> @@ -38,24 +39,29 @@ namespace ha { class BrokerInfo { public: - BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id) : + BrokerInfo() {} + BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : hostName(host), port(port_), systemId(id) {} - BrokerInfo(const framing::FieldTable& ft) { assign(ft); } - framing::FieldTable asFieldTable() const; - void assign(const framing::FieldTable&); + BrokerInfo(const types::Variant::Map& m) { assign(m); } - framing::Uuid getSystemId() const { return systemId; } + types::Uuid getSystemId() const { return systemId; } std::string getHostName() const { return hostName; } BrokerStatus getStatus() const { return status; } uint16_t getPort() const { return port; } void setStatus(BrokerStatus s) { status = s; } + framing::FieldTable asFieldTable() const; + types::Variant::Map asMap() const; + + void assign(const framing::FieldTable&); + void assign(const types::Variant::Map&); + private: std::string hostName; uint16_t port; - framing::Uuid systemId; + types::Uuid systemId; BrokerStatus status; }; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 2415aff84a..7679078c40 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -13,7 +13,7 @@ * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the +* KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * @@ -37,6 +37,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include <algorithm> #include <sstream> #include <assert.h> @@ -51,6 +52,7 @@ using qmf::org::apache::qpid::broker::EventExchangeDelete; using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; +using qmf::org::apache::qpid::ha::EventMembersUpdate; using namespace framing; using std::string; using types::Variant; @@ -93,7 +95,8 @@ const string TYPE("type"); const string USER("user"); const string HA_BROKER("habroker"); -const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); +const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); const string QMF2("qmf2"); const string QMF_CONTENT("qmf.content"); const string QMF_DEFAULT_TOPIC("qmf.default.topic"); @@ -109,6 +112,7 @@ const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); +const string MEMBERS("members"); bool isQMFv2(const Message& message) { const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); @@ -169,7 +173,7 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& logPrefix(hb), haBroker(hb), broker(hb.getBroker()), link(l) { - framing::Uuid uuid(true); + types::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); broker.getLinks().declare( name, // name for bridge @@ -221,7 +225,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH FieldTable declareArgs; declareArgs.setString(QPID_REPLICATE, printable(NONE).str()); peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); - peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable()); //subscribe to the queue peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); @@ -256,6 +261,7 @@ void BrokerReplicator::route(Deliverable& msg) { else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); else if (match<EventBind>(schema)) doEventBind(values); else if (match<EventUnbind>(schema)) doEventUnbind(values); + else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); } } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -423,6 +429,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { } } +void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { + Variant::List members = values[MEMBERS].asList(); + haBroker.getMembership().assign(members); +} + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); if (!isReplicated(values[ARGUMENTS].asMap(), @@ -517,15 +528,14 @@ const string REPLICATE_DEFAULT="replicateDefault"; void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); - ReplicateLevel primary = haBroker.replicateLevel(values[REPLICATE_DEFAULT].asString()); - if (mine != primary) { - QPID_LOG(critical, logPrefix << "Replicate default on backup (" << mine - << ") does not match primary (" << primary << ")"); - haBroker.shutdown(); - } + ReplicateLevel primary = haBroker.replicateLevel( + values[REPLICATE_DEFAULT].asString()); + if (mine != primary) + throw Exception(QPID_MSG("Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")")); + haBroker.getMembership().assign(values[MEMBERS].asList()); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Invalid replicate default from primary: " - << e.what()); + QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()); haBroker.shutdown(); } } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index d2fd23e63d..57867587a9 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -81,6 +81,7 @@ class BrokerReplicator : public broker::Exchange void doEventExchangeDelete(types::Variant::Map& values); void doEventBind(types::Variant::Map&); void doEventUnbind(types::Variant::Map&); + void doEventMembersUpdate(types::Variant::Map&); void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index 18b4432aa1..9294f38ef3 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -21,6 +21,7 @@ #include "ConnectionExcluder.h" #include "BrokerInfo.h" +#include "HaBroker.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Connection.h" #include <boost/function.hpp> @@ -29,8 +30,19 @@ namespace qpid { namespace ha { -ConnectionExcluder::ConnectionExcluder(const LogPrefix& lp, const framing::Uuid& uuid) - : logPrefix(lp), backupAllowed(false), self(uuid) {} +ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid) + : haBroker(hb), logPrefix(hb), self(uuid) {} + +namespace { +bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { + framing::FieldTable ft; + if (connection.getClientProperties().getTable(ConnectionExcluder::BACKUP_TAG, ft)) { + info = BrokerInfo(ft); + return true; + } + return false; +} +} void ConnectionExcluder::opened(broker::Connection& connection) { if (connection.isLink()) return; // Allow all outgoing links @@ -39,23 +51,37 @@ void ConnectionExcluder::opened(broker::Connection& connection) { << connection.getMgmtId()); return; } - framing::FieldTable ft; - if (connection.getClientProperties().getTable(BACKUP_TAG, ft)) { - BrokerInfo info(ft); + BrokerStatus status = haBroker.getStatus(); + if (isBackup(status)) reject(connection); + BrokerInfo info; // Get info about a connecting backup. + if (getBrokerInfo(connection, info)) { if (info.getSystemId() == self) { - QPID_LOG(debug, logPrefix << "Self connection rejected"); + QPID_LOG(debug, logPrefix << "Rejected self connection"); + reject(connection); } else { - QPID_LOG(debug, logPrefix << "Backup connection " << info << - (backupAllowed ? " allowed" : " rejected")); - if (backupAllowed) return; + QPID_LOG(debug, logPrefix << "Allowed backup connection " << info); + haBroker.getMembership().add(info); + return; } } - // Abort the connection. + else // This is a client connection. + if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow clients in recovery +} + +void ConnectionExcluder::reject(broker::Connection& connection) { throw Exception( QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId())); } +void ConnectionExcluder::closed(broker::Connection& connection) { + BrokerInfo info; + BrokerStatus status = haBroker.getStatus(); + if (isBackup(status)) return; // Don't mess with the map received from primary. + if (getBrokerInfo(connection, info)) + haBroker.getMembership().remove(info.getSystemId()); +} + const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup"; diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h index 042544c333..629fda7519 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -25,6 +25,7 @@ #include "LogPrefix.h" #include "qpid/broker/ConnectionObserver.h" #include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" #include <boost/function.hpp> namespace qpid { @@ -46,17 +47,19 @@ class ConnectionExcluder : public broker::ConnectionObserver static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; - ConnectionExcluder(const LogPrefix&, const framing::Uuid& self); + ConnectionExcluder(HaBroker&, const types::Uuid& self); void opened(broker::Connection& connection); + void closed(broker::Connection& connection); - void setBackupAllowed(bool set) { backupAllowed = set; } - bool isBackupAllowed() const { return backupAllowed; } + void setStatus(BrokerStatus); private: + void reject(broker::Connection&); + + HaBroker& haBroker; LogPrefix logPrefix; - bool backupAllowed; - framing::Uuid self; + types::Uuid self; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 9b58bac484..021e4d559e 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -33,11 +33,14 @@ #include "qpid/framing/FieldTable.h" #include "qpid/management/ManagementAgent.h" #include "qpid/sys/SystemInfo.h" +#include "qpid/types/Uuid.h" +#include "qpid/framing/Uuid.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qpid/log/Statement.h" namespace qpid { @@ -50,14 +53,15 @@ using namespace std; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : logPrefix(status), broker(b), + systemId(broker.getSystem()->getSystemId().data()), settings(s), mgmtObject(0), status(STANDALONE), - excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())), + excluder(new ConnectionExcluder(*this, systemId)), brokerInfo(broker.getSystem()->getNodeName(), // TODO aconway 2012-05-24: other transports? - broker.getPort(broker::Broker::TCP_TRANSPORT), - broker.getSystem()->getSystemId()) + broker.getPort(broker::Broker::TCP_TRANSPORT), systemId), + membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1)) { // Set up the management object. @@ -67,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); mgmtObject->set_replicateDefault(settings.replicateDefault.str()); + mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); // Register a factory for replicating subscriptions. @@ -92,11 +97,14 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo); } -HaBroker::~HaBroker() {} +HaBroker::~HaBroker() { + broker.getConnectionObservers().remove(excluder); +} void HaBroker::recover(sys::Mutex::ScopedLock&) { setStatus(RECOVERING); backup.reset(); // No longer replicating, close link. + membership.reset(brokerInfo); primary.reset(new Primary(*this)); // Starts primary-ready check. } @@ -107,9 +115,11 @@ void HaBroker::activate() { } void HaBroker::activate(sys::Mutex::ScopedLock&) { + BrokerStatus oldStatus = status; setStatus(ACTIVE); + if (oldStatus != RECOVERING) // Already set membership + membership.reset(brokerInfo); backup.reset(); // No longer replicating, close link. - broker.getConnectionObservers().remove(excluder); // This allows client connections. } ReplicateLevel HaBroker::replicateLevel(const std::string& str) { @@ -173,7 +183,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - framing::Uuid uuid(true); + types::Uuid uuid(true); std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, @@ -282,6 +292,12 @@ void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) { setLinkProperties(l); } +void HaBroker::membershipUpdate(const types::Variant::List& brokers) { + QPID_LOG(debug, logPrefix << "Membership update: " << brokers); + mgmtObject->set_members(brokers); + broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); +} + void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { framing::FieldTable linkProperties = broker.getLinkClientProperties(); if (isBackup(status)) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 017ccefd27..224a0923c5 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -23,6 +23,7 @@ */ #include "BrokerInfo.h" +#include "Membership.h" #include "Enum.h" #include "LogPrefix.h" #include "Settings.h" @@ -91,6 +92,8 @@ class HaBroker : public management::Manageable boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; } const BrokerInfo& getBrokerInfo() const { return brokerInfo; } + Membership& getMembership() { return membership; } + void membershipUpdate(const types::Variant::List&); private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); @@ -110,6 +113,7 @@ class HaBroker : public management::Manageable LogPrefix logPrefix; broker::Broker& broker; + types::Uuid systemId; const Settings settings; mutable sys::Mutex lock; @@ -123,6 +127,7 @@ class HaBroker : public management::Manageable QueueNames activeBackups; boost::shared_ptr<ConnectionExcluder> excluder; BrokerInfo brokerInfo; + Membership membership; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp new file mode 100644 index 0000000000..7d22a019d5 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -0,0 +1,89 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Membership.h" + +namespace qpid { +namespace ha { + + +void Membership::reset(const BrokerInfo& b) { + { + sys::Mutex::ScopedLock l(lock); + brokers.clear(); + brokers[b.getSystemId()] = b; + } + update(); +} + +void Membership::add(const BrokerInfo& b) { + { + sys::Mutex::ScopedLock l(lock); + brokers[b.getSystemId()] = b; + } + update(); +} + + +void Membership::remove(const types::Uuid& id) { + { + sys::Mutex::ScopedLock l(lock); + BrokerMap::iterator i = brokers.find(id); + if (i != brokers.end()) + brokers.erase(i); + } + update(); +} + +bool Membership::contains(const types::Uuid& id) { + sys::Mutex::ScopedLock l(lock); + return brokers.find(id) != brokers.end(); +} + +void Membership::assign(const types::Variant::List& list) { + { + sys::Mutex::ScopedLock l(lock); + brokers.clear(); + for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + BrokerInfo b(i->asMap()); + brokers[b.getSystemId()] = b; + } + } + update(); +} + +types::Variant::List Membership::asList() const { + sys::Mutex::ScopedLock l(lock); + types::Variant::List list; + for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + list.push_back(i->second.asMap()); + return list; +} + +void Membership::update() { + if (updateCallback) { + types::Variant::List list; + for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + list.push_back(i->second.asMap()); + updateCallback(list); + } +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h new file mode 100644 index 0000000000..8af03e0f40 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -0,0 +1,65 @@ +#ifndef QPID_HA_MEMBERSHIP_H +#define QPID_HA_MEMBERSHIP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerInfo.h" +#include "LogPrefix.h" +#include "qpid/framing/Uuid.h" +#include "qpid/log/Statement.h" +#include "qpid/types/Variant.h" +#include <boost/function.hpp> +#include <set> +#include <vector> +namespace qpid { +namespace ha { + +/** + * Keep track of the brokers in the membership. + * THREAD SAFE: updated in arbitrary connection threads. + */ +class Membership +{ + public: + Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)> updateFn) + : logPrefix(lp), updateCallback(updateFn) {} + + void reset(const BrokerInfo& b); ///< Reset to contain just one member. + void add(const BrokerInfo& b); + void remove(const types::Uuid& id); + bool contains(const types::Uuid& id); + + void assign(const types::Variant::List&); + types::Variant::List asList() const; + + private: + typedef std::map<types::Uuid, BrokerInfo> BrokerMap; + void update(); + + mutable sys::Mutex lock; + LogPrefix logPrefix; + BrokerMap brokers; + boost::function<void(const types::Variant::List&)> updateCallback; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_MEMBERSHIP_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0a577d08e4..ae48e48c6f 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -57,8 +57,6 @@ Primary::Primary(HaBroker& b) : else { QPID_LOG(debug, logPrefix << "Waiting for " << expected << " backups on " << queues.size() << " queues"); - // Allow backups to connect. - haBroker.getExcluder()->setBackupAllowed(true); } } } diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index 681ace370f..ce14032694 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -34,9 +34,12 @@ <property name="expectedBackups" type="uint16" desc="Number of HA backup brokers expected."/> - <property - name="replicateDefault" type="sstr" - desc="Replicate value for queues/exchanges without a qpid.replicate argument"/> + <property name="replicateDefault" type="sstr" + desc="Replication for queues/exchanges with no qpid.replicate argument"/> + + <property name="members" type="list" desc="List of brokers in the cluster"/> + + <property name="systemId" type="uuid" desc="Identifies the system."/> <method name="promote" desc="Promote a backup broker to primary."/> @@ -58,4 +61,10 @@ </method> </class> + <eventArguments> + <arg name="members" type="list" desc="List of broker information maps"/> + </eventArguments> + + <event name="membersUpdate" sev="inform" args="members"/> + </schema> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 88fb8855ba..6e270851f0 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -26,19 +26,24 @@ from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG from qpidtoollibs import BrokerAgent +from uuid import UUID log = getLogger(__name__) -class QmfHaBroker(object): +class QmfAgent(object): + """Access to a QMF broker agent.""" def __init__(self, address): - self.connection = Connection.establish( + self._connection = Connection.establish( address, client_properties={"qpid.ha-admin":1}) - self.qmf = BrokerAgent(self.connection) - self.ha_broker = self.qmf.getHaBroker() - if not self.ha_broker: - raise Exception("HA module is not loaded on broker at %s"%address) + self._agent = BrokerAgent(self._connection) + assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a class HaBroker(Broker): + """Start a broker with HA enabled""" def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" @@ -58,6 +63,7 @@ class HaBroker(Broker): assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args) @@ -65,7 +71,11 @@ class HaBroker(Broker): def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status + def agent(self): + if not self._agent: self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): self.agent().getHaBroker().status # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -641,6 +651,31 @@ class ReplicationTests(BrokerTest): self.failIf(i < 0) self.assertEqual(log.find("caught up", i), -1) + def test_broker_info(self): + """Check that broker information is correctly published via management""" + cluster = HaCluster(self, 3) + + for broker in cluster: # Make sure HA system-id matches broker's + qmf = broker.agent().getHaBroker() + self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) + + cluster_ports = map(lambda b: b.port(), cluster) + cluster_ports.sort() + def ports(qmf): + qmf.update() + return sorted(map(lambda b: b["port"], qmf.members)) + # Check that all brokers have the same membership as the cluster + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + # Add a new broker, check it is updated everywhere + b = cluster.start() + cluster_ports.append(b.port()) + cluster_ports.sort() + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit |
