summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-08 15:24:18 +0000
committerAlan Conway <aconway@apache.org>2012-06-08 15:24:18 +0000
commitba3b6c53f4072744aecbac429c8eab66631d84c6 (patch)
tree9786da704b6f20b2f8c47e05ad1fe119d02069d3 /qpid/cpp/src
parentbb13e5e60b83bc44d436d4fdf41cb56af4da7c81 (diff)
downloadqpid-python-ba3b6c53f4072744aecbac429c8eab66631d84c6.tar.gz
QPID-3603: HA primary sends membership updates to backup brokers.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1348113 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/ha.mk2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp44
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h20
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp34
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h1
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp46
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h13
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp28
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h5
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp89
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h65
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml15
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py49
15 files changed, 353 insertions, 62 deletions
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index ae7b803b87..cab7d8c42b 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -39,6 +39,8 @@ ha_la_SOURCES = \
qpid/ha/HaPlugin.cpp \
qpid/ha/LogPrefix.cpp \
qpid/ha/LogPrefix.h \
+ qpid/ha/Membership.cpp \
+ qpid/ha/Membership.h \
qpid/ha/Primary.cpp \
qpid/ha/Primary.h \
qpid/ha/QueueReplicator.cpp \
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 4b8a9081a1..158bdd927d 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -75,7 +75,7 @@ void Backup::initialize(const Url& brokers) {
sys::Mutex::ScopedLock l(lock);
Url url = linkUrl(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- framing::Uuid uuid(true);
+ types::Uuid uuid(true);
// Declare the link
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 2422bcd3e2..2673646646 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -20,8 +20,11 @@
*/
#include "BrokerInfo.h"
+#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include <iostream>
@@ -35,23 +38,46 @@ std::string PORT="port";
std::string STATUS="status";
}
-using framing::Uuid;
+using types::Uuid;
+using types::Variant;
using framing::FieldTable;
FieldTable BrokerInfo::asFieldTable() const {
+ Variant::Map m = asMap();
FieldTable ft;
- ft.setString(SYSTEM_ID, systemId.str());
- ft.setString(HOST_NAME, hostName);
- ft.setInt(PORT, port);
- ft.setInt(STATUS, status);
+ amqp_0_10::translate(m, ft);
return ft;
}
+Variant::Map BrokerInfo::asMap() const {
+ Variant::Map m;
+ m[SYSTEM_ID] = systemId;
+ m[HOST_NAME] = hostName;
+ m[PORT] = port;
+ m[STATUS] = status;
+ return m;
+}
+
void BrokerInfo::assign(const FieldTable& ft) {
- systemId = Uuid(ft.getAsString(SYSTEM_ID));
- hostName = ft.getAsString(HOST_NAME);
- port = ft.getAsInt(PORT);
- status = BrokerStatus(ft.getAsInt(STATUS));
+ Variant::Map m;
+ amqp_0_10::translate(ft, m);
+ assign(m);
+}
+
+namespace {
+const Variant& get(const Variant::Map& m, const std::string& k) {
+ Variant::Map::const_iterator i = m.find(k);
+ if (i == m.end()) throw Exception(
+ QPID_MSG("Missing field '" << k << "' in broker information"));
+ return i->second;
+}
+}
+
+void BrokerInfo::assign(const Variant::Map& m) {
+ systemId = get(m, SYSTEM_ID).asUuid();
+ hostName = get(m, HOST_NAME).asString();
+ port = get(m, PORT).asUint16();
+ status = BrokerStatus(get(m, STATUS).asUint8());
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index 7ccbd056c3..b0864e0402 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -24,8 +24,9 @@
#include "Enum.h"
#include "qpid/Url.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/types/Variant.h"
#include <string>
#include <iosfwd>
@@ -38,24 +39,29 @@ namespace ha {
class BrokerInfo
{
public:
- BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& id) :
+ BrokerInfo() {}
+ BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
hostName(host), port(port_), systemId(id) {}
-
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
- framing::FieldTable asFieldTable() const;
- void assign(const framing::FieldTable&);
+ BrokerInfo(const types::Variant::Map& m) { assign(m); }
- framing::Uuid getSystemId() const { return systemId; }
+ types::Uuid getSystemId() const { return systemId; }
std::string getHostName() const { return hostName; }
BrokerStatus getStatus() const { return status; }
uint16_t getPort() const { return port; }
void setStatus(BrokerStatus s) { status = s; }
+ framing::FieldTable asFieldTable() const;
+ types::Variant::Map asMap() const;
+
+ void assign(const framing::FieldTable&);
+ void assign(const types::Variant::Map&);
+
private:
std::string hostName;
uint16_t port;
- framing::Uuid systemId;
+ types::Uuid systemId;
BrokerStatus status;
};
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 2415aff84a..7679078c40 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -13,7 +13,7 @@
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
@@ -37,6 +37,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDeclare.h"
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include <algorithm>
#include <sstream>
#include <assert.h>
@@ -51,6 +52,7 @@ using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
+using qmf::org::apache::qpid::ha::EventMembersUpdate;
using namespace framing;
using std::string;
using types::Variant;
@@ -93,7 +95,8 @@ const string TYPE("type");
const string USER("user");
const string HA_BROKER("habroker");
-const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
+const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
const string QMF2("qmf2");
const string QMF_CONTENT("qmf.content");
const string QMF_DEFAULT_TOPIC("qmf.default.topic");
@@ -109,6 +112,7 @@ const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
+const string MEMBERS("members");
bool isQMFv2(const Message& message) {
const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
@@ -169,7 +173,7 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
logPrefix(hb),
haBroker(hb), broker(hb.getBroker()), link(l)
{
- framing::Uuid uuid(true);
+ types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
broker.getLinks().declare(
name, // name for bridge
@@ -221,7 +225,8 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
FieldTable declareArgs;
declareArgs.setString(QPID_REPLICATE, printable(NONE).str());
peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
- peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
@@ -256,6 +261,7 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
else if (match<EventBind>(schema)) doEventBind(values);
else if (match<EventUnbind>(schema)) doEventUnbind(values);
+ else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
}
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -423,6 +429,11 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
}
}
+void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
+ Variant::List members = values[MEMBERS].asList();
+ haBroker.getMembership().assign(members);
+}
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
if (!isReplicated(values[ARGUMENTS].asMap(),
@@ -517,15 +528,14 @@ const string REPLICATE_DEFAULT="replicateDefault";
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = haBroker.replicateLevel(values[REPLICATE_DEFAULT].asString());
- if (mine != primary) {
- QPID_LOG(critical, logPrefix << "Replicate default on backup (" << mine
- << ") does not match primary (" << primary << ")");
- haBroker.shutdown();
- }
+ ReplicateLevel primary = haBroker.replicateLevel(
+ values[REPLICATE_DEFAULT].asString());
+ if (mine != primary)
+ throw Exception(QPID_MSG("Replicate default on backup (" << mine
+ << ") does not match primary (" << primary << ")"));
+ haBroker.getMembership().assign(values[MEMBERS].asList());
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Invalid replicate default from primary: "
- << e.what());
+ QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
haBroker.shutdown();
}
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index d2fd23e63d..57867587a9 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -81,6 +81,7 @@ class BrokerReplicator : public broker::Exchange
void doEventExchangeDelete(types::Variant::Map& values);
void doEventBind(types::Variant::Map&);
void doEventUnbind(types::Variant::Map&);
+ void doEventMembersUpdate(types::Variant::Map&);
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index 18b4432aa1..9294f38ef3 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -21,6 +21,7 @@
#include "ConnectionExcluder.h"
#include "BrokerInfo.h"
+#include "HaBroker.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/broker/Connection.h"
#include <boost/function.hpp>
@@ -29,8 +30,19 @@
namespace qpid {
namespace ha {
-ConnectionExcluder::ConnectionExcluder(const LogPrefix& lp, const framing::Uuid& uuid)
- : logPrefix(lp), backupAllowed(false), self(uuid) {}
+ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid)
+ : haBroker(hb), logPrefix(hb), self(uuid) {}
+
+namespace {
+bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+ framing::FieldTable ft;
+ if (connection.getClientProperties().getTable(ConnectionExcluder::BACKUP_TAG, ft)) {
+ info = BrokerInfo(ft);
+ return true;
+ }
+ return false;
+}
+}
void ConnectionExcluder::opened(broker::Connection& connection) {
if (connection.isLink()) return; // Allow all outgoing links
@@ -39,23 +51,37 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
<< connection.getMgmtId());
return;
}
- framing::FieldTable ft;
- if (connection.getClientProperties().getTable(BACKUP_TAG, ft)) {
- BrokerInfo info(ft);
+ BrokerStatus status = haBroker.getStatus();
+ if (isBackup(status)) reject(connection);
+ BrokerInfo info; // Get info about a connecting backup.
+ if (getBrokerInfo(connection, info)) {
if (info.getSystemId() == self) {
- QPID_LOG(debug, logPrefix << "Self connection rejected");
+ QPID_LOG(debug, logPrefix << "Rejected self connection");
+ reject(connection);
}
else {
- QPID_LOG(debug, logPrefix << "Backup connection " << info <<
- (backupAllowed ? " allowed" : " rejected"));
- if (backupAllowed) return;
+ QPID_LOG(debug, logPrefix << "Allowed backup connection " << info);
+ haBroker.getMembership().add(info);
+ return;
}
}
- // Abort the connection.
+ else // This is a client connection.
+ if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow clients in recovery
+}
+
+void ConnectionExcluder::reject(broker::Connection& connection) {
throw Exception(
QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId()));
}
+void ConnectionExcluder::closed(broker::Connection& connection) {
+ BrokerInfo info;
+ BrokerStatus status = haBroker.getStatus();
+ if (isBackup(status)) return; // Don't mess with the map received from primary.
+ if (getBrokerInfo(connection, info))
+ haBroker.getMembership().remove(info.getSystemId());
+}
+
const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup";
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index 042544c333..629fda7519 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -25,6 +25,7 @@
#include "LogPrefix.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
namespace qpid {
@@ -46,17 +47,19 @@ class ConnectionExcluder : public broker::ConnectionObserver
static const std::string ADMIN_TAG;
static const std::string BACKUP_TAG;
- ConnectionExcluder(const LogPrefix&, const framing::Uuid& self);
+ ConnectionExcluder(HaBroker&, const types::Uuid& self);
void opened(broker::Connection& connection);
+ void closed(broker::Connection& connection);
- void setBackupAllowed(bool set) { backupAllowed = set; }
- bool isBackupAllowed() const { return backupAllowed; }
+ void setStatus(BrokerStatus);
private:
+ void reject(broker::Connection&);
+
+ HaBroker& haBroker;
LogPrefix logPrefix;
- bool backupAllowed;
- framing::Uuid self;
+ types::Uuid self;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 9b58bac484..021e4d559e 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -33,11 +33,14 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/management/ManagementAgent.h"
#include "qpid/sys/SystemInfo.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/framing/Uuid.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
namespace qpid {
@@ -50,14 +53,15 @@ using namespace std;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: logPrefix(status),
broker(b),
+ systemId(broker.getSystem()->getSystemId().data()),
settings(s),
mgmtObject(0),
status(STANDALONE),
- excluder(new ConnectionExcluder(logPrefix, broker.getSystem()->getSystemId())),
+ excluder(new ConnectionExcluder(*this, systemId)),
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
- broker.getPort(broker::Broker::TCP_TRANSPORT),
- broker.getSystem()->getSystemId())
+ broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
+ membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1))
{
// Set up the management object.
@@ -67,6 +71,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
+ mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
// Register a factory for replicating subscriptions.
@@ -92,11 +97,14 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
QPID_LOG(notice, logPrefix << "Broker starting on " << brokerInfo);
}
-HaBroker::~HaBroker() {}
+HaBroker::~HaBroker() {
+ broker.getConnectionObservers().remove(excluder);
+}
void HaBroker::recover(sys::Mutex::ScopedLock&) {
setStatus(RECOVERING);
backup.reset(); // No longer replicating, close link.
+ membership.reset(brokerInfo);
primary.reset(new Primary(*this)); // Starts primary-ready check.
}
@@ -107,9 +115,11 @@ void HaBroker::activate() {
}
void HaBroker::activate(sys::Mutex::ScopedLock&) {
+ BrokerStatus oldStatus = status;
setStatus(ACTIVE);
+ if (oldStatus != RECOVERING) // Already set membership
+ membership.reset(brokerInfo);
backup.reset(); // No longer replicating, close link.
- broker.getConnectionObservers().remove(excluder); // This allows client connections.
}
ReplicateLevel HaBroker::replicateLevel(const std::string& str) {
@@ -173,7 +183,7 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- framing::Uuid uuid(true);
+ types::Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
@@ -282,6 +292,12 @@ void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) {
setLinkProperties(l);
}
+void HaBroker::membershipUpdate(const types::Variant::List& brokers) {
+ QPID_LOG(debug, logPrefix << "Membership update: " << brokers);
+ mgmtObject->set_members(brokers);
+ broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
+}
+
void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
if (isBackup(status)) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 017ccefd27..224a0923c5 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -23,6 +23,7 @@
*/
#include "BrokerInfo.h"
+#include "Membership.h"
#include "Enum.h"
#include "LogPrefix.h"
#include "Settings.h"
@@ -91,6 +92,8 @@ class HaBroker : public management::Manageable
boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
+ Membership& getMembership() { return membership; }
+ void membershipUpdate(const types::Variant::List&);
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
@@ -110,6 +113,7 @@ class HaBroker : public management::Manageable
LogPrefix logPrefix;
broker::Broker& broker;
+ types::Uuid systemId;
const Settings settings;
mutable sys::Mutex lock;
@@ -123,6 +127,7 @@ class HaBroker : public management::Manageable
QueueNames activeBackups;
boost::shared_ptr<ConnectionExcluder> excluder;
BrokerInfo brokerInfo;
+ Membership membership;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
new file mode 100644
index 0000000000..7d22a019d5
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Membership.h"
+
+namespace qpid {
+namespace ha {
+
+
+void Membership::reset(const BrokerInfo& b) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ brokers[b.getSystemId()] = b;
+ }
+ update();
+}
+
+void Membership::add(const BrokerInfo& b) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ brokers[b.getSystemId()] = b;
+ }
+ update();
+}
+
+
+void Membership::remove(const types::Uuid& id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ BrokerMap::iterator i = brokers.find(id);
+ if (i != brokers.end())
+ brokers.erase(i);
+ }
+ update();
+}
+
+bool Membership::contains(const types::Uuid& id) {
+ sys::Mutex::ScopedLock l(lock);
+ return brokers.find(id) != brokers.end();
+}
+
+void Membership::assign(const types::Variant::List& list) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ BrokerInfo b(i->asMap());
+ brokers[b.getSystemId()] = b;
+ }
+ }
+ update();
+}
+
+types::Variant::List Membership::asList() const {
+ sys::Mutex::ScopedLock l(lock);
+ types::Variant::List list;
+ for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ list.push_back(i->second.asMap());
+ return list;
+}
+
+void Membership::update() {
+ if (updateCallback) {
+ types::Variant::List list;
+ for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ list.push_back(i->second.asMap());
+ updateCallback(list);
+ }
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
new file mode 100644
index 0000000000..8af03e0f40
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -0,0 +1,65 @@
+#ifndef QPID_HA_MEMBERSHIP_H
+#define QPID_HA_MEMBERSHIP_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerInfo.h"
+#include "LogPrefix.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
+#include "qpid/types/Variant.h"
+#include <boost/function.hpp>
+#include <set>
+#include <vector>
+namespace qpid {
+namespace ha {
+
+/**
+ * Keep track of the brokers in the membership.
+ * THREAD SAFE: updated in arbitrary connection threads.
+ */
+class Membership
+{
+ public:
+ Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)> updateFn)
+ : logPrefix(lp), updateCallback(updateFn) {}
+
+ void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+ void add(const BrokerInfo& b);
+ void remove(const types::Uuid& id);
+ bool contains(const types::Uuid& id);
+
+ void assign(const types::Variant::List&);
+ types::Variant::List asList() const;
+
+ private:
+ typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
+ void update();
+
+ mutable sys::Mutex lock;
+ LogPrefix logPrefix;
+ BrokerMap brokers;
+ boost::function<void(const types::Variant::List&)> updateCallback;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_MEMBERSHIP_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0a577d08e4..ae48e48c6f 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -57,8 +57,6 @@ Primary::Primary(HaBroker& b) :
else {
QPID_LOG(debug, logPrefix << "Waiting for " << expected
<< " backups on " << queues.size() << " queues");
- // Allow backups to connect.
- haBroker.getExcluder()->setBackupAllowed(true);
}
}
}
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index 681ace370f..ce14032694 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -34,9 +34,12 @@
<property name="expectedBackups" type="uint16"
desc="Number of HA backup brokers expected."/>
- <property
- name="replicateDefault" type="sstr"
- desc="Replicate value for queues/exchanges without a qpid.replicate argument"/>
+ <property name="replicateDefault" type="sstr"
+ desc="Replication for queues/exchanges with no qpid.replicate argument"/>
+
+ <property name="members" type="list" desc="List of brokers in the cluster"/>
+
+ <property name="systemId" type="uuid" desc="Identifies the system."/>
<method name="promote" desc="Promote a backup broker to primary."/>
@@ -58,4 +61,10 @@
</method>
</class>
+ <eventArguments>
+ <arg name="members" type="list" desc="List of broker information maps"/>
+ </eventArguments>
+
+ <event name="membersUpdate" sev="inform" args="members"/>
+
</schema>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 88fb8855ba..6e270851f0 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -26,19 +26,24 @@ from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
from qpidtoollibs import BrokerAgent
+from uuid import UUID
log = getLogger(__name__)
-class QmfHaBroker(object):
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
def __init__(self, address):
- self.connection = Connection.establish(
+ self._connection = Connection.establish(
address, client_properties={"qpid.ha-admin":1})
- self.qmf = BrokerAgent(self.connection)
- self.ha_broker = self.qmf.getHaBroker()
- if not self.ha_broker:
- raise Exception("HA module is not loaded on broker at %s"%address)
+ self._agent = BrokerAgent(self._connection)
+ assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
class HaBroker(Broker):
+ """Start a broker with HA enabled"""
def __init__(self, test, args=[], brokers_url=None, ha_cluster=True,
ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
@@ -58,6 +63,7 @@ class HaBroker(Broker):
assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
@@ -65,7 +71,11 @@ class HaBroker(Broker):
def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
- def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status
+ def agent(self):
+ if not self._agent: self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self): self.agent().getHaBroker().status
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -641,6 +651,31 @@ class ReplicationTests(BrokerTest):
self.failIf(i < 0)
self.assertEqual(log.find("caught up", i), -1)
+ def test_broker_info(self):
+ """Check that broker information is correctly published via management"""
+ cluster = HaCluster(self, 3)
+
+ for broker in cluster: # Make sure HA system-id matches broker's
+ qmf = broker.agent().getHaBroker()
+ self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+ cluster_ports = map(lambda b: b.port(), cluster)
+ cluster_ports.sort()
+ def ports(qmf):
+ qmf.update()
+ return sorted(map(lambda b: b["port"], qmf.members))
+ # Check that all brokers have the same membership as the cluster
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ # Add a new broker, check it is updated everywhere
+ b = cluster.start()
+ cluster_ports.append(b.port())
+ cluster_ports.sort()
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit