summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt23
-rw-r--r--qpid/cpp/src/ha.mk23
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h4
-rw-r--r--qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h (renamed from qpid/cpp/src/qpid/ha/LogPrefix.cpp)40
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerInfo.h9
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp47
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h7
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp75
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.h72
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp104
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h37
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp70
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h19
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp55
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h24
-rw-r--r--qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h76
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp99
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h101
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp11
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h11
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp119
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h56
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.cpp70
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicationTest.h (renamed from qpid/cpp/src/qpid/ha/LogPrefix.h)47
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h5
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp90
-rw-r--r--qpid/cpp/src/qpid/ha/UnreadyQueueSet.h88
-rw-r--r--qpid/cpp/src/qpid/ha/management-schema.xml7
-rw-r--r--qpid/cpp/src/qpid/ha/types.cpp (renamed from qpid/cpp/src/qpid/ha/Enum.cpp)27
-rw-r--r--qpid/cpp/src/qpid/ha/types.h (renamed from qpid/cpp/src/qpid/ha/Enum.h)12
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py102
37 files changed, 1100 insertions, 458 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index aee5fb6213..b83e8d9c6d 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -626,31 +626,38 @@ set (ha_default ON)
option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default})
if (BUILD_HA)
set (ha_SOURCES
+ qpid/ha/BackupConnectionExcluder.h
+ qpid/ha/BrokerInfo.cpp
+ qpid/ha/BrokerInfo.h
+ qpid/ha/HeldQueue.h
+ qpid/ha/QueueGuard.cpp
+ qpid/ha/QueueGuard.h
+ qpid/ha/ReplicationTest.cpp
+ qpid/ha/ReplicationTest.h
qpid/ha/Backup.cpp
qpid/ha/Backup.h
- qpid/ha/BrokerInfo.h
- qpid/ha/BrokerInfo.cpp
qpid/ha/BrokerReplicator.cpp
qpid/ha/BrokerReplicator.h
- qpid/ha/ConnectionExcluder.cpp
- qpid/ha/ConnectionExcluder.h
+ qpid/ha/ConnectionObserver.cpp
+ qpid/ha/ConnectionObserver.h
qpid/ha/Counter.h
- qpid/ha/Enum.cpp
- qpid/ha/Enum.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/LogPrefix.cpp
- qpid/ha/LogPrefix.h
qpid/ha/Membership.cpp
qpid/ha/Membership.h
qpid/ha/Primary.cpp
qpid/ha/Primary.h
+ qpid/ha/PrimaryConnectionMonitor.h
qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h
qpid/ha/ReplicatingSubscription.cpp
qpid/ha/ReplicatingSubscription.h
qpid/ha/Settings.h
+ qpid/ha/types.cpp
+ qpid/ha/types.h
+ qpid/ha/UnreadyQueueSet.cpp
+ qpid/ha/UnreadyQueueSet.h
)
add_library (ha MODULE ${ha_SOURCES})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index cab7d8c42b..dae924b6ea 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -25,29 +25,36 @@ dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
- qpid/ha/BrokerInfo.h \
+ qpid/ha/BackupConnectionExcluder.h \
qpid/ha/BrokerInfo.cpp \
+ qpid/ha/BrokerInfo.h \
qpid/ha/BrokerReplicator.cpp \
qpid/ha/BrokerReplicator.h \
- qpid/ha/ConnectionExcluder.cpp \
- qpid/ha/ConnectionExcluder.h \
+ qpid/ha/ConnectionObserver.cpp \
+ qpid/ha/ConnectionObserver.h \
qpid/ha/Counter.h \
- qpid/ha/Enum.cpp \
- qpid/ha/Enum.h \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
- qpid/ha/LogPrefix.cpp \
- qpid/ha/LogPrefix.h \
+ qpid/ha/HeldQueue.h \
qpid/ha/Membership.cpp \
qpid/ha/Membership.h \
qpid/ha/Primary.cpp \
qpid/ha/Primary.h \
+ qpid/ha/PrimaryConnectionMonitor.h \
+ qpid/ha/QueueGuard.cpp \
+ qpid/ha/QueueGuard.h \
qpid/ha/QueueReplicator.cpp \
qpid/ha/QueueReplicator.h \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/ReplicatingSubscription.h \
- qpid/ha/Settings.h
+ qpid/ha/ReplicationTest.cpp \
+ qpid/ha/ReplicationTest.h \
+ qpid/ha/Settings.h \
+ qpid/ha/UnreadyQueueSet.cpp \
+ qpid/ha/UnreadyQueueSet.h \
+ qpid/ha/types.cpp \
+ qpid/ha/types.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 44fb098e79..f2cc2a3454 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -45,7 +45,7 @@ using types::Variant;
using std::string;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix(hb), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
{
// Empty brokerUrl means delay initialization until seBrokertUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
@@ -64,14 +64,14 @@ Url Backup::linkUrl(const Url& brokers) const {
for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (!isSelf(*i)) url.push_back(*i);
if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty");
- QPID_LOG(debug, logPrefix << "Backup failover URL (excluding self): " << url);
+ QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url);
return url;
*/
}
void Backup::initialize(const Url& brokers) {
if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Backup broker URL: " << brokers);
+ QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers);
sys::Mutex::ScopedLock l(lock);
Url url = linkUrl(brokers);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 92387ece60..ca3c2a02d0 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,7 +22,6 @@
*
*/
-#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -57,7 +56,8 @@ class Backup
Url linkUrl(const Url&) const;
void initialize(const Url&);
- LogPrefix logPrefix;
+ std::string logPrefix;
+
sys::Mutex lock;
HaBroker& haBroker;
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
index d80fe23458..9776fac016 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp
+++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h
@@ -1,3 +1,6 @@
+#ifndef QPID_HA_BACKUPCONNECTIONEXCLUDER_H
+#define QPID_HA_BACKUPCONNECTIONEXCLUDER_H
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -18,33 +21,26 @@
* under the License.
*
*/
-#include "LogPrefix.h"
-#include "HaBroker.h"
-#include <iostream>
+
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/broker/Connection.h"
namespace qpid {
namespace ha {
-LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) {
- if (msg.size()) setMessage(msg);
-}
-
-LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg)
- : haBroker(lp.haBroker), status(0)
+/**
+ * Exclude connections to a backup broker.
+ */
+class BackupConnectionExcluder : public broker::ConnectionObserver
{
- if (msg.size()) setMessage(msg);
-}
+ public:
+ void opened(broker::Connection& connection) {
+ throw Exception("HA backup rejected connection "+connection.getMgmtId());
+ }
-LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
-
-void LogPrefix::setMessage(const std::string& msg) {
- tail = " "+msg+":";
-}
-
-std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
- return o << "HA("
- << printable(l.status ? *l.status : l.haBroker->getStatus())
- << ")" << l.tail << " ";
-}
+ void closed(broker::Connection&) {}
+};
}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BACKUPCONNECTIONEXCLUDER_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 2673646646..0c5de9542f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -42,6 +42,9 @@ using types::Uuid;
using types::Variant;
using framing::FieldTable;
+BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
+ logId(id.str().substr(0,9)+"..."), hostName(host), port(port_), systemId(id) {}
+
FieldTable BrokerInfo::asFieldTable() const {
Variant::Map m = asMap();
FieldTable ft;
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h
index b0864e0402..55479df4b9 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.h
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h
@@ -22,7 +22,7 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/types/Uuid.h"
@@ -40,15 +40,15 @@ class BrokerInfo
{
public:
BrokerInfo() {}
- BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
- hostName(host), port(port_), systemId(id) {}
+ BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
BrokerInfo(const types::Variant::Map& m) { assign(m); }
types::Uuid getSystemId() const { return systemId; }
std::string getHostName() const { return hostName; }
BrokerStatus getStatus() const { return status; }
- uint16_t getPort() const { return port; }
+ uint16_t getPort() const { return port; }
+ std::string getLogId() const { return logId; }
void setStatus(BrokerStatus s) { status = s; }
@@ -59,6 +59,7 @@ class BrokerInfo
void assign(const types::Variant::Map&);
private:
+ std::string logId;
std::string hostName;
uint16_t port;
types::Uuid systemId;
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 0173495a00..81667e0437 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& value) {
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix(hb),
+ logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()),
haBroker(hb), broker(hb.getBroker()), link(l)
{}
@@ -291,10 +291,10 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (!isReplicated(
+ if (!replicationTest.isReplicated(
values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool()))
return;
- if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
@@ -335,7 +335,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (!queue) {
QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name);
- } else if (!haBroker.replicateLevel(queue->getSettings())) {
+ } else if (!replicationTest.replicateLevel(queue->getSettings())) {
QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name);
} else {
boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
@@ -353,8 +353,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
+ if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
+ if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
@@ -383,7 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
- } else if (!haBroker.replicateLevel(exchange->getArgs())) {
+ } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -401,8 +401,8 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
@@ -421,8 +421,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
@@ -441,7 +441,7 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!isReplicated(values[ARGUMENTS].asMap(),
+ if (!replicationTest.isReplicated(values[ARGUMENTS].asMap(),
values[AUTODELETE].asBool(),
values[EXCLUSIVE].asBool()))
return;
@@ -465,7 +465,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!haBroker.replicateLevel(argsMap)) return;
+ if (!replicationTest.replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
if (broker.createExchange(
@@ -512,8 +512,8 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
// Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
- queue && haBroker.replicateLevel(queue->getSettings()))
+ if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
+ queue && replicationTest.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
@@ -533,7 +533,7 @@ const string REPLICATE_DEFAULT="replicateDefault";
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = haBroker.replicateLevel(
+ ReplicateLevel primary = replicationTest.replicateLevel(
values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
@@ -545,22 +545,11 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
}
}
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
-}
-
-bool BrokerReplicator::isReplicated(
- const Variant::Map& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return haBroker.replicateLevel(args) && !ignore;
-}
-
void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
{
- if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
+ if (replicationTest.replicateLevel(queue->getSettings()) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
- new QueueReplicator(haBroker, queue, link));
+ new QueueReplicator(haBroker.getBrokerInfo(), queue, link));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index f7466d6406..677de31370 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -23,8 +23,8 @@
*/
#include "Counter.h"
-#include "Enum.h"
-#include "LogPrefix.h"
+#include "types.h"
+#include "ReplicationTest.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -97,7 +97,8 @@ class BrokerReplicator : public broker::Exchange,
bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
void ready();
- LogPrefix logPrefix;
+ std::string logPrefix;
+ ReplicationTest replicationTest;
HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index 9294f38ef3..61835b15d1 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -31,7 +31,7 @@ namespace qpid {
namespace ha {
ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix(hb), self(uuid) {}
+ : haBroker(hb), logPrefix("HA: "), self(uuid) {}
namespace {
bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
@@ -53,7 +53,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
}
BrokerStatus status = haBroker.getStatus();
if (isBackup(status)) reject(connection);
- BrokerInfo info; // Get info about a connecting backup.
+ BrokerInfo info; // Avoid self connections.
if (getBrokerInfo(connection, info)) {
if (info.getSystemId() == self) {
QPID_LOG(debug, logPrefix << "Rejected self connection");
@@ -65,8 +65,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) {
return;
}
}
- else // This is a client connection.
- if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow clients in recovery
+ // else: Primary node accepts connections.
}
void ConnectionExcluder::reject(broker::Connection& connection) {
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index 629fda7519..c24a138e2c 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -22,9 +22,9 @@
*
*/
-#include "LogPrefix.h"
+#include "types.h"
#include "qpid/broker/ConnectionObserver.h"
-#include "qpid/framing/Uuid.h"
+#include "qpid/types/Uuid.h"
#include "qpid/sys/Mutex.h"
#include <boost/function.hpp>
@@ -35,6 +35,7 @@ class Connection;
}
namespace ha {
+class HaBroker;
/**
* Exclude normal connections to a backup broker.
@@ -58,7 +59,7 @@ class ConnectionExcluder : public broker::ConnectionObserver
void reject(broker::Connection&);
HaBroker& haBroker;
- LogPrefix logPrefix;
+ std::string logPrefix;
types::Uuid self;
};
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
new file mode 100644
index 0000000000..694a253fc3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ConnectionObserver.h"
+#include "BrokerInfo.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace ha {
+
+ConnectionObserver::ConnectionObserver(const types::Uuid& uuid)
+ : logPrefix("HA: "), self(uuid) {}
+
+bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
+ framing::FieldTable ft;
+ if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
+ info = BrokerInfo(ft);
+ return true;
+ }
+ return false;
+}
+
+void ConnectionObserver::setObserver(const ObserverPtr& o){
+ sys::Mutex::ScopedLock l(lock);
+ observer = o;
+}
+
+ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
+ sys::Mutex::ScopedLock l(lock);
+ return observer;
+}
+
+void ConnectionObserver::opened(broker::Connection& connection) {
+ if (connection.isLink()) return; // Allow outgoing links.
+ if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ << connection.getMgmtId());
+ return; // No need to call observer, always allow admins.
+ }
+ BrokerInfo info; // Avoid self connections.
+ if (getBrokerInfo(connection, info) && info.getSystemId() == self)
+ throw Exception("HA rejected self connection");
+ ObserverPtr o(getObserver());
+ if (o) o->opened(connection);
+}
+
+void ConnectionObserver::closed(broker::Connection& connection) {
+ ObserverPtr o(getObserver());
+ if (o) o->closed(connection);
+}
+
+const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin";
+const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup";
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
new file mode 100644
index 0000000000..a950f41739
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h
@@ -0,0 +1,72 @@
+#ifndef QPID_HA_CONNECTIONOBSERVER_H
+#define QPID_HA_CONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "boost/shared_ptr.hpp"
+
+namespace qpid {
+namespace ha {
+class BrokerInfo;
+
+/**
+ * Observes connections, delegates to another ConnectionObserver for
+ * actions specific to primary or backup.
+ *
+ * THREAD SAFE: called in arbitrary connection threads.
+ *
+ * Main role of this class is to provide a continuous observer object
+ * on the connection so we can't lose observations between removing
+ * one observer and adding another.
+ */
+class ConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ typedef boost::shared_ptr<broker::ConnectionObserver> ObserverPtr;
+
+ static const std::string ADMIN_TAG;
+ static const std::string BACKUP_TAG;
+
+ static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info);
+
+ ConnectionObserver(const types::Uuid& self);
+
+ void setObserver(const ObserverPtr&);
+ ObserverPtr getObserver();
+
+ void opened(broker::Connection& connection);
+ void closed(broker::Connection& connection);
+
+ private:
+ sys::Mutex lock;
+ std::string logPrefix;
+ ObserverPtr observer;
+ types::Uuid self;
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_CONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 6095837cd6..046800791d 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -19,11 +19,13 @@
*
*/
#include "Backup.h"
-#include "ConnectionExcluder.h"
+#include "BackupConnectionExcluder.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
#include "Primary.h"
-#include "Settings.h"
+#include "PrimaryConnectionMonitor.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -39,9 +41,9 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace ha {
@@ -49,20 +51,22 @@ namespace ha {
namespace _qmf = ::qmf::org::apache::qpid::ha;
using namespace management;
using namespace std;
+using types::Variant;
+using types::Uuid;
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix(status),
+ : logPrefix("HA: "),
broker(b),
systemId(broker.getSystem()->getSystemId().data()),
settings(s),
mgmtObject(0),
status(STANDALONE),
- excluder(new ConnectionExcluder(*this, systemId)),
+ observer(new ConnectionObserver(systemId)),
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
- membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1))
-
+ membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1, _2)),
+ replicationTest(s.replicateDefault.get())
{
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
@@ -77,13 +81,15 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
boost::shared_ptr<ReplicatingSubscription::Factory>(
- new ReplicatingSubscription::Factory(*this)));
+ new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
status = JOINING;
+ observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
+ new BackupConnectionExcluder));
+ broker.getConnectionObservers().add(observer);
backup.reset(new Backup(*this, s));
- broker.getConnectionObservers().add(excluder);
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
}
@@ -99,57 +105,35 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
HaBroker::~HaBroker() {
QPID_LOG(debug, logPrefix << "Broker shut down: " << brokerInfo);
- broker.getConnectionObservers().remove(excluder);
+ broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover(sys::Mutex::ScopedLock&) {
+void HaBroker::recover(sys::Mutex::ScopedLock& l) {
setStatus(RECOVERING);
backup.reset(); // No longer replicating, close link.
+ IdSet backups = membership.otherBackups();
membership.reset(brokerInfo);
- primary.reset(new Primary(*this)); // Starts primary-ready check.
+ primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
+ observer->setObserver( // Allow connections
+ boost::shared_ptr<broker::ConnectionObserver>(
+ new PrimaryConnectionMonitor(*this)));
+ if (primary->isActive()) activate(l);
}
-// Called back from Primary ready check.
+// Called back from Primary active check.
void HaBroker::activate() {
sys::Mutex::ScopedLock l(lock);
activate(l);
}
-void HaBroker::activate(sys::Mutex::ScopedLock&) {
- BrokerStatus oldStatus = status;
- setStatus(ACTIVE);
- if (oldStatus != RECOVERING) // Already set membership
- membership.reset(brokerInfo);
- backup.reset(); // No longer replicating, close link.
-}
-
-ReplicateLevel HaBroker::replicateLevel(const std::string& str) {
- Enum<ReplicateLevel> rl;
- if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
- else return getSettings().replicateDefault.get();
-}
-
-ReplicateLevel HaBroker::replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
- else
- return getSettings().replicateDefault.get();
-}
-
-ReplicateLevel HaBroker::replicateLevel(const types::Variant::Map& m) {
- types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end())
- return replicateLevel(i->second.asString());
- else
- return getSettings().replicateDefault.get();
-}
+void HaBroker::activate(sys::Mutex::ScopedLock&) { setStatus(ACTIVE); }
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
sys::Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
switch (status) {
- case JOINING: activate(l); break;
+ case JOINING: recover(l); break;
case CATCHUP:
// FIXME aconway 2012-04-27: don't allow promotion in catch-up
// QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
@@ -171,20 +155,16 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url), l);
break;
}
- case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
- setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
- break;
- }
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "replicate individual queue "
+ QPID_LOG(debug, logPrefix << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- types::Uuid uuid(true);
+ Uuid uuid(true);
std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
@@ -193,7 +173,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, queue, link));
+ boost::shared_ptr<QueueReplicator> qr(
+ new QueueReplicator(brokerInfo, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
@@ -205,13 +186,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setClientUrl(const Url& url, sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
clientUrl = url;
updateClientUrl(l);
}
-void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
+void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) {
Url url = clientUrl.empty() ? brokerUrl : clientUrl;
if (url.empty()) throw Url::Invalid("HA client URL is empty");
mgmtObject->set_publicUrl(url.str());
@@ -220,7 +201,7 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
}
-void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
+void HaBroker::setBrokerUrl(const Url& url, sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
brokerUrl = url;
mgmtObject->set_brokersUrl(brokerUrl.str());
@@ -229,11 +210,6 @@ void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
if (clientUrl.empty()) updateClientUrl(l);
}
-void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) {
- expectedBackups = n;
- mgmtObject->set_expectedBackups(n);
-}
-
std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
@@ -259,7 +235,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) {
static const BrokerStatus TRANSITIONS[][2] = {
{ CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior
{ JOINING, CATCHUP }, // Connected to primary
- { JOINING, ACTIVE }, // Chosen as initial primary.
+ { JOINING, RECOVERING }, // Chosen as initial primary.
{ CATCHUP, READY }, // Caught up all queues, ready to take over.
{ READY, RECOVERING }, // Chosen as new primary
{ RECOVERING, ACTIVE }
@@ -293,10 +269,13 @@ void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) {
setLinkProperties(l);
}
-void HaBroker::membershipUpdate(const types::Variant::List& brokers) {
+void HaBroker::membershipUpdate(const Variant::List& brokers, const IdSet& otherBackups)
+{
QPID_LOG(debug, logPrefix << "Membership update: " << brokers);
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
+ sys::Mutex::ScopedLock l(lock);
+ if (primary.get()) primary->getUnreadyQueueSet().setExpectedBackups(otherBackups);
}
void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
@@ -305,13 +284,13 @@ void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) {
// If this is a backup then any links we make are backup links
// and need to be tagged.
QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo);
- linkProperties.setTable(ConnectionExcluder::BACKUP_TAG, brokerInfo.asFieldTable());
+ linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
}
else {
// If this is a primary then any links are federation links
// and should not be tagged.
QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links");
- linkProperties.erase(ConnectionExcluder::BACKUP_TAG);
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
}
broker.setLinkClientProperties(linkProperties);
}
@@ -326,7 +305,8 @@ void HaBroker::deactivatedBackup(const std::string& queue) {
activeBackups.erase(queue);
}
-std::set<std::string> HaBroker::getActiveBackups() const {
+// FIXME aconway 2012-05-31: strip out.
+HaBroker::QueueNames HaBroker::getActiveBackups() const {
sys::Mutex::ScopedLock l(lock);
return activeBackups;
}
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 224a0923c5..80e8a6cc3d 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -24,8 +24,8 @@
#include "BrokerInfo.h"
#include "Membership.h"
-#include "Enum.h"
-#include "LogPrefix.h"
+#include "types.h"
+#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -37,8 +37,14 @@
#include <boost/shared_ptr.hpp>
namespace qpid {
+
+namespace types {
+class Variant;
+}
+
namespace broker {
class Broker;
+class Queue;
}
namespace framing {
class FieldTable;
@@ -46,7 +52,7 @@ class FieldTable;
namespace ha {
class Backup;
-class ConnectionExcluder;
+class ConnectionObserver;
class Primary;
/**
@@ -76,11 +82,7 @@ class HaBroker : public management::Manageable
void activate();
Backup* getBackup() { return backup.get(); }
-
- // Translate replicate levels.
- ReplicateLevel replicateLevel(const std::string& str);
- ReplicateLevel replicateLevel(const framing::FieldTable& f);
- ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ ReplicationTest getReplicationTest() const { return replicationTest; }
// Keep track of the set of actively replicated queues on a backup
// so that it can be transferred to the Primary on promotion.
@@ -89,19 +91,18 @@ class HaBroker : public management::Manageable
void deactivatedBackup(const std::string& queue);
QueueNames getActiveBackups() const;
- boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
+ boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
Membership& getMembership() { return membership; }
- void membershipUpdate(const types::Variant::List&);
+ void membershipUpdate(const types::Variant::List&, const IdSet&);
private:
- void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
- void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
- void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&);
- void updateClientUrl(const sys::Mutex::ScopedLock&);
+ void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
+ void setBrokerUrl(const Url&, sys::Mutex::ScopedLock&);
+ void updateClientUrl(sys::Mutex::ScopedLock&);
- bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+ bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
void recover(sys::Mutex::ScopedLock&);
@@ -111,7 +112,7 @@ class HaBroker : public management::Manageable
std::vector<Url> getKnownBrokers() const;
- LogPrefix logPrefix;
+ std::string logPrefix;
broker::Broker& broker;
types::Uuid systemId;
const Settings settings;
@@ -122,12 +123,12 @@ class HaBroker : public management::Manageable
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
- size_t expectedBackups;
BrokerStatus status;
QueueNames activeBackups;
- boost::shared_ptr<ConnectionExcluder> excluder;
+ boost::shared_ptr<ConnectionObserver> observer;
BrokerInfo brokerInfo;
Membership membership;
+ ReplicationTest replicationTest;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index b6aa0d4a91..42758c4689 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -40,9 +40,6 @@ struct Options : public qpid::Options {
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
"Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
- // FIXME aconway 2012-04-30: required-backups? Also need timeout.
- ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
- "Number of backups expected to be active in the HA cluster.")
("ha-username", optValue(settings.username, "USER"),
"Username for connections between HA brokers")
("ha-password", optValue(settings.password, "PASS"),
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 7d22a019d5..6c6961f094 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -25,31 +25,26 @@ namespace ha {
void Membership::reset(const BrokerInfo& b) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers.clear();
- brokers[b.getSystemId()] = b;
- }
- update();
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::add(const BrokerInfo& b) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers[b.getSystemId()] = b;
- }
- update();
+ sys::Mutex::ScopedLock l(lock);
+ brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
- {
- sys::Mutex::ScopedLock l(lock);
- BrokerMap::iterator i = brokers.find(id);
- if (i != brokers.end())
- brokers.erase(i);
+ sys::Mutex::ScopedLock l(lock);
+ BrokerMap::iterator i = brokers.find(id);
+ if (i != brokers.end()) {
+ brokers.erase(i);
+ update(l);
}
- update();
}
bool Membership::contains(const types::Uuid& id) {
@@ -58,32 +53,47 @@ bool Membership::contains(const types::Uuid& id) {
}
void Membership::assign(const types::Variant::List& list) {
- {
- sys::Mutex::ScopedLock l(lock);
- brokers.clear();
- for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
- BrokerInfo b(i->asMap());
- brokers[b.getSystemId()] = b;
- }
+ sys::Mutex::ScopedLock l(lock);
+ brokers.clear();
+ for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ BrokerInfo b(i->asMap());
+ brokers[b.getSystemId()] = b;
}
- update();
+ update(l);
}
types::Variant::List Membership::asList() const {
sys::Mutex::ScopedLock l(lock);
+ return asList(l);
+}
+
+types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
return list;
}
-void Membership::update() {
+void Membership::update(sys::Mutex::ScopedLock& l) {
if (updateCallback) {
- types::Variant::List list;
- for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
- list.push_back(i->second.asMap());
- updateCallback(list);
+ types::Variant::List list = asList(l);
+ IdSet ids = otherBackups(l);
+ sys::Mutex::ScopedUnlock u(lock);
+ updateCallback(list, ids);
}
}
+IdSet Membership::otherBackups() const {
+ sys::Mutex::ScopedLock l(lock);
+ return otherBackups(l);
+}
+
+IdSet Membership::otherBackups(sys::Mutex::ScopedLock&) const {
+ IdSet result;
+ for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
+ result.insert(i->second.getSystemId());
+ return result;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 8af03e0f40..bdb55425dc 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -23,7 +23,7 @@
*/
#include "BrokerInfo.h"
-#include "LogPrefix.h"
+#include "types.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
@@ -40,25 +40,32 @@ namespace ha {
class Membership
{
public:
- Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)> updateFn)
- : logPrefix(lp), updateCallback(updateFn) {}
+ typedef boost::function<void (const types::Variant::List&,
+ const IdSet&) > UpdateCallback;
+
+ Membership(const types::Uuid& self_, UpdateCallback updateFn)
+ : self(self_), updateCallback(updateFn) {}
void reset(const BrokerInfo& b); ///< Reset to contain just one member.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+ /** Return IDs of all backups other than self */
+ IdSet otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
private:
typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
- void update();
+ IdSet otherBackups(sys::Mutex::ScopedLock&) const;
+ types::Variant::List asList(sys::Mutex::ScopedLock&) const;
+ void update(sys::Mutex::ScopedLock&);
mutable sys::Mutex lock;
- LogPrefix logPrefix;
+ types::Uuid self;
BrokerMap brokers;
- boost::function<void(const types::Variant::List&)> updateCallback;
+ UpdateCallback updateCallback;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index ae48e48c6f..63cba14484 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -22,9 +22,11 @@
#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "Primary.h"
+#include "ReplicatingSubscription.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -32,52 +34,29 @@ namespace ha {
Primary* Primary::instance = 0;
-Primary::Primary(HaBroker& b) :
- haBroker(b), logPrefix(b),
- expected(b.getSettings().expectedBackups),
- unready(0), activated(false)
+Primary::Primary(HaBroker& hb, const IdSet& backups) :
+ haBroker(hb), logPrefix("HA primary: "),
+ unready(0), activated(false),
+ queues(hb.getBroker(), hb.getReplicationTest(), backups)
{
+ assert(instance == 0);
instance = this; // Let queue replicators find us.
- if (expected == 0) // No need for ready check
- activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
+ if (backups.empty()) {
+ QPID_LOG(debug, logPrefix << "Not waiting for backups");
+ activated = true;
+ }
else {
- // Set up the primary-ready check: ready when all queues have
- // expected number of backups. Note clients are excluded at this point
- // so dont't have to worry about changes to the set of queues.
- HaBroker::QueueNames names = haBroker.getActiveBackups();
- for (HaBroker::QueueNames::const_iterator i = names.begin(); i != names.end(); ++i)
- {
- queues[*i] = 0;
- ++unready;
- QPID_LOG(debug, logPrefix << "Need backup of " << *i
- << ", " << unready << " unready queues");
- }
- if (queues.empty())
- activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
- else {
- QPID_LOG(debug, logPrefix << "Waiting for " << expected
- << " backups on " << queues.size() << " queues");
- }
+ QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups);
}
}
-void Primary::readyReplica(const std::string& q) {
+void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
- if (!activated) {
- QueueCounts::iterator i = queues.find(q);
- if (i != queues.end()) {
- ++i->second;
- if (i->second == expected) --unready;
- QPID_LOG(debug, logPrefix << i->second << " backups caught up on " << q
- << ", " << unready << " unready queues");
- if (unready == 0) activate(l);
- }
+ if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) {
+ activated = true;
+ haBroker.activate();
+ QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe.");
}
}
-void Primary::activate(sys::Mutex::ScopedLock&) {
- activated = true;
- haBroker.activate();
-}
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 7e347fdbe2..3a1a9be9e8 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -22,7 +22,8 @@
*
*/
-#include "LogPrefix.h"
+#include "UnreadyQueueSet.h"
+#include "types.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <map>
@@ -36,10 +37,11 @@ class Queue;
namespace ha {
class HaBroker;
+class ReplicatingSubscription;
/**
- * State associated with a primary broker. Tracks replicating
- * subscriptions to determine when primary is ready.
+ * State associated with a primary broker. Tracks replicating
+ * subscriptions to determine when primary is active.
*
* THREAD SAFE: readyReplica is called in arbitray threads.
*/
@@ -47,22 +49,22 @@ class Primary
{
public:
static Primary* get() { return instance; }
- Primary(HaBroker& b);
- void readyReplica(const std::string& q);
- void removeReplica(const std::string& q);
+ Primary(HaBroker& hb, const IdSet& expectedBackups);
- private:
- typedef std::map<std::string, size_t> QueueCounts;
+ void readyReplica(const ReplicatingSubscription&);
+ void removeReplica(const std::string& q);
- void activate(sys::Mutex::ScopedLock&);
+ UnreadyQueueSet& getUnreadyQueueSet() { return queues; }
+ bool isActive() { return activated; }
+ private:
sys::Mutex lock;
HaBroker& haBroker;
- LogPrefix logPrefix;
- QueueCounts queues;
+ std::string logPrefix;
size_t expected, unready;
bool activated;
+ UnreadyQueueSet queues;
static Primary* instance;
};
diff --git a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
new file mode 100644
index 0000000000..1aa61b2dea
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h
@@ -0,0 +1,76 @@
+#ifndef QPID_HA_PRIMARYCONNECTIONOBSERVER_H
+#define QPID_HA_PRIMARYCONNECTIONOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "ConnectionObserver.h"
+#include "qpid/broker/ConnectionObserver.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+
+namespace broker {
+class Connection;
+}
+
+namespace ha {
+class HaBroker;
+
+/**
+ * Monitor connections on a primary broker. Update membership and
+ * primary readiness.
+ *
+ * THREAD SAFE: has no state, just mediates between other thread-safe objects.
+ */
+class PrimaryConnectionMonitor : public broker::ConnectionObserver
+{
+ public:
+ PrimaryConnectionMonitor(HaBroker& hb) : haBroker(hb) {}
+
+ void opened(broker::Connection& connection) {
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ QPID_LOG(debug, "HA primary: Backup connected: " << info);
+ haBroker.getMembership().add(info);
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
+ }
+ }
+
+ void closed(broker::Connection& connection) {
+ BrokerInfo info;
+ if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
+ QPID_LOG(debug, "HA primary: Backup disconnected: " << info);
+ haBroker.getMembership().remove(info.getSystemId());
+ // FIXME aconway 2012-06-01: changes to expected backup set for unready queues.
+ }
+ }
+ private:
+ void reject(broker::Connection&);
+ HaBroker& haBroker;
+ };
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_PRIMARYCONNECTIONOBSERVER_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
new file mode 100644
index 0000000000..55dc6b0d50
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueGuard.h"
+#include "ReplicatingSubscription.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/log/Statement.h"
+#include <sstream>
+
+namespace qpid {
+namespace ha {
+
+using namespace broker;
+using sys::Mutex;
+
+QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
+ : queue(q), subscription(0)
+{
+ // Set a log prefix message that identifies the remote broker.
+ std::ostringstream os;
+ os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
+}
+
+void QueueGuard::initialize() {
+ Mutex::ScopedLock l(lock);
+ queue.addObserver(shared_from_this());
+}
+
+void QueueGuard::enqueued(const QueuedMessage& qm) {
+ // Delay completion
+ QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
+ qm.payload->getIngressCompletion().startCompleter();
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(!delayed.contains(qm.position));
+ delayed += qm.position;
+ }
+}
+
+// FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription
+
+void QueueGuard::dequeued(const QueuedMessage& qm) {
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
+ ReplicatingSubscription* rs = 0;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ rs = subscription;
+ }
+ if (rs) rs->dequeued(qm);
+}
+
+void QueueGuard::cancel() {
+ queue.removeObserver(shared_from_this());
+ queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
+}
+
+void QueueGuard::attach(ReplicatingSubscription& rs) {
+ sys::Mutex::ScopedLock l(lock);
+ subscription = &rs;
+}
+
+void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << qm);
+ // The same message can be completed twice (once via acknowledged and
+ // once via dequeued), so remove it from the set to ensure that
+ // finishCompleter() is only called once.
+ if (delayed.contains(qm.position)) {
+ qm.payload->getIngressCompletion().finishCompleter();
+ delayed -= qm.position;
+ }
+}
+
+void QueueGuard::complete(const QueuedMessage& qm) {
+ Mutex::ScopedLock l(lock);
+ complete(qm, l);
+}
+
+// FIXME aconway 2012-06-04: TODO support for timeout.
+
+}} // namespaces qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
new file mode 100644
index 0000000000..739c1e0e13
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -0,0 +1,101 @@
+#ifndef QPID_HA_QUEUEGUARD_H
+#define QPID_HA_QUEUEGUARD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/framing/SequenceNumber.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/types/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <deque>
+#include <set>
+
+namespace qpid {
+namespace broker {
+class Queue;
+class QueuedMessage;
+class Message;
+}
+
+namespace ha {
+class BrokerInfo;
+class ReplicatingSubscription;
+
+/**
+ * A queue guard is a QueueObserver that delays completion of new
+ * messages arriving on a queue. It works as part of a
+ * ReplicatingSubscription to ensure messages are not acknowledged
+ * till they have been replicated.
+ *
+ * The guard is created before the ReplicatingSubscription to protect
+ * messages that arrive before the subscription is created and that
+ * the subscription has therefore not yet seen.
+ *
+ * THREAD SAFE: Called concurrently via QueueObserver::enqueued in
+ * arbitrary connection threads, and from ReplicatingSubscription
+ * in the subscription's thread.
+ */
+class QueueGuard : public broker::QueueObserver,
+ public boost::enable_shared_from_this<QueueGuard>
+{
+ public:
+ QueueGuard(broker::Queue& q, const BrokerInfo&);
+
+ /** Must be called after ctor, requires a shared_ptr to this to exist.
+ * Must be called before ReplicatingSubscription::initialize(this)
+ */
+ void initialize();
+
+ /** QueueObserver override. Delay completion of the message. */
+ void enqueued(const broker::QueuedMessage&);
+
+ /** QueueObserver override: Complete a delayed message */
+ void dequeued(const broker::QueuedMessage&);
+
+ /** Complete a delayed message. */
+ void complete(const broker::QueuedMessage&);
+
+ /** Complete all delayed messages. */
+ void cancel();
+
+ void attach(ReplicatingSubscription&);
+
+ // Unused QueueObserver functions.
+ void acquired(const broker::QueuedMessage&) {}
+ void requeued(const broker::QueuedMessage&) {}
+
+ private:
+ sys::Mutex lock;
+ std::string logPrefix;
+ broker::Queue& queue;
+ framing::SequenceSet delayed;
+ ReplicatingSubscription* subscription;
+
+ void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&);
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_QUEUEGUARD_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 58c5e452d7..4d12015008 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -21,7 +21,6 @@
#include "Counter.h"
#include "QueueReplicator.h"
-#include "HaBroker.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
@@ -60,15 +59,15 @@ bool QueueReplicator::isEventKey(const std::string key) {
return ret;
}
-QueueReplicator::QueueReplicator(HaBroker& hb,
+QueueReplicator::QueueReplicator(const BrokerInfo& info,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
- haBroker(hb), logPrefix(hb), queue(q), link(l)
+ logPrefix("HA backup of "+q->getName()+": "),
+ queue(q), link(l), brokerInfo(info)
{
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- logPrefix.setMessage(q->getName());
QPID_LOG(info, logPrefix << "Created");
}
@@ -119,7 +118,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
queue->getPosition());
settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
- haBroker.getBrokerInfo().asFieldTable());
+ brokerInfo.asFieldTable());
SequenceNumber front;
if (ReplicatingSubscription::getFront(*queue, front))
settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front);
@@ -143,7 +142,7 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
QueuedMessage message;
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 1b221a8d28..2f55e6cc85 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -22,7 +22,7 @@
*
*/
-#include "LogPrefix.h"
+#include "BrokerInfo.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/SequenceSet.h"
#include <boost/enable_shared_from_this.hpp>
@@ -42,7 +42,6 @@ class Deliverable;
namespace ha {
class Counter;
-class HaBroker;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -63,7 +62,7 @@ class QueueReplicator : public broker::Exchange,
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
- QueueReplicator(HaBroker&,
+ QueueReplicator(const BrokerInfo&,
boost::shared_ptr<broker::Queue> q,
boost::shared_ptr<broker::Link> l);
@@ -81,15 +80,15 @@ class QueueReplicator : public broker::Exchange,
private:
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
- void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
- HaBroker& haBroker;
- LogPrefix logPrefix;
+ std::string logPrefix;
std::string bridgeName;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
+ BrokerInfo brokerInfo;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 4490e309aa..f7bfe6fda0 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -19,8 +19,8 @@
*
*/
+#include "QueueGuard.h"
#include "ReplicatingSubscription.h"
-#include "HaBroker.h"
#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
@@ -129,14 +129,11 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
- haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
- queue->addObserver(rs);
- // NOTE: initialize must be called _after_ addObserver, so
- // messages can't be enqueued after setting readyPosition
- // but before registering the observer.
- rs->initialize();
+ boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo()));
+ guard->initialize(); // Must call before ReplicatingSubscription::initialize
+ rs->initialize(guard);
}
return rs;
}
@@ -177,7 +174,6 @@ ostream& operator<<(ostream& o, const QueueRange& qr) {
}
ReplicatingSubscription::ReplicatingSubscription(
- HaBroker& hb,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -190,24 +186,19 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- haBroker(hb),
- logPrefix(hb),
dummy(new Queue(mask(name))),
ready(false)
{
try {
+ FieldTable ft;
+ if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
+ throw Exception("Replicating subscription does not have broker info");
+ info.assign(ft);
+
// Set a log prefix message that identifies the remote broker.
- // FIXME aconway 2012-05-24: use URL instead of host:port, include transport?
ostringstream os;
- os << queue->getName() << "@";
- FieldTable ft;
- if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) {
- BrokerInfo info(ft);
- os << info.getHostName() << ":" << info.getPort();
- }
- else
- os << parent->getSession().getConnection().getUrl();
- logPrefix.setMessage(os.str());
+ os << "HA subscription " << queue->getName() << "@" << info.getLogId() << ": ";
+ logPrefix = os.str();
QueueRange primary(*queue);
QueueRange backup(arguments);
@@ -251,17 +242,20 @@ ReplicatingSubscription::~ReplicatingSubscription() {
}
// Called in subscription's connection thread when the subscription is created.
-void ReplicatingSubscription::initialize() {
- sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently
+void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) {
+ sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
+ // Attach to the guard.
+ guard = g;
+ guard->attach(*this);
// Send initial dequeues and position to the backup.
- // There most be a shared_ptr(this) when sending.
+ // There must be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
// Set the ready position. All messages after this position have
- // been seen by us as QueueObserver.
+ // been seen by the guard.
QueueRange range;
{
// Drop the lock, QueueRange will lock the queues message lock
@@ -320,82 +314,28 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
+void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) {
if (ready) return;
ready = true;
// Notify Primary that a subscription is ready.
{
sys::Mutex::ScopedUnlock u(lock);
QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
- if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
- }
-}
-
-// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
-
-// Mark a message completed. May be called by acknowledge or dequeued,
-// in arbitrary connection threads.
-void ReplicatingSubscription::complete(
- const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
-{
- // Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue == getQueue().get()) {
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- Delayed::iterator i= delayed.find(qm.position);
- // The same message can be completed twice, by acknowledged and
- // dequeued, remove it from the set so it only gets completed
- // once.
- if (i != delayed.end()) {
- assert(i->second.payload == qm.payload);
- qm.payload->getIngressCompletion().finishCompleter();
- delayed.erase(i);
- }
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
}
-// Called before we get notified of the message being available and
-// under the message lock in the queue.
-// Called in arbitrary connection thread *with the queue lock held*
-void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
- // Delay completion
- QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
- qm.payload->getIngressCompletion().startCompleter();
- {
- sys::Mutex::ScopedLock l(lock);
- assert(delayed.find(qm.position) == delayed.end());
- delayed[qm.position] = qm;
- }
-}
-
-// Function to complete a delayed message, called by cancel()
-void ReplicatingSubscription::cancelComplete(
- const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
-{
- QPID_LOG(trace, logPrefix << "Cancel completed " << v.second);
- v.second.payload->getIngressCompletion().finishCompleter();
-}
-
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
- getQueue()->removeObserver(
- boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
- {
- sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "Cancel backup subscription to "
- << getQueue()->getName());
- for_each(delayed.begin(), delayed.end(),
- boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
- delayed.clear();
- }
+ guard->cancel();
ConsumerImpl::cancel();
}
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
- sys::Mutex::ScopedLock l(lock);
// Finish completion of message, it has been acknowledged by the backup.
- complete(msg, l);
+ guard->complete(msg);
}
// Hide the "queue deleted" error for a ReplicatingSubscription when a
@@ -403,7 +343,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&)
{
if (dequeues.empty()) return;
QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
@@ -418,24 +358,27 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
}
}
-// QueueObserver override. Called after the message has been removed
+// Called via QueueObserver::dequeued override on guard.
+// Called after the message has been removed
// from the deque and under the messageLock in the queue. Called in
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ bool doComplete = false;
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) complete(qm, l);
+ if (qm.position > position) doComplete = true;
}
+ // If we have not yet sent this message to the backup, then
+ // complete it now as it will never be accepted.
+ if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
QPID_LOG(trace, logPrefix << "Sending position " << pos
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index ab02949952..e69a2159e6 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -23,6 +23,7 @@
*/
#include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY
+#include "BrokerInfo.h"
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
@@ -43,27 +44,25 @@ class Buffer;
}
namespace ha {
-class LogPrefix;
+class QueueGuard;
/**
- * A susbcription that represents a backup replicating a queue.
+ * A subscription that replicates to a remote backup.
*
- * Runs on the primary. Delays completion of messages till the backup
- * has acknowledged, informs backup of locally dequeued messages.
+ * Runs on the primary. In conjunction with a QueueGuard, delays
+ * completion of messages till the backup has acknowledged, informs
+ * backup of locally dequeued messages.
*
- * THREAD SAFE: Used as a consumer in subscription's connection
- * thread, and as a QueueObserver in arbitrary connection threads.
+ * THREAD SAFE: Called in subscription's connection thread but also
+ * in arbitrary connection threads via dequeued.
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
*/
-class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
- public broker::QueueObserver
+class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
struct Factory : public broker::ConsumerFactory {
- HaBroker& haBroker;
- Factory(HaBroker& hb) : haBroker(hb) {}
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -88,9 +87,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
*/
static bool getNext(broker::Queue&, framing::SequenceNumber from,
framing::SequenceNumber& result);
+ static bool isEmpty(broker::Queue&);
- ReplicatingSubscription(HaBroker&,
- broker::SemanticState* parent,
+ ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
@@ -98,11 +97,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
- // QueueObserver overrides. NB called with queue lock held.
- void enqueued(const broker::QueuedMessage&);
- void dequeued(const broker::QueuedMessage&);
- void acquired(const broker::QueuedMessage&) {}
- void requeued(const broker::QueuedMessage&) {}
+ // Called via QueueGuard::dequeued
+ void dequeued(const broker::QueuedMessage& qm);
// Consumer overrides.
bool deliver(broker::QueuedMessage& msg);
@@ -111,30 +107,30 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool browseAcquired() const { return true; }
bool hideDeletedError();
+
/** Initialization that must be done after construction because it
- * requires a shared_ptr to this to exist.
+ * requires a shared_ptr to this to exist. Will attach to guard
*/
- void initialize();
+ void initialize(const boost::shared_ptr<QueueGuard>& guard);
+
+ BrokerInfo getBrokerInfo() const { return info; }
protected:
bool doDispatch();
- private:
- typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- HaBroker& haBroker;
- LogPrefix logPrefix;
+ private:
+ std::string logPrefix;
boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
- Delayed delayed;
framing::SequenceSet dequeues;
- framing::SequenceNumber backupPosition;
framing::SequenceNumber readyPosition;
+ framing::SequenceNumber backupPosition;
bool ready;
+ BrokerInfo info;
+ boost::shared_ptr<QueueGuard> guard;
- void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
- void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
- void sendDequeueEvent(const sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
- void setReady(const sys::Mutex::ScopedLock&);
+ void sendDequeueEvent(sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
+ void setReady(sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&);
friend struct Factory;
};
diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
new file mode 100644
index 0000000000..1db101dc94
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -0,0 +1,70 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicationTest.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace ha {
+
+using types::Variant;
+
+// Parse a replication level name; a name that does not parse yields replicateDefault.
+ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+    Enum<ReplicateLevel> parsed;
+    const bool recognised = parsed.parseNoThrow(str);
+    return recognised ? ReplicateLevel(parsed.get()) : replicateDefault;
+}
+
+// Level named by the QPID_REPLICATE entry of a FieldTable, or replicateDefault if unset.
+ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+    if (!f.isSet(QPID_REPLICATE)) return replicateDefault;
+    return replicateLevel(f.getAsString(QPID_REPLICATE));
+}
+
+// Level named by the QPID_REPLICATE entry of a Variant map, or replicateDefault if absent.
+ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+    const Variant::Map::const_iterator found = m.find(QPID_REPLICATE);
+    if (found == m.end()) return replicateDefault;
+    return replicateLevel(found->second.asString());
+}
+
+namespace {
+const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+}
+
+// Not replicated when the level converts to false, or when the queue is both
+// auto-delete and exclusive and carries no AUTO_DELETE_TIMEOUT argument.
+bool ReplicationTest::isReplicated(const Variant::Map& args, bool autodelete, bool exclusive) {
+    const bool hasTimeout = args.find(AUTO_DELETE_TIMEOUT) != args.end();
+    const bool ignore = autodelete && exclusive && !hasTimeout;
+    return replicateLevel(args) && !ignore;
+}
+
+// FieldTable flavour of the test: same rule, with the timeout looked up via isSet().
+bool ReplicationTest::isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive) {
+    const bool hasTimeout = args.isSet(AUTO_DELETE_TIMEOUT);
+    const bool ignore = autodelete && exclusive && !hasTimeout;
+    return replicateLevel(args) && !ignore;
+}
+
+// Apply the replication test to a queue using its settings and ownership flags.
+bool ReplicationTest::isReplicated(const broker::Queue& q) {
+    const bool autodelete = q.isAutoDelete();
+    const bool exclusive = q.hasExclusiveOwner();
+    return isReplicated(q.getSettings(), autodelete, exclusive);
+}
+
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h
index da01154a80..4851f34e84 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.h
+++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_LOGPREFIX_H
-#define QPID_HA_LOGPREFIX_H
+#ifndef QPID_HA_REPLICATIONTEST_H
+#define QPID_HA_REPLICATIONTEST_H
/*
*
@@ -22,38 +22,41 @@
*
*/
-#include "Enum.h"
-#include <iosfwd>
+#include "types.h"
+#include "qpid/types/Variant.h"
#include <string>
namespace qpid {
-namespace ha {
-class HaBroker;
+namespace broker {
+class Queue;
+}
+
+namespace framing {
+class FieldTable;
+}
+namespace ha {
/**
- * Standard information to prefix log messages.
+ * Test whether something is replicated, taking into account the
+ * default replication level.
*/
-class LogPrefix
+class ReplicationTest
{
public:
- /** For use by all classes other than HaBroker */
- LogPrefix(HaBroker& hb, const std::string& queue=std::string());
- LogPrefix(LogPrefix& lp, const std::string& queue);
- /** For use by the HaBroker itself. */
- LogPrefix(BrokerStatus&);
+ ReplicationTest(ReplicateLevel replicateDefault_) :
+ replicateDefault(replicateDefault_) {}
- void setMessage(const std::string&);
+ ReplicateLevel replicateLevel(const std::string& str);
+ ReplicateLevel replicateLevel(const framing::FieldTable& f);
+ ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive);
+ bool isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive);
+ bool isReplicated(const broker::Queue&);
private:
- HaBroker* haBroker;
- BrokerStatus* status;
- std::string tail;
- friend std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
+ ReplicateLevel replicateDefault;
};
-
-std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
-
}} // namespace qpid::ha
-#endif /*!QPID_HA_LOGPREFIX_H*/
+#endif /*!QPID_HA_REPLICATIONTEST_H*/
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 08d42471b8..213a5f64d5 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -22,7 +22,7 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include <string>
namespace qpid {
@@ -34,13 +34,12 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0), replicateDefault(NONE)
+ Settings() : cluster(false), replicateDefault(NONE)
{}
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
- size_t expectedBackups;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
private:
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
new file mode 100644
index 0000000000..279eb2c0e1
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "UnreadyQueueSet.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueRegistry.h"
+#include <boost/bind.hpp>
+#include <iostream>
+#include <algorithm>
+
+namespace qpid {
+namespace ha {
+
+using sys::Mutex;
+
+// When backups are expected, register every queue already in the broker as an
+// "initial" queue; initializing stays true only for the duration of that scan.
+UnreadyQueueSet::UnreadyQueueSet(broker::Broker& broker, ReplicationTest rt, const IdSet& ids) :
+    logPrefix("HA unsafe queues: "),
+    replicationTest(rt),
+    expected(ids),
+    initializing(true),
+    initialQueues(0)
+{
+    const bool recovering = !expected.empty();
+    if (recovering) {
+        QPID_LOG(debug, logPrefix << "Recovering, waiting for backups: " << expected);
+        // queueCreate() records each entry with the current value of initializing.
+        broker.getQueues().eachQueue(boost::bind(&UnreadyQueueSet::queueCreate, this, _1));
+        initialQueues = queues.size();
+    }
+    initializing = false;
+}
+
+// Replace the set of expected backup ids, under the lock.
+void UnreadyQueueSet::setExpectedBackups(const IdSet& ids) {
+    Mutex::ScopedLock scoped(lock);
+    expected = ids;
+}
+
+// Record that backup `id` is ready on queue `q`.
+// Returns true when no initial queues remain unready.
+bool UnreadyQueueSet::ready(const boost::shared_ptr<broker::Queue>& q, const types::Uuid& id) {
+    Mutex::ScopedLock l(lock);
+    QPID_LOG(debug, logPrefix << "Replicating subscription ready: " << id << " on "
+             << q->getName());
+    QueueMap::iterator i = queues.find(q);
+    if (i != queues.end()) {
+        // if (i->second.guard->ready(id)) {
+        // QPID_LOG(debug, logPrefix << "Releasing guard on " << q->getName());
+        // remove(i, l);
+        // Count each initial queue at most once. ready() may be called again
+        // for the same queue (e.g. with another backup id), and initialQueues
+        // is unsigned, so an extra decrement would wrap instead of staying 0.
+        if (i->second.initial) {
+            i->second.initial = false;
+            --initialQueues;
+        }
+        // }
+    }
+    return initialQueues == 0;
+}
+
+// ConfigurationObserver callback: start tracking a replicated queue while
+// backups are expected.
+void UnreadyQueueSet::queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+    Mutex::ScopedLock l(lock);
+    if (!replicationTest.isReplicated(*q) || expected.empty()) return;
+    QPID_LOG(debug, logPrefix << "Guarding " << q->getName() << " for " << expected);
+    // GuardPtr guard(new QueueGuard(*q, expected));
+    // FIXME aconway 2012-06-05: q->addObserver(guard);
+    queues[q] = Entry(initializing);//, guard);
+}
+
+// ConfigurationObserver callback: stop tracking a queue that is being destroyed.
+void UnreadyQueueSet::queueDestroy(const boost::shared_ptr<broker::Queue>& q) {
+    Mutex::ScopedLock l(lock);
+    QueueMap::iterator entry = queues.find(q);
+    remove(entry, l);
+}
+
+// Erase the entry at i, if any. The ScopedLock parameter shows the caller holds the lock.
+void UnreadyQueueSet::remove(QueueMap::iterator i, sys::Mutex::ScopedLock&) {
+    if (i == queues.end()) return;
+    QPID_LOG(debug, logPrefix << "Queue is safe: " << i->first->getName());
+    // FIXME aconway 2012-06-05: i->first->removeObserver(i->second.guard);
+    //i->second.guard->release();
+    queues.erase(i);
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
new file mode 100644
index 0000000000..0731282c2b
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h
@@ -0,0 +1,88 @@
+#ifndef QPID_HA_BROKERGUARD_H
+#define QPID_HA_BROKERGUARD_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ReplicationTest.h"
+#include "types.h"
+#include "qpid/broker/ConfigurationObserver.h"
+#include "qpid/sys/Mutex.h"
+#include <set>
+#include <map>
+
+namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace ha {
+class QueueGuard;
+
+/**
+ * ConfigurationObserver that sets a QueueGuard on all existing/new queues
+ * so they are safe until their ReplicatingSubscriptions catch up.
+ * NOTE(review): the guard member and its use are still commented out below (see FIXMEs).
+ * THREAD SAFE: Called concurrently as a ConfigurationObserver and via ready()
+ */
+class UnreadyQueueSet : public qpid::broker::ConfigurationObserver
+{
+ public:
+ /** Caller should call broker.getConfigurationObservers().add(shared_ptr(this)) */
+ UnreadyQueueSet(broker::Broker&, ReplicationTest rt, const IdSet& expected);
+
+ void setExpectedBackups(const IdSet&);
+
+ /** Report that the backup with the given id is ready on the given queue.
+ *@return true if all initial queues are now ready.
+ */
+ bool ready(const boost::shared_ptr<broker::Queue>&, const types::Uuid& id);
+
+ // ConfigurationObserver overrides, called when a queue is created or destroyed.
+ void queueCreate(const boost::shared_ptr<broker::Queue>&);
+ void queueDestroy(const boost::shared_ptr<broker::Queue>&);
+
+ // FIXME aconway 2012-05-31: handle IdSet changes.
+ private:
+ typedef boost::shared_ptr<QueueGuard> GuardPtr;
+ struct Entry {
+ bool initial; // Set from the constructor argument; defaults to false.
+ // FIXME aconway 2012-06-05: GuardPtr guard;
+ // Entry(bool i=false, GuardPtr g=GuardPtr()) : initial(i), guard(g) {}
+ Entry(bool i=false) : initial(i) {}
+ };
+ typedef std::map<boost::shared_ptr<broker::Queue>, Entry> QueueMap;
+
+ void remove(QueueMap::iterator i, sys::Mutex::ScopedLock&);
+
+ sys::Mutex lock;
+ std::string logPrefix; // Prefix for this object's log messages.
+ ReplicationTest replicationTest;
+ IdSet expected; // Ids of the expected backups; see setExpectedBackups().
+ QueueMap queues;
+ bool initializing;
+ size_t initialQueues; // Count of initial queues still unready.
+};
+
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_BROKERGUARD_H*/
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index ce14032694..3da482e732 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -31,9 +31,6 @@
<property name="publicUrl" type="sstr"
desc="URL advertized to clients to connect to the cluster."/>
- <property name="expectedBackups" type="uint16"
- desc="Number of HA backup brokers expected."/>
-
<property name="replicateDefault" type="sstr"
desc="Replication for queues/exchanges with no qpid.replicate argument"/>
@@ -51,10 +48,6 @@
<arg name="url" type="sstr" dir="I"/>
</method>
- <method name="setExpectedBackups" desc="Set number of backups expected">
- <arg name="expectedBackups" type="uint16" dir="I"/>
- </method>
-
<method name="replicate" desc="Replicate individual queue from remote broker.">
<arg name="broker" type="sstr" dir="I"/>
<arg name="queue" type="sstr" dir="I"/>
diff --git a/qpid/cpp/src/qpid/ha/Enum.cpp b/qpid/cpp/src/qpid/ha/types.cpp
index a5ee7ea51f..92acd76fca 100644
--- a/qpid/cpp/src/qpid/ha/Enum.cpp
+++ b/qpid/cpp/src/qpid/ha/types.cpp
@@ -19,30 +19,33 @@
*
*/
-#include "Enum.h"
+#include "types.h"
#include "qpid/Msg.h"
#include "qpid/Exception.h"
#include <algorithm>
#include <iostream>
+#include <iterator>
#include <assert.h>
namespace qpid {
namespace ha {
-const std::string QPID_REPLICATE("qpid.replicate");
+using namespace std;
-std::string EnumBase::str() const {
+const string QPID_REPLICATE("qpid.replicate");
+
+string EnumBase::str() const {
assert(value < count);
return names[value];
}
-void EnumBase::parse(const std::string& s) {
+void EnumBase::parse(const string& s) {
if (!parseNoThrow(s))
throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s));
}
-bool EnumBase::parseNoThrow(const std::string& s) {
- const char** i = std::find(names, names+count, s);
+bool EnumBase::parseNoThrow(const string& s) {
+ const char** i = find(names, names+count, s);
value = i - names;
return value < count;
}
@@ -58,15 +61,21 @@ template <> const char* Enum<BrokerStatus>::NAMES[] = {
};
template <> const size_t Enum<BrokerStatus>::N = 7;
-std::ostream& operator<<(std::ostream& o, EnumBase e) {
+ostream& operator<<(ostream& o, EnumBase e) {
return o << e.str();
}
-std::istream& operator>>(std::istream& i, EnumBase& e) {
- std::string s;
+istream& operator>>(istream& i, EnumBase& e) {
+ string s;
i >> s;
e.parse(s);
return i;
}
+// Print each id in the set followed by a single space.
+ostream& operator<<(ostream& o, const IdSet& ids) {
+    for (IdSet::const_iterator i = ids.begin(); i != ids.end(); ++i)
+        o << *i << " ";
+    return o;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Enum.h b/qpid/cpp/src/qpid/ha/types.h
index adad21d2d4..e12c039c79 100644
--- a/qpid/cpp/src/qpid/ha/Enum.h
+++ b/qpid/cpp/src/qpid/ha/types.h
@@ -23,7 +23,9 @@
*/
#include "qpid/types/Variant.h"
+#include "qpid/types/Uuid.h"
#include <string>
+#include <set>
#include <iosfwd>
namespace qpid {
@@ -93,5 +95,15 @@ inline bool isPrimary(BrokerStatus s) {
inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
extern const std::string QPID_REPLICATE;
+
+// FIXME aconway 2012-06-04: file was renamed Enum.h->types.h but the include guard is still QPID_HA_ENUM_H
+
+/**
+ * IdSet is a distinct class, not a typedef of std::set<types::Uuid>, so that operator<< can be overloaded for it.
+ */
+class IdSet : public std::set<types::Uuid> {};
+
+std::ostream& operator<<(std::ostream& o, const IdSet& ids);
+
}} // qpid::ha
#endif /*!QPID_HA_ENUM_H*/
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 1b93504b64..c32b7f2c96 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -248,7 +248,7 @@ class Broker(Popen):
self.log = "%s.log" % self.name
i = 1
while (os.path.exists(self.log)):
- self.log = "%s-%d.log" % (self.name, i)
+ self.log = "%s.%d.log" % (self.name, i)
i += 1
def get_log(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 6e270851f0..86679611c4 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
from uuid import UUID
@@ -75,7 +75,10 @@ class HaBroker(Broker):
if not self._agent: self._agent = QmfAgent(self.host_port())
return self._agent
- def ha_status(self): self.agent().getHaBroker().status
+ def ha_status(self): return self.agent().getHaBroker().status
+
+ def wait_status(self, status):
+     """Assert, via retry(), that ha_status() comes to equal status."""
+     def matches(): return self.ha_status() == status
+     assert retry(matches), "%r != %r"%(self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -97,6 +100,14 @@ class HaBroker(Broker):
try: wait_address(bs, address)
finally: bs.connection.close()
+ def assert_browse(self, queue, expected, **kwargs):
+     """Verify queue contents by browsing."""
+     session = self.connect().session()
+     try:
+         wait_address(session, queue)
+         assert_browse_retry(session, queue, expected, **kwargs)
+     finally:
+         # Always close the connection opened for this check.
+         session.connection.close()
+
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
bs = self.connect_admin().session()
@@ -157,9 +168,11 @@ class HaCluster(object):
if promote_next: self[(i+1) % len(self)].promote()
def restart(self, i):
+ """Start a broker with the same name and data directory. It will get
+ a separate log file: foo.n.log"""
b = self._brokers[i]
self._brokers[i] = HaBroker(
- self.test, name=self.next_name(), port=b.port(), brokers_url=self.url,
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
**self.kwargs)
def bounce(self, i, promote_next=True):
@@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
@@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest):
assert valid_address(s, "ad")
assert valid_address(s, "time")
- def test_recovering(self):
- """Verify that the primary broker does not go active until expected
- backups have connected"""
- cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
- c = cluster[0].connect()
- for i in xrange(10):
- s = c.session().sender("q%s;{create:always}"%i)
- for j in xrange(100): s.send(str(j))
- cluster.kill(0) # Fail over to 1
- cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
- cluster.restart(0)
- c = retry(cluster[1].try_connect)
- self.assertTrue(c)
- cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
-
- # Verify in logs that all queue catch-up happened before the transition to active.
- log = open(cluster[1].log).read()
- i = log.find("Status change: recovering -> active")
- self.failIf(i < 0)
- self.assertEqual(log.find("caught up", i), -1)
-
def test_broker_info(self):
"""Check that broker information is correctly published via management"""
cluster = HaCluster(self, 3)
@@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest):
# Check that all brokers have the same membership as the cluster
for broker in cluster:
qmf = broker.agent().getHaBroker()
- assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
# Add a new broker, check it is updated everywhere
b = cluster.start()
cluster_ports.append(b.port())
@@ -720,12 +713,10 @@ class LongTests(BrokerTest):
def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # Start a cluster, all members will be killed during the test.
- # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed.
- brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+ brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+ sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False)
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
@@ -744,14 +735,14 @@ class LongTests(BrokerTest):
# otherwise we can lose messages. When we implement non-promotion
# of catchup brokers we can make this stronger: wait only for
# there to be at least one ready backup.
- assert retry(brokers[i%3].try_connect, 1)
+ brokers[i%3].wait_status("active")
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running
receiver.check() # Verify no exceptions
return receiver.received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
- assert retry(enough, 10)
+ assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n)
except:
traceback.print_exc()
raise
@@ -764,6 +755,53 @@ class LongTests(BrokerTest):
brokers.kill(i, False)
if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+    """Tests for recovery after a failure."""
+
+    def test_queue_hold(self):
+        """Verify that the broker holds queues without sufficient backup,
+        i.e. does not complete messages sent to those queues."""
+
+        cluster = HaCluster(self, 4)
+        # Wait for the primary to be ready
+        cluster[0].wait_status("active")
+
+        # Messages sent before and after the failure.
+        before = [str(i) for i in xrange(100)]
+        after = [str(i) for i in xrange(100,200)]
+
+        # Create a queue before the failure.
+        s1 = cluster.connect(0).session().sender("q1;{create:always}")
+        for b in cluster: b.wait_backup("q1")
+        for m in before: s1.send(m)
+
+        # Kill primary and 2 backups
+        for i in [0,1,2]: cluster.kill(i, False)
+        cluster[3].promote()    # New primary, backups will be 1 and 2
+        cluster[3].wait_status("recovering")
+
+        # Create a queue after the failure
+        s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+        # Verify that messages sent are not completed
+        for m in after:
+            s1.send(m, sync=False)
+            s2.send(m, sync=False)
+        self.assertEqual(s1.unsettled(), 100)
+        self.assertEqual(s2.unsettled(), 100)
+
+        # Verify we can receive even if sending is on hold:
+        cluster[3].assert_browse("q1", before + after)
+
+        # Restart backups, verify queues are released only when both backups are up
+        cluster.restart(1)
+        self.assertEqual(s1.unsettled(), 100)
+        self.assertEqual(s2.unsettled(), 100)
+        self.assertEqual(cluster[3].ha_status(), "recovering")
+        cluster.restart(2)
+
+        def settled(sender):
+            sender.sync()
+            return sender.unsettled() == 0
+        assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+        assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+        cluster[1].assert_browse_backup("q1", before + after)
+        cluster[1].assert_browse_backup("q2", after)
+        # No trailing comma here: it would turn the call into a discarded tuple.
+        cluster[3].wait_status("active")
+
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")