summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-04-29 15:57:59 +0000
committerAlan Conway <aconway@apache.org>2013-04-29 15:57:59 +0000
commit229dd0ea527beec82f3217c94b993c5e66286993 (patch)
tree2124944807ddfde7baf7cfc96e92e4033e432a24 /cpp/src
parentf89b692caa9d9a9ddeddf7c244829db2c21cc85e (diff)
downloadqpid-python-229dd0ea527beec82f3217c94b993c5e66286993.tar.gz
QPID-4787: HA brokers find self-address in brokers_url.
HA brokers need to know their own addresses, but it is not safe to simply use local hosts name and Broker::getPort() since the broker may be listening on multiple addresses. The solution is to have brokers check the ha-rokers-url for their own address while doing the initial status check of the cluster. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1477165 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/ha/BrokerInfo.cpp8
-rw-r--r--cpp/src/qpid/ha/ConnectionObserver.cpp26
-rw-r--r--cpp/src/qpid/ha/ConnectionObserver.h6
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp17
-rw-r--r--cpp/src/qpid/ha/HaBroker.h2
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp5
-rwxr-xr-xcpp/src/tests/ha_tests.py31
7 files changed, 61 insertions, 34 deletions
diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp
index 8efed91b17..c3183d8f47 100644
--- a/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -89,8 +89,12 @@ void BrokerInfo::assign(const Variant::Map& m) {
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
- return o << b.getHostName() << ":" << b.getPort() << "("
- << printable(b.getStatus()) << ")";
+ o << "FIXME:";
+ o << b.getSystemId().str().substr(0,7);
+ if (!b.getHostName().empty())
+ o << "@" << b.getHostName() << ":" << b.getPort();
+ o << "(" << printable(b.getStatus()) << ")";
+ return o;
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {
diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp
index 775222efd3..76be46a92b 100644
--- a/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -22,6 +22,7 @@
#include "ConnectionObserver.h"
#include "BrokerInfo.h"
#include "HaBroker.h"
+#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -41,6 +42,17 @@ bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, Bro
return false;
}
+bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
+ Url url;
+ url.parseNoThrow(
+ connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
+ if (!url.empty()) {
+ addr = url[0];
+ return true;
+ }
+ return false;
+}
+
void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
{
sys::Mutex::ScopedLock l(lock);
@@ -60,17 +72,20 @@ bool ConnectionObserver::isSelf(const broker::Connection& connection) {
void ConnectionObserver::opened(broker::Connection& connection) {
try {
+ if (isSelf(connection)) { // Reject self connections
+ // Set my own address if there is an address header.
+ Address addr;
+ if (getAddress(connection, addr)) haBroker.setAddress(addr);
+ QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
+ connection.abort();
+ return;
+ }
if (connection.isLink()) return; // Allow outgoing links.
if (connection.getClientProperties().isSet(ADMIN_TAG)) {
QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
}
- if (isSelf(connection)) { // Reject self connections
- QPID_LOG(debug, logPrefix << "Rejected self connection "+connection.getMgmtId());
- connection.abort();
- return;
- }
ObserverPtr o(getObserver());
if (o) o->opened(connection);
}
@@ -94,5 +109,6 @@ void ConnectionObserver::closed(broker::Connection& connection) {
const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup";
+const std::string ConnectionObserver::ADDRESS_TAG="qpid.ha-address";
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ConnectionObserver.h b/cpp/src/qpid/ha/ConnectionObserver.h
index 5374660dbe..85510ab459 100644
--- a/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/cpp/src/qpid/ha/ConnectionObserver.h
@@ -29,6 +29,8 @@
#include "boost/shared_ptr.hpp"
namespace qpid {
+class Address;
+
namespace ha {
class BrokerInfo;
class HaBroker;
@@ -50,8 +52,10 @@ class ConnectionObserver : public broker::ConnectionObserver
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
+ static const std::string ADDRESS_TAG;
- static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo& info);
+ static bool getBrokerInfo(const broker::Connection& connection, BrokerInfo&);
+ static bool getAddress(const broker::Connection& connection, Address&);
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index 590db7efa5..b5258dc84a 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -84,15 +84,7 @@ bool isNone(const std::string& x) { return x.empty() || x == NONE; }
// Called in Plugin::initialize
void HaBroker::initialize() {
- // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- membership.add(
- BrokerInfo(
- membership.getSelf(),
- settings.cluster ? JOINING : membership.getStatus(),
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT)
- )
- );
+ if (settings.cluster) membership.setStatus(JOINING);
QPID_LOG(notice, "Initializing: " << membership.getInfo());
// Set up the management object.
@@ -207,4 +199,11 @@ BrokerStatus HaBroker::getStatus() const {
return membership.getStatus();
}
+void HaBroker::setAddress(const Address& a) {
+ QPID_LOG(info, role->getLogPrefix() << "Set self address to: " << a);
+ BrokerInfo b(membership.getSelf(), membership.getStatus(), a.host, a.port);
+ membership.add(b);
+}
+
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 6b15c88e0a..1b3666362a 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -90,6 +90,8 @@ class HaBroker : public management::Manageable
Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
+ void setAddress(const Address&); // set self address from a self-connection
+
private:
void setPublicUrl(const Url&);
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
index 2921b9ec55..e56efc5873 100644
--- a/cpp/src/qpid/ha/StatusCheck.cpp
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -19,6 +19,7 @@
*
*/
#include "StatusCheck.h"
+#include "ConnectionObserver.h"
#include "qpid/log/Statement.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Connection.h"
@@ -55,7 +56,9 @@ void StatusCheckThread::run() {
try {
Variant::Map options, clientProperties;
clientProperties = brokerInfo.asMap(); // Detect self connections.
- clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADMIN_TAG] = 1; // Allow connection to backups.
+ clientProperties[ConnectionObserver::ADDRESS_TAG] = url.str();
+ clientProperties[ConnectionObserver::BACKUP_TAG] = brokerInfo.asMap();
options["client-properties"] = clientProperties;
options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC;
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index e7be2a246d..8ee27b5519 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -545,26 +545,25 @@ class ReplicationTests(HaBrokerTest):
"""Check that broker information is correctly published via management"""
cluster = HaCluster(self, 3)
+ def ha_broker(broker):
+ ha_broker = broker.agent().getHaBroker();
+ ha_broker.update()
+ return ha_broker
+
for broker in cluster: # Make sure HA system-id matches broker's
- qmf = broker.agent().getHaBroker()
- self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
-
- cluster_ports = map(lambda b: b.port(), cluster)
- cluster_ports.sort()
- def ports(qmf):
- qmf.update()
- return sorted(map(lambda b: b["port"], qmf.members))
+ self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent().getBroker().systemRef))
+
# Check that all brokers have the same membership as the cluster
- for broker in cluster:
- qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+ def check_ids(broker):
+ cluster_ids = set([ ha_broker(b).systemId for b in cluster])
+ broker_ids = set([m["system-id"] for m in ha_broker(broker).members])
+ assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker)
+
+ for broker in cluster: check_ids(broker)
+
# Add a new broker, check it is updated everywhere
b = cluster.start()
- cluster_ports.append(b.port())
- cluster_ports.sort()
- for broker in cluster:
- qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ for broker in cluster: check_ids(broker)
def test_auth(self):
"""Verify that authentication does not interfere with replication."""