From e9cf88573a6532010a70f28fec8869bc034fe16b Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 22 May 2012 18:11:07 +0000 Subject: QPID-3603: HA backups pass identifying info to primary. Pass hostname, management UUID and status in link connection arguments. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1341580 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 2 + qpid/cpp/src/ha.mk | 2 + qpid/cpp/src/qpid/broker/Broker.h | 3 ++ qpid/cpp/src/qpid/broker/System.cpp | 10 ++--- qpid/cpp/src/qpid/broker/System.h | 17 ++++++++ qpid/cpp/src/qpid/framing/Uuid.cpp | 7 ++++ qpid/cpp/src/qpid/ha/Backup.cpp | 6 +-- qpid/cpp/src/qpid/ha/BrokerInfo.cpp | 59 +++++++++++++++++++++++++++ qpid/cpp/src/qpid/ha/BrokerInfo.h | 62 +++++++++++++++++++++++++++++ qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp | 20 +++++----- qpid/cpp/src/qpid/ha/HaBroker.cpp | 37 ++++++++++++----- qpid/cpp/src/qpid/ha/HaBroker.h | 5 +++ qpid/cpp/src/tests/Uuid.cpp | 5 +++ 13 files changed, 207 insertions(+), 28 deletions(-) create mode 100644 qpid/cpp/src/qpid/ha/BrokerInfo.cpp create mode 100644 qpid/cpp/src/qpid/ha/BrokerInfo.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 8e4e9dae34..2ccb52e5de 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -628,6 +628,8 @@ if (BUILD_HA) set (ha_SOURCES qpid/ha/Backup.cpp qpid/ha/Backup.h + qpid/ha/BrokerInfo.h + qpid/ha/BrokerInfo.cpp qpid/ha/BrokerReplicator.cpp qpid/ha/BrokerReplicator.h qpid/ha/ConnectionExcluder.cpp diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 31f7bcc494..ae7b803b87 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -25,6 +25,8 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ + qpid/ha/BrokerInfo.h \ + qpid/ha/BrokerInfo.cpp \ qpid/ha/BrokerReplicator.cpp \ qpid/ha/BrokerReplicator.h \ qpid/ha/ConnectionExcluder.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 72ed05aacb..c57e3c849c 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -384,6 +384,9 @@ class Broker : public sys::Runnable, public Plugin::Target, /** Properties to be set on outgoing link connections */ QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&); + + /** Information identifying this system */ + boost::shared_ptr getSystem() const { return systemObject; } }; }} diff --git a/qpid/cpp/src/qpid/broker/System.cpp b/qpid/cpp/src/qpid/broker/System.cpp index 8cd2edda76..fa8df6406b 100644 --- a/qpid/cpp/src/qpid/broker/System.cpp +++ b/qpid/cpp/src/qpid/broker/System.cpp @@ -37,7 +37,6 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) if (agent != 0) { - framing::Uuid systemId; if (_dataDir.empty ()) { @@ -66,14 +65,13 @@ System::System (string _dataDir, Broker* broker) : mgmtObject(0) } mgmtObject = new _qmf::System(agent, this, types::Uuid(systemId.c_array())); - std::string sysname, nodename, release, version, machine; - qpid::sys::SystemInfo::getSystemId (sysname, - nodename, + qpid::sys::SystemInfo::getSystemId (osName, + nodeName, release, version, machine); - mgmtObject->set_osName (sysname); - mgmtObject->set_nodeName (nodename); + mgmtObject->set_osName (osName); + mgmtObject->set_nodeName (nodeName); mgmtObject->set_release (release); mgmtObject->set_version (version); mgmtObject->set_machine (machine); diff --git a/qpid/cpp/src/qpid/broker/System.h b/qpid/cpp/src/qpid/broker/System.h index 0fc2c2bd88..6847c662ae 100644 --- a/qpid/cpp/src/qpid/broker/System.h +++ b/qpid/cpp/src/qpid/broker/System.h @@ -21,6 +21,7 @@ // #include "qpid/management/Manageable.h" +#include "qpid/framing/Uuid.h" #include "qmf/org/apache/qpid/broker/System.h" #include #include @@ -35,6 +36,8 @@ class System : public management::Manageable private: qmf::org::apache::qpid::broker::System* mgmtObject; + framing::Uuid systemId; + std::string osName, nodeName, release, version, machine; public: @@ -44,6 +47,20 @@ class System : public management::Manageable management::ManagementObject* GetManagementObject (void) const { return mgmtObject; } + + + /** Persistent UUID assigned by the management system to this broker. */ + framing::Uuid getSystemId() const { return systemId; } + /** Returns the OS name; e.g., GNU/Linux or Windows */ + std::string getOsName() const { return osName; } + /** Returns the node name. Usually the same as the host name. */ + std::string getNodeName() const { return nodeName; } + /** Returns the OS release identifier. */ + std::string getRelease() const { return release; } + /** Returns the OS release version (kernel, build, sp, etc.) */ + std::string getVersion() const { return version; } + /** Returns the hardware type. */ + std::string getMachine() const { return machine; } }; }} diff --git a/qpid/cpp/src/qpid/framing/Uuid.cpp b/qpid/cpp/src/qpid/framing/Uuid.cpp index b3d1e2e1e4..e94e28ffee 100644 --- a/qpid/cpp/src/qpid/framing/Uuid.cpp +++ b/qpid/cpp/src/qpid/framing/Uuid.cpp @@ -43,6 +43,13 @@ Uuid::Uuid(const uint8_t* data) { assign(data); } +Uuid::Uuid(const std::string& s) { + if (s.size() != UNPARSED_SIZE) + throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s)); + if (uuid_parse(&s[0], c_array()) != 0) + throw IllegalArgumentException(QPID_MSG("Invalid UUID: " << s)); +} + void Uuid::assign(const uint8_t* data) { // This const cast is for Solaris which has a // uuid_copy that takes a non const 2nd argument diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 8b62f89f11..f00d8a8f4c 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -61,14 +61,14 @@ Url Backup::linkUrl(const Url& brokers) const { Url url; for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (!isSelf(*i)) url.push_back(*i); - if (url.empty()) throw Url::Invalid("HA broker Link URL is empty"); - QPID_LOG(debug, logPrefix << "Link URL set to: " << url); + if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty"); + QPID_LOG(debug, logPrefix << "Backup failover URL (excluding self): " << url); return url; } void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Backup initialized with broker URL: " << brokers); + QPID_LOG(info, logPrefix << "Backup broker URL: " << brokers); sys::Mutex::ScopedLock l(lock); Url url = linkUrl(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp new file mode 100644 index 0000000000..59c58a68b5 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerInfo.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" +#include + + +namespace qpid { +namespace ha { + +namespace { +std::string SYSTEM_ID="system-id"; +std::string HOST_NAME="host-name"; +std::string STATUS="status"; +} + +using framing::Uuid; +using framing::FieldTable; + +FieldTable BrokerInfo::asFieldTable() const { + FieldTable ft; + ft.setString(SYSTEM_ID, systemId.str()); + ft.setString(HOST_NAME, hostName); + ft.setInt(STATUS, status); + return ft; +} + +void BrokerInfo::assign(const FieldTable& ft) { + systemId = Uuid(ft.getAsString(SYSTEM_ID)); + hostName = ft.getAsString(HOST_NAME); + status = BrokerStatus(ft.getAsInt(STATUS)); +} + +std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { + return o << b.getHostName() << "(" << b.getSystemId() + << "," << printable(b.getStatus()) << ")"; +} + +}} diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h new file mode 100644 index 0000000000..d72b6793ff --- /dev/null +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -0,0 +1,62 @@ +#ifndef QPID_HA_BROKERINFO_H +#define QPID_HA_BROKERINFO_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Enum.h" +#include "qpid/framing/Uuid.h" +#include "qpid/framing/FieldTable.h" +#include +#include + +namespace qpid { +namespace ha { + +/** + * Information about a cluster broker, maintained by the cluster primary. + */ +class BrokerInfo +{ + public: + BrokerInfo(const std::string& host, const framing::Uuid& id) : + hostName(host), systemId(id) {} + + BrokerInfo(const framing::FieldTable& ft) { assign(ft); } + framing::FieldTable asFieldTable() const; + void assign(const framing::FieldTable&); + + framing::Uuid getSystemId() const { return systemId; } + std::string getHostName() const { return hostName; } + BrokerStatus getStatus() const { return status; } + void setStatus(BrokerStatus s) { status = s; } + + private: + std::string hostName; + framing::Uuid systemId; + BrokerStatus status; +}; + +std::ostream& operator<<(std::ostream&, const BrokerInfo&); + +}} // namespace qpid::ha + +#endif /*!QPID_HA_BROKERINFO_H*/ diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index fef4c67174..6c413982f8 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -20,6 +20,8 @@ */ #include "ConnectionExcluder.h" +#include "BrokerInfo.h" +#include "qpid/framing/FieldTable.h" #include "qpid/broker/Connection.h" #include #include @@ -37,18 +39,16 @@ void ConnectionExcluder::opened(broker::Connection& connection) { << connection.getMgmtId()); return; } - if (connection.getClientProperties().isSet(BACKUP_TAG)) { - if (backupAllowed) { - QPID_LOG(debug, logPrefix << "Allowing backup connection: " - << connection.getMgmtId()); - return; - } - else QPID_LOG(debug, logPrefix << "Rejected backup connection: " - << connection.getMgmtId()); + framing::FieldTable ft; + if (connection.getClientProperties().getTable(BACKUP_TAG, ft)) { + BrokerInfo info(ft); + QPID_LOG(debug, logPrefix << "Backup connection " << info << + (backupAllowed ? " allowed" : " rejected")); + if (backupAllowed) return; } - + // Abort the connection. throw Exception( - QPID_MSG(logPrefix << "Rejected client connection " << connection.getMgmtId())); + QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId())); } const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 9caa96a607..ecaf968053 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -24,6 +24,7 @@ #include "Primary.h" #include "Settings.h" #include "ReplicatingSubscription.h" +#include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" @@ -31,6 +32,7 @@ #include "qpid/broker/SignalHandler.h" #include "qpid/framing/FieldTable.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/sys/SystemInfo.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" @@ -51,7 +53,10 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) settings(s), mgmtObject(0), status(STANDALONE), - excluder(new ConnectionExcluder(logPrefix)) + excluder(new ConnectionExcluder(logPrefix)), + brokerInfo(broker.getSystem()->getNodeName(), + broker.getSystem()->getSystemId()) + { // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -81,6 +86,8 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); statusChanged(l); + + QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo); } HaBroker::~HaBroker() {} @@ -267,15 +274,27 @@ void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) { statusChanged(l); } -void HaBroker::statusChanged(sys::Mutex::ScopedLock&) { +void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) { mgmtObject->set_status(printable(status).str()); - // Set the backup-related properties for newly created links. - framing::FieldTable ft = broker.getLinkClientProperties(); - if (isBackup(status)) - ft.setInt(ConnectionExcluder::BACKUP_TAG, 1); - else - ft.erase(ConnectionExcluder::BACKUP_TAG); - broker.setLinkClientProperties(ft); + brokerInfo.setStatus(status); + setLinkProperties(l); +} + +void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { + framing::FieldTable linkProperties = broker.getLinkClientProperties(); + if (isBackup(status)) { + // If this is a backup then any links we make are backup links + // and need to be tagged. + QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo); + linkProperties.setTable(ConnectionExcluder::BACKUP_TAG, brokerInfo.asFieldTable()); + } + else { + // If this is a primary then any links are federation links + // and should not be tagged. + QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links"); + linkProperties.erase(ConnectionExcluder::BACKUP_TAG); + } + broker.setLinkClientProperties(linkProperties); } void HaBroker::activatedBackup(const std::string& queue) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index ebd4836e71..017ccefd27 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -22,6 +22,7 @@ * */ +#include "BrokerInfo.h" #include "Enum.h" #include "LogPrefix.h" #include "Settings.h" @@ -89,6 +90,8 @@ class HaBroker : public management::Manageable boost::shared_ptr getExcluder() { return excluder; } + const BrokerInfo& getBrokerInfo() const { return brokerInfo; } + private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); @@ -101,6 +104,7 @@ class HaBroker : public management::Manageable void recover(sys::Mutex::ScopedLock&); void activate(sys::Mutex::ScopedLock&); void statusChanged(sys::Mutex::ScopedLock&); + void setLinkProperties(sys::Mutex::ScopedLock&); std::vector getKnownBrokers() const; @@ -118,6 +122,7 @@ class HaBroker : public management::Manageable BrokerStatus status; QueueNames activeBackups; boost::shared_ptr excluder; + BrokerInfo brokerInfo; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/tests/Uuid.cpp b/qpid/cpp/src/tests/Uuid.cpp index f85a297adc..d7d1f00e3c 100644 --- a/qpid/cpp/src/tests/Uuid.cpp +++ b/qpid/cpp/src/tests/Uuid.cpp @@ -52,6 +52,11 @@ boost::array sample = {{0x1b, 0x4e, 0x28, 0xba, 0x2f, 0xa1, 0x11, const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb"); const string zeroStr("00000000-0000-0000-0000-000000000000"); +QPID_AUTO_TEST_CASE(testUuidStr) { + Uuid uuid(sampleStr); + BOOST_CHECK(uuid == sample); +} + QPID_AUTO_TEST_CASE(testUuidIstream) { Uuid uuid; istringstream in(sampleStr); -- cgit v1.2.1