From 229dd0ea527beec82f3217c94b993c5e66286993 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 29 Apr 2013 15:57:59 +0000 Subject: QPID-4787: HA brokers find self-address in brokers_url. HA brokers need to know their own addresses, but it is not safe to simply use local hosts name and Broker::getPort() since the broker may be listening on multiple addresses. The solution is to have brokers check the ha-rokers-url for their own address while doing the initial status check of the cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1477165 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/ha/BrokerInfo.cpp | 8 ++++++-- cpp/src/qpid/ha/ConnectionObserver.cpp | 26 +++++++++++++++++++++----- cpp/src/qpid/ha/ConnectionObserver.h | 6 +++++- cpp/src/qpid/ha/HaBroker.cpp | 17 ++++++++--------- cpp/src/qpid/ha/HaBroker.h | 2 ++ cpp/src/qpid/ha/StatusCheck.cpp | 5 ++++- cpp/src/tests/ha_tests.py | 31 +++++++++++++++---------------- 7 files changed, 61 insertions(+), 34 deletions(-) (limited to 'cpp/src') diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp index 8efed91b17..c3183d8f47 100644 --- a/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/cpp/src/qpid/ha/BrokerInfo.cpp @@ -89,8 +89,12 @@ void BrokerInfo::assign(const Variant::Map& m) { } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - return o << b.getHostName() << ":" << b.getPort() << "(" - << printable(b.getStatus()) << ")"; + o << "FIXME:"; + o << b.getSystemId().str().substr(0,7); + if (!b.getHostName().empty()) + o << "@" << b.getHostName() << ":" << b.getPort(); + o << "(" << printable(b.getStatus()) << ")"; + return o; } std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) { diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp index 775222efd3..76be46a92b 100644 --- a/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -22,6 +22,7 @@ #include "ConnectionObserver.h" #include "BrokerInfo.h" #include "HaBroker.h" +#include "qpid/Url.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" @@ -41,6 +42,17 @@ bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, Bro return false; } +bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) { + Url url; + url.parseNoThrow( + connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str()); + if (!url.empty()) { + addr = url[0]; + return true; + } + return false; +} + void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix) { sys::Mutex::ScopedLock l(lock); @@ -60,17 +72,20 @@ bool ConnectionObserver::isSelf(const broker::Connection& connection) { void ConnectionObserver::opened(broker::Connection& connection) { try { + if (isSelf(connection)) { // Reject self connections + // Set my own address if there is an address header. + Address addr; + if (getAddress(connection, addr)) haBroker.setAddress(addr); + QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); + connection.abort(); + return; + } if (connection.isLink()) return; // Allow outgoing links. if (connection.getClientProperties().isSet(ADMIN_TAG)) { QPID_LOG(debug, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. } - if (isSelf(connection)) { // Reject self connections - QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId()); - connection.abort(); - return; - } ObserverPtr o(getObserver()); if (o) o->opened(connection); } @@ -94,5 +109,6 @@ void ConnectionObserver::closed(broker::Connection& connection) { const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin"; const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup"; +const std::string ConnectionObserver::ADDRESS_TAG="qpid.ha-address"; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ConnectionObserver.h b/cpp/src/qpid/ha/ConnectionObserver.h index 5374660dbe..85510ab459 100644 --- a/cpp/src/qpid/ha/ConnectionObserver.h +++ b/cpp/src/qpid/ha/ConnectionObserver.h @@ -29,6 +29,8 @@ #include "boost/shared_ptr.hpp" namespace qpid { +class Address; + namespace ha { class BrokerInfo; class HaBroker; @@ -50,8 +52,10 @@ class ConnectionObserver : public broker::ConnectionObserver static const std::string ADMIN_TAG; static const std::string BACKUP_TAG; + static const std::string ADDRESS_TAG; - static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info); + static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo&); + static bool getAddress(const broker::Connection& connection, Address&); ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 590db7efa5..b5258dc84a 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -84,15 +84,7 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; } // Called in Plugin::initialize void HaBroker::initialize() { - // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. - membership.add( - BrokerInfo( - membership.getSelf(), - settings.cluster ? JOINING : membership.getStatus(), - broker.getSystem()->getNodeName(), - broker.getPort(broker::Broker::TCP_TRANSPORT) - ) - ); + if (settings.cluster) membership.setStatus(JOINING); QPID_LOG(notice, "Initializing: " << membership.getInfo()); // Set up the management object. @@ -207,4 +199,11 @@ BrokerStatus HaBroker::getStatus() const { return membership.getStatus(); } +void HaBroker::setAddress(const Address& a) { + QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a); + BrokerInfo b(membership.getSelf(), membership.getStatus(), a.host, a.port); + membership.add(b); +} + + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 6b15c88e0a..1b3666362a 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -90,6 +90,8 @@ class HaBroker : public management::Manageable Membership& getMembership() { return membership; } types::Uuid getSystemId() const { return systemId; } + void setAddress(const Address&); // set self address from a self-connection + private: void setPublicUrl(const Url&); diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp index 2921b9ec55..e56efc5873 100644 --- a/cpp/src/qpid/ha/StatusCheck.cpp +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -19,6 +19,7 @@ * */ #include "StatusCheck.h" +#include "ConnectionObserver.h" #include "qpid/log/Statement.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Connection.h" @@ -55,7 +56,9 @@ void StatusCheckThread::run() { try { Variant::Map options, clientProperties; clientProperties = brokerInfo.asMap(); // Detect self connections. - clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. + clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups. + clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str(); + clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap(); options["client-properties"] = clientProperties; options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC; diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index e7be2a246d..8ee27b5519 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -545,26 +545,25 @@ class ReplicationTests(HaBrokerTest): """Check that broker information is correctly published via management""" cluster = HaCluster(self, 3) + def ha_broker(broker): + ha_broker = broker.agent().getHaBroker(); + ha_broker.update() + return ha_broker + for broker in cluster: # Make sure HA system-id matches broker's - qmf = broker.agent().getHaBroker() - self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) - - cluster_ports = map(lambda b: b.port(), cluster) - cluster_ports.sort() - def ports(qmf): - qmf.update() - return sorted(map(lambda b: b["port"], qmf.members)) + self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef)) + # Check that all brokers have the same membership as the cluster - for broker in cluster: - qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) + def check_ids(broker): + cluster_ids = set([ ha_broker(b).systemId for b in cluster]) + broker_ids = set([m["system-id"] for m in ha_broker(broker).members]) + assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker) + + for broker in cluster: check_ids(broker) + # Add a new broker, check it is updated everywhere b = cluster.start() - cluster_ports.append(b.port()) - cluster_ports.sort() - for broker in cluster: - qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + for broker in cluster: check_ids(broker) def test_auth(self): """Verify that authentication does not interfere with replication.""" -- cgit v1.2.1