summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/ha
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/ha')
-rw-r--r--cpp/src/qpid/ha/AlternateExchangeSetter.h4
-rw-r--r--cpp/src/qpid/ha/Backup.cpp106
-rw-r--r--cpp/src/qpid/ha/Backup.h20
-rw-r--r--cpp/src/qpid/ha/BrokerInfo.cpp24
-rw-r--r--cpp/src/qpid/ha/BrokerInfo.h8
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.cpp606
-rw-r--r--cpp/src/qpid/ha/BrokerReplicator.h52
-rw-r--r--cpp/src/qpid/ha/ConnectionObserver.cpp6
-rw-r--r--cpp/src/qpid/ha/ConnectionObserver.h2
-rw-r--r--cpp/src/qpid/ha/HaBroker.cpp252
-rw-r--r--cpp/src/qpid/ha/HaBroker.h62
-rw-r--r--cpp/src/qpid/ha/HaPlugin.cpp26
-rw-r--r--cpp/src/qpid/ha/Membership.cpp113
-rw-r--r--cpp/src/qpid/ha/Membership.h45
-rw-r--r--cpp/src/qpid/ha/Primary.cpp122
-rw-r--r--cpp/src/qpid/ha/Primary.h23
-rw-r--r--cpp/src/qpid/ha/QueueGuard.cpp94
-rw-r--r--cpp/src/qpid/ha/QueueGuard.h12
-rw-r--r--cpp/src/qpid/ha/QueueRange.h11
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.cpp161
-rw-r--r--cpp/src/qpid/ha/QueueReplicator.h18
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.cpp53
-rw-r--r--cpp/src/qpid/ha/RemoteBackup.h22
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.cpp57
-rw-r--r--cpp/src/qpid/ha/ReplicatingSubscription.h17
-rw-r--r--cpp/src/qpid/ha/ReplicationTest.cpp39
-rw-r--r--cpp/src/qpid/ha/ReplicationTest.h26
-rw-r--r--cpp/src/qpid/ha/Role.h55
-rw-r--r--cpp/src/qpid/ha/Settings.h16
-rw-r--r--cpp/src/qpid/ha/StandAlone.h45
-rw-r--r--cpp/src/qpid/ha/StatusCheck.cpp132
-rw-r--r--cpp/src/qpid/ha/StatusCheck.h71
-rw-r--r--cpp/src/qpid/ha/types.cpp6
-rw-r--r--cpp/src/qpid/ha/types.h1
34 files changed, 1559 insertions, 748 deletions
diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h
index 08690e68bc..2386a01084 100644
--- a/cpp/src/qpid/ha/AlternateExchangeSetter.h
+++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h
@@ -43,12 +43,14 @@ class AlternateExchangeSetter
AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {}
+ /** If altEx is already known, call setter(altEx) now else save for later */
void setAlternate(const std::string& altEx, const SetFunction& setter) {
- broker::Exchange::shared_ptr ex = exchanges.find(altEx);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx);
if (ex) setter(ex); // Set immediately.
else setters.insert(Setters::value_type(altEx, setter)); // Save for later.
}
+ /** Add an exchange and call any setters that are waiting for it. */
void addExchange(boost::shared_ptr<broker::Exchange> exchange) {
// Update the setters for this exchange
std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName());
diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp
index 6852a58b0c..2affc12bf6 100644
--- a/cpp/src/qpid/ha/Backup.cpp
+++ b/cpp/src/qpid/ha/Backup.cpp
@@ -20,9 +20,12 @@
*/
#include "Backup.h"
#include "BrokerReplicator.h"
+#include "ConnectionObserver.h"
#include "HaBroker.h"
+#include "Primary.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StatusCheck.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
@@ -44,28 +47,38 @@ using namespace framing;
using namespace broker;
using types::Variant;
using std::string;
+using sys::Mutex;
Backup::Backup(HaBroker& hb, const Settings& s) :
- logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s)
+ logPrefix("Backup: "), membership(hb.getMembership()), stopped(false),
+ haBroker(hb), broker(hb.getBroker()), settings(s),
+ statusCheck(
+ new StatusCheck(
+ logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo()))
{
- // Empty brokerUrl means delay initialization until seBrokertUrl() is called.
- if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
+ // Set link properties to tag outgoing links.
+ framing::FieldTable linkProperties = broker.getLinkClientProperties();
+ linkProperties.setTable(
+ ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable());
+ broker.setLinkClientProperties(linkProperties);
}
-void Backup::initialize(const Url& brokers) {
- if (brokers.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
- string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
- types::Uuid uuid(true);
- // Declare the link
- std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
- broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
- brokers[0].host, brokers[0].port, protocol,
- false, // durable
- settings.mechanism, settings.username, settings.password,
- false); // no amq.failover - don't want to use client URL.
- {
- sys::Mutex::ScopedLock l(lock);
+void Backup::setBrokerUrl(const Url& brokers) {
+ if (brokers.empty()) return;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return;
+ if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers);
+ if (!link) { // Not yet initialized
+ QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers);
+ string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol;
+ types::Uuid uuid(true);
+ std::pair<Link::shared_ptr, bool> result;
+ result = broker.getLinks().declare(
+ broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
+ brokers[0].host, brokers[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password,
+ false); // no amq.failover - don't want to use client URL.
link = result.first;
replicator.reset(new BrokerReplicator(haBroker, link));
replicator->initialize();
@@ -74,36 +87,57 @@ void Backup::initialize(const Url& brokers) {
link->setUrl(brokers); // Outside the lock, once set link doesn't change.
}
-Backup::~Backup() {
+void Backup::stop(Mutex::ScopedLock&) {
+ if (stopped) return;
+ stopped = true;
+ QPID_LOG(debug, logPrefix << "Leaving backup role.");
if (link) link->close();
- if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
+ if (replicator.get()) {
+ replicator->shutdown();
+ replicator.reset();
+ }
}
-// Called via management.
-void Backup::setBrokerUrl(const Url& url) {
- // Ignore empty URLs seen during start-up for some tests.
- if (url.empty()) return;
- bool linkSet = false;
+Role* Backup::recover(Mutex::ScopedLock&) {
+ BrokerInfo::Set backups;
{
- sys::Mutex::ScopedLock l(lock);
- linkSet = link;
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ stop(l); // Stop backup activity before starting primary.
+ QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo());
+ // Reset membership before allowing backups to connect.
+ backups = membership.otherBackups();
+ membership.clear();
+ return new Primary(haBroker, backups);
}
- if (linkSet)
- link->setUrl(url); // Outside lock, once set link doesn't change
- else
- initialize(url); // Deferred initialization
}
-void Backup::setStatus(BrokerStatus status) {
- switch (status) {
- case READY:
- QPID_LOG(notice, logPrefix << "Ready to become primary.");
+Role* Backup::promote() {
+ Mutex::ScopedLock l(lock);
+ if (stopped) return 0;
+ switch (haBroker.getStatus()) {
+ case JOINING:
+ if (statusCheck->canPromote()) return recover(l);
+ else {
+ QPID_LOG(error,
+ logPrefix << "Joining active cluster, cannot be promoted.");
+ throw Exception("Joining active cluster, cannot be promoted.");
+ }
break;
case CATCHUP:
- QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted.");
+ QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ throw Exception("Still catching up, cannot be promoted.");
+ break;
+ case READY: return recover(l); break;
default:
- assert(0);
+ assert(0); // Not a valid state for the Backup role..
}
+ return 0; // Keep compiler happy
+}
+
+Backup::~Backup() {
+ Mutex::ScopedLock l(lock);
+ stop(l);
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h
index 4f2d5babde..4943ca5e2e 100644
--- a/cpp/src/qpid/ha/Backup.h
+++ b/cpp/src/qpid/ha/Backup.h
@@ -22,6 +22,7 @@
*
*/
+#include "Role.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -38,30 +39,41 @@ namespace ha {
class Settings;
class BrokerReplicator;
class HaBroker;
+class StatusCheck;
+class Membership;
/**
- * State associated with a backup broker. Manages connections to primary.
+ * Backup role: Manages connections to primary, replicates management events and queue contents.
*
* THREAD SAFE
*/
-class Backup
+class Backup : public Role
{
public:
Backup(HaBroker&, const Settings&);
~Backup();
+
+ std::string getLogPrefix() const { return logPrefix; }
+
void setBrokerUrl(const Url&);
- void setStatus(BrokerStatus);
+
+ Role* promote();
private:
- void initialize(const Url&);
+ void stop(sys::Mutex::ScopedLock&);
+ Role* recover(sys::Mutex::ScopedLock&);
+
std::string logPrefix;
+ Membership& membership;
sys::Mutex lock;
+ bool stopped;
HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<BrokerReplicator> replicator;
+ std::auto_ptr<StatusCheck> statusCheck;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp
index c8bd1a14be..8efed91b17 100644
--- a/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -33,27 +33,22 @@ namespace qpid {
namespace ha {
namespace {
-std::string SYSTEM_ID="system-id";
-std::string HOST_NAME="host-name";
-std::string PORT="port";
-std::string STATUS="status";
+const std::string SYSTEM_ID="system-id";
+const std::string HOST_NAME="host-name";
+const std::string PORT="port";
+const std::string STATUS="status";
}
using types::Uuid;
using types::Variant;
using framing::FieldTable;
-BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) :
- hostName(host), port(port_), systemId(id)
-{
- updateLogId();
-}
+BrokerInfo::BrokerInfo() : port(0), status(JOINING) {}
-void BrokerInfo::updateLogId() {
- std::ostringstream o;
- o << hostName << ":" << port;
- logId = o.str();
-}
+BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s,
+ const std::string& host, uint16_t port_) :
+ hostName(host), port(port_), systemId(id), status(s)
+{}
FieldTable BrokerInfo::asFieldTable() const {
Variant::Map m = asMap();
@@ -91,7 +86,6 @@ void BrokerInfo::assign(const Variant::Map& m) {
hostName = get(m, HOST_NAME).asString();
port = get(m, PORT).asUint16();
status = BrokerStatus(get(m, STATUS).asUint8());
- updateLogId();
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
diff --git a/cpp/src/qpid/ha/BrokerInfo.h b/cpp/src/qpid/ha/BrokerInfo.h
index 642f7c1361..40358336b0 100644
--- a/cpp/src/qpid/ha/BrokerInfo.h
+++ b/cpp/src/qpid/ha/BrokerInfo.h
@@ -43,8 +43,9 @@ class BrokerInfo
typedef std::set<BrokerInfo> Set;
typedef std::map<types::Uuid, BrokerInfo> Map;
- BrokerInfo() {}
- BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id);
+ BrokerInfo();
+ BrokerInfo(const types::Uuid& id, BrokerStatus,
+ const std::string& host=std::string(), uint16_t port=0);
BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
BrokerInfo(const types::Variant::Map& m) { assign(m); }
@@ -52,7 +53,6 @@ class BrokerInfo
std::string getHostName() const { return hostName; }
BrokerStatus getStatus() const { return status; }
uint16_t getPort() const { return port; }
- std::string getLogId() const { return logId; }
void setStatus(BrokerStatus s) { status = s; }
@@ -66,8 +66,6 @@ class BrokerInfo
bool operator<(const BrokerInfo x) const { return systemId < x.systemId; }
private:
- void updateLogId();
- std::string logId;
std::string hostName;
uint16_t port;
types::Uuid systemId;
diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp
index 3a3c9c2954..983b976d76 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -23,16 +23,19 @@
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Connection.h"
+#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/reply_exceptions.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventUnbind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
@@ -41,6 +44,7 @@
#include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include <boost/bind.hpp>
#include <algorithm>
#include <sstream>
#include <iostream>
@@ -57,23 +61,25 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
using qmf::org::apache::qpid::ha::EventMembersUpdate;
+using qpid::broker::amqp_0_10::MessageTransfer;
using namespace framing;
-using std::string;
+using namespace std;
using std::ostream;
using types::Variant;
using namespace broker;
namespace {
-const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
+const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
const string QUERY_RESPONSE("_query_response");
-const string SCHEMA_ID("_schema_id");
const string VALUES("_values");
+const string SCHEMA_ID("_schema_id");
+const string WHAT("_what");
const string ALTEX("altEx");
const string ALTEXCHANGE("altExchange");
@@ -81,24 +87,27 @@ const string ARGS("args");
const string ARGUMENTS("arguments");
const string AUTODEL("autoDel");
const string AUTODELETE("autoDelete");
-const string EXCL("excl");
-const string EXCLUSIVE("exclusive");
const string BIND("bind");
-const string UNBIND("unbind");
const string BINDING("binding");
+const string BINDING_KEY("bindingKey");
const string CREATED("created");
const string DISP("disp");
+const string DEST("dest");
const string DURABLE("durable");
const string EXCHANGE("exchange");
+const string EXCL("excl");
+const string EXCLUSIVE("exclusive");
const string EXNAME("exName");
const string EXTYPE("exType");
+const string HA_BROKER("habroker");
const string KEY("key");
const string NAME("name");
+const string PARTIAL("partial");
const string QNAME("qName");
const string QUEUE("queue");
const string TYPE("type");
-const string HA_BROKER("habroker");
-const string PARTIAL("partial");
+const string UNBIND("unbind");
+const string CONSUMER_COUNT("consumerCount");
const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#");
@@ -107,10 +116,6 @@ const string QMF_CONTENT("qmf.content");
const string QMF_DEFAULT_TOPIC("qmf.default.topic");
const string QMF_OPCODE("qmf.opcode");
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
const string OBJECT("OBJECT");
const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
@@ -118,21 +123,18 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct");
const string _QUERY_REQUEST("_query_request");
const string BROKER("broker");
const string MEMBERS("members");
-
-template <class T> bool match(Variant::Map& schema) {
- return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
-}
+const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
void sendQuery(const string& packageName, const string& className, const string& queueName,
SessionHandler& sessionHandler)
{
framing::AMQP_ServerProxy peer(sessionHandler.out);
Variant::Map request;
- request[_WHAT] = OBJECT;
+ request[WHAT] = OBJECT;
Variant::Map schema;
- schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = packageName;
- request[_SCHEMA_ID] = schema;
+ schema[CLASS_NAME] = className;
+ schema[PACKAGE_NAME] = packageName;
+ request[SCHEMA_ID] = schema;
AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
method.setBof(true);
@@ -170,19 +172,144 @@ Variant::Map asMapVoid(const Variant& value) {
}
} // namespace
+// Listens for errors on the bridge session.
+class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& lp, BrokerReplicator& br) :
+ logPrefix(lp), brokerReplicator(br) {}
+
+ void connectionException(framing::connection::CloseCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ }
+ void channelException(framing::session::DetachCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ }
+ void executionException(framing::execution::ErrorCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached.");
+ }
+
+ private:
+ std::string logPrefix;
+ BrokerReplicator& brokerReplicator;
+};
+
+class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver
+{
+ public:
+ ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {}
+ virtual void connection(Connection&) {}
+ virtual void opened(Connection&) {}
+
+ virtual void closed(Connection& c) {
+ if (brokerReplicator.link && &c == brokerReplicator.connection)
+ brokerReplicator.disconnected();
+ }
+ virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); }
+ private:
+ BrokerReplicator& brokerReplicator;
+};
+
+/** Keep track of queues or exchanges during the update process to solve 2
+ * problems.
+ *
+ * 1. Once all responses are processed, remove any queues/exchanges
+ * that were not mentioned as they no longer exist on the primary.
+ *
+ * 2. During the update if we see an event for an object we should
+ * ignore any subsequent responses for that object as they are out
+ * of date.
+ */
+class BrokerReplicator::UpdateTracker {
+ public:
+ typedef std::set<std::string> Names;
+ typedef boost::function<void (const std::string&)> CleanFn;
+
+ UpdateTracker(const std::string& type_, // "queue" or "exchange"
+ CleanFn f, const ReplicationTest& rt)
+ : type(type_), cleanFn(f), repTest(rt) {}
+
+ /** Destructor cleans up remaining initial queues. */
+ ~UpdateTracker() {
+ // Don't throw in a destructor.
+ try { for_each(initial.begin(), initial.end(), cleanFn); }
+ catch (const std::exception& e) {
+ QPID_LOG(error, "Error in cleanup of lost objects: " << e.what());
+ }
+ }
+
+ /** Add an exchange name */
+ void addExchange(Exchange::shared_ptr ex) {
+ if (repTest.getLevel(*ex))
+ initial.insert(ex->getName());
+ }
+
+ /** Add a queue name. */
+ void addQueue(Queue::shared_ptr q) {
+ if (repTest.getLevel(*q))
+ initial.insert(q->getName());
+ }
+
+ /** Received an event for name */
+ void event(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ events.insert(name); // we have seen an event for this name
+ }
+
+ /** Received a response for name.
+ *@return true if this response should be processed, false if we have
+ *already seen an event for this object.
+ */
+ bool response(const std::string& name) {
+ initial.erase(name); // no longer a candidate for deleting
+ return events.find(name) == events.end(); // true if no event seen yet.
+ }
+
+ private:
+ void clean(const std::string& name) {
+ QPID_LOG(info, "Backup updated, deleting " << type << " " << name);
+ cleanFn(name);
+ }
+
+ std::string type;
+ Names initial, events;
+ CleanFn cleanFn;
+ ReplicationTest repTest;
+};
+
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
- haBroker(hb), broker(hb.getBroker()), link(l),
+ logPrefix("Backup: "), replicationTest(NONE),
+ haBroker(hb), broker(hb.getBroker()),
+ exchanges(broker.getExchanges()), queues(broker.getQueues()),
+ link(l),
initialized(false),
- alternates(hb.getBroker().getExchanges())
-{}
+ alternates(hb.getBroker().getExchanges()),
+ connection(0)
+{
+ connectionObserver.reset(new ConnectionObserver(*this));
+ broker.getConnectionObservers().add(connectionObserver);
+ framing::FieldTable args = getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str());
+ setArgs(args);
+
+ dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare;
+ dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete;
+ dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare;
+ dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete;
+ dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind;
+ dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind;
+ dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate;
+ dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe;
+}
void BrokerReplicator::initialize() {
// Can't do this in the constructor because we need a shared_ptr to this.
types::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
- broker.getLinks().declare(
+ std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare(
name, // name for bridge
*link, // parent
false, // durable
@@ -195,21 +322,47 @@ void BrokerReplicator::initialize() {
"", // excludes
false, // dynamic
0, // sync?
- // shared_ptr keeps this in memory until outstanding initializeBridge
+ // shared_ptr keeps this in memory until outstanding connected
// calls are run.
- boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2)
+ boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2)
);
+ assert(result.second);
+ result.first->setErrorListener(
+ boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this)));
}
-BrokerReplicator::~BrokerReplicator() { }
+BrokerReplicator::~BrokerReplicator() { shutdown(); }
+
+namespace {
+void collectQueueReplicators(
+ const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect)
+{
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (qr) collect.insert(qr);
+}
+} // namespace
+
+void BrokerReplicator::shutdown() {
+ // NOTE: this is called in a QMF dispatch thread, not the Link's connection
+ // thread. It's OK to be unlocked because it doesn't use any mutable state,
+ // it only calls thread safe functions objects belonging to the Broker.
+
+ // Unregister with broker objects:
+ if (connectionObserver) {
+ broker.getConnectionObservers().remove(connectionObserver);
+ connectionObserver.reset();
+ }
+ broker.getExchanges().destroy(getName());
+}
// This is called in the connection IO thread when the bridge is started.
-void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) {
// Use the credentials of the outgoing Link connection for creating queues,
// exchanges etc. We know link->getConnection() is non-zero because we are
// being called in the connections thread context.
//
- assert(link->getConnection());
+ connection = link->getConnection();
+ assert(connection);
userId = link->getConnection()->getUserId();
remoteHost = link->getConnection()->getUrl();
@@ -221,6 +374,19 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
<< " status:" << printable(haBroker.getStatus()));
initialized = true;
+ exchangeTracker.reset(
+ new UpdateTracker("exchange",
+ boost::bind(&BrokerReplicator::deleteExchange, this, _1),
+ replicationTest));
+ exchanges.eachExchange(
+ boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1));
+
+ queueTracker.reset(
+ new UpdateTracker("queue",
+ boost::bind(&BrokerReplicator::deleteQueue, this, _1, true),
+ replicationTest));
+ queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1));
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
@@ -231,9 +397,14 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable());
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable());
//subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+ FieldTable arguments;
+ arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ peer.getMessage().subscribe(
+ queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+ peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages());
+ peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes());
// Issue a query request for queues, exchanges, bindings and the habroker
// using event queue as the reply-to address
@@ -247,12 +418,12 @@ void BrokerReplicator::route(Deliverable& msg) {
// We transition from JOINING->CATCHUP on the first message received from the primary.
// Until now we couldn't be sure if we had a good connection to the primary.
if (haBroker.getStatus() == JOINING) {
- haBroker.setStatus(CATCHUP);
+ haBroker.getMembership().setStatus(CATCHUP);
QPID_LOG(notice, logPrefix << "Connected to primary " << primary);
}
Variant::List list;
try {
- if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage()))
+ if (!MessageTransfer::isQMFv2(msg.getMessage()))
throw Exception("Unexpected message, not QMF2 event or query response.");
// decode as list
string content = msg.getMessage().getContent();
@@ -264,13 +435,9 @@ void BrokerReplicator::route(Deliverable& msg) {
QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
- if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
- else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values);
- else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values);
- else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values);
- else if (match<EventBind>(schema)) doEventBind(values);
- else if (match<EventUnbind>(schema)) doEventUnbind(values);
- else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values);
+ EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]);
+ EventDispatchMap::iterator j = dispatch.find(key);
+ if (j != dispatch.end()) (this->*(j->second))(values);
}
} else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
@@ -285,15 +452,21 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == BINDING) doResponseBind(values);
else if (type == HA_BROKER) doResponseHaBroker(values);
}
- if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
- // We have received all of the exchange response.
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) {
+ QPID_LOG(debug, logPrefix << "All exchange responses received.")
+ exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary
alternates.clear();
}
+ if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) {
+ QPID_LOG(debug, logPrefix << "All queue responses received.");
+ queueTracker.reset(); // Clean up queues that no longer exist in the primary
+ }
}
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
- << ": while handling: " << list);
- haBroker.shutdown();
+;
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Configuration replication failed: "
+ << e.what() << ": while handling: " << list));
throw;
}
}
@@ -301,31 +474,22 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
Variant::Map argsMap = asMapVoid(values[ARGS]);
- bool autoDel = values[AUTODEL].asBool();
- bool excl = values[EXCL].asBool();
- if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[QNAME].asString();
QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
+ QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
+ if (queueTracker.get()) queueTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a queue with this name, replace it.
// The queue was definitely created on the primary.
- if (broker.getQueues().find(name)) {
- QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name);
- broker.getQueues().destroy(name);
- stopQueueReplicator(name);
+ if (queues.find(name)) {
+ QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: "
+ << name);
+ deleteQueue(name);
}
- settings.populate(args, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
- broker.createQueue(
- name,
- settings,
- 0 /*i.e. no owner regardless of exclusivity on master*/,
- values[ALTEX].asString(),
- userId,
- remoteHost);
- assert(result.second); // Should be true since we destroyed existing queue above
- startQueueReplicator(result.first);
+ replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
+ values[ALTEX].asString());
}
}
@@ -333,7 +497,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator(
const std::string& qname)
{
string rname = QueueReplicator::replicatorName(qname);
- boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname);
return boost::dynamic_pointer_cast<QueueReplicator>(ex);
}
@@ -341,79 +505,85 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
- boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
- if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+ boost::shared_ptr<Queue> queue = queues.find(name);
+ if (queue && replicationTest.getLevel(*queue)) {
QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
- stopQueueReplicator(name);
- broker.deleteQueue(name, userId, remoteHost);
+ if (queueTracker.get()) queueTracker->event(name);
+ deleteQueue(name);
}
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
- if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+ if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
string name = values[EXNAME].asString();
QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
// If we already have a exchange with this name, replace it.
// The exchange was definitely created on the primary.
- if (broker.getExchanges().find(name)) {
- broker.getExchanges().destroy(name);
- QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name);
+ if (exchanges.find(name)) {
+ deleteExchange(name);
+ QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: "
+ << name);
}
- boost::shared_ptr<Exchange> exchange =
- createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString());
- assert(exchange);
+ CreateExchangeResult result = createExchange(
+ name, values[EXTYPE].asString(), values[DURABLE].asBool(), args,
+ values[ALTEX].asString());
+ replicatedExchanges.insert(name);
+ assert(result.second);
}
}
void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
if (!exchange) {
- QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name);
- } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+ QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
+ } else if (!replicationTest.getLevel(*exchange)) {
QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
} else {
QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
- broker.deleteExchange(name, userId, remoteHost);
+ if (exchangeTracker.get()) exchangeTracker->event(name);
+ deleteExchange(name);
+ replicatedExchanges.erase(name);
}
}
void BrokerReplicator::doEventBind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
- // We only replicate binds for a replicated queue to replicated
- // exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ queues.find(values[QNAME].asString());
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
+ // We only replicate binds for a replicated queue to replicated exchange
+ // that both exist locally. Respect the replication level set in the
+ // bind arguments, but replicate by default.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
- << " key=" << key);
+ << " key=" << key
+ << " args=" << args);
exchange->bind(queue, key, &args, 0);
}
}
void BrokerReplicator::doEventUnbind(Variant::Map& values) {
boost::shared_ptr<Exchange> exchange =
- broker.getExchanges().find(values[EXNAME].asString());
+ exchanges.find(values[EXNAME].asString());
boost::shared_ptr<Queue> queue =
- broker.getQueues().find(values[QNAME].asString());
+ queues.find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue))
{
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
@@ -424,7 +594,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) {
Variant::List members = values[MEMBERS].asList();
- haBroker.setMembership(members);
+ setMembership(members);
+}
+
+void BrokerReplicator::doEventSubscribe(Variant::Map& values) {
+ // Ignore queue replicator subscriptions.
+ if (QueueReplicator::isReplicatorName(values[DEST].asString())) return;
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]);
+ if (qr) {
+ qr->setSubscribed();
+ QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]);
+ }
}
namespace {
@@ -441,40 +621,68 @@ string getAltExchange(const types::Variant& var) {
}
else return string();
}
+
+Variant getHaUuid(const Variant::Map& map) {
+ Variant::Map::const_iterator i = map.find(QPID_HA_UUID);
+ return i == map.end() ? Variant() : i->second;
}
+} // namespace
+
+
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.isReplicated(
- CONFIGURATION,
- values[ARGUMENTS].asMap(),
- values[AUTODELETE].asBool(),
- values[EXCLUSIVE].asBool()))
- return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name(values[NAME].asString());
+ if (!queueTracker.get())
+ throw Exception(QPID_MSG("Unexpected queue response: " << values));
+ if (!queueTracker->response(name)) return; // Response is out-of-date
QPID_LOG(debug, logPrefix << "Queue response: " << name);
+ // If we see a queue with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Queue> queue = queues.find(name);
+ if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: "
+ << name);
+ deleteQueue(name);
+ }
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Queue> queue =
- createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
- getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the queue to already exist if we are failing over.
- if (queue) startQueueReplicator(queue);
- else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name);
+ boost::shared_ptr<QueueReplicator> qr = replicateQueue(
+ name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args,
+ getAltExchange(values[ALTEXCHANGE]));
+ if (qr) {
+ Variant::Map::const_iterator i = values.find(CONSUMER_COUNT);
+ if (i != values.end() && isIntegerType(i->second.getType())) {
+ if (i->second.asInt64()) qr->setSubscribed();
+ }
+ }
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicationTest.replicateLevel(argsMap)) return;
+ if (!replicationTest.getLevel(argsMap)) return;
string name = values[NAME].asString();
+ if (!exchangeTracker.get())
+ throw Exception(QPID_MSG("Unexpected exchange response: " << values));
+ if (!exchangeTracker->response(name)) return; // Response is out of date.
QPID_LOG(debug, logPrefix << "Exchange response: " << name);
framing::FieldTable args;
qpid::amqp_0_10::translate(argsMap, args);
- boost::shared_ptr<Exchange> exchange = createExchange(
+ // If we see an exchange with the same name as one we have, but not the same UUID,
+ // then replace the one we have.
+ boost::shared_ptr<Exchange> exchange = exchanges.find(name);
+ if (exchange &&
+ exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID))
+ {
+ QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: "
+ << name);
+ deleteExchange(name);
+ }
+ CreateExchangeResult result = createExchange(
name, values[TYPE].asString(), values[DURABLE].asBool(), args,
getAltExchange(values[ALTEXCHANGE]));
- // It is normal for the exchange to already exist if we are failing over.
- QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name);
+ replicatedExchanges.insert(name);
}
namespace {
@@ -501,19 +709,25 @@ const std::string QUEUE_REF("queueRef");
void BrokerReplicator::doResponseBind(Variant::Map& values) {
std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]);
std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
- boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
- boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
+ boost::shared_ptr<Exchange> exchange = exchanges.find(exName);
+ boost::shared_ptr<Queue> queue = queues.find(qName);
- // Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
- queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+ framing::FieldTable args;
+ qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+
+ // Automatically replicate binding if queue and exchange exist and are replicated.
+ // Respect replicate setting in binding args but default to replicated.
+ if (exchange && replicationTest.getLevel(*exchange) &&
+ queue && replicationTest.getLevel(*queue) &&
+ ReplicationTest(ALL).getLevel(args))
{
- string key = values[KEY].asString();
+ string key = values[BINDING_KEY].asString();
QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
<< " queue:" << qName
- << " key:" << key);
- framing::FieldTable args;
- qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
+ << " key:" << key
+ << " args:" << args);
+// framing::FieldTable args;
+// qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
exchange->bind(queue, key, &args, 0);
}
}
@@ -527,42 +741,65 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
- ReplicateLevel primary = replicationTest.replicateLevel(
- values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
- haBroker.setMembership(values[MEMBERS].asList());
+ setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what()
- << ": " << values);
- haBroker.shutdown();
+ haBroker.shutdown(
+ QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what()
+ << ": " << values));
+
throw;
}
}
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue)
+boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
+ const boost::shared_ptr<Queue>& queue)
{
- if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+ if (replicationTest.getLevel(*queue) == ALL) {
boost::shared_ptr<QueueReplicator> qr(
new QueueReplicator(haBroker, queue, link));
- if (!broker.getExchanges().registerExchange(qr))
+ if (!exchanges.registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
+ return qr;
}
+ return boost::shared_ptr<QueueReplicator>();
}
-void BrokerReplicator::stopQueueReplicator(const std::string& name) {
- boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
- if (qr) {
- qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed.
- broker.getExchanges().destroy(qr->getName());
+void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
+ Queue::shared_ptr queue = queues.find(name);
+ if (queue) {
+ // Purge before deleting to ensure that we don't reroute any
+ // messages. Any reroutes will be done at the primary and
+ // replicated as normal.
+ if (purge) queue->purge(0, boost::shared_ptr<Exchange>());
+ broker.deleteQueue(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Queue deleted: " << name);
+ }
+}
+
+void BrokerReplicator::deleteExchange(const std::string& name) {
+ try {
+ boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name);
+ if (!exchange) {
+ QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name);
+ return;
+ }
+ if (exchange->inUseAsAlternate()) {
+ QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name);
+ return;
+ }
+ broker.deleteExchange(name, userId, remoteHost);
+ QPID_LOG(debug, logPrefix << "Exchange deleted: " << name);
+ } catch (const framing::NotFoundException&) {
+ QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name);
}
}
-boost::shared_ptr<Queue> BrokerReplicator::createQueue(
+boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
@@ -571,7 +808,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue(
{
QueueSettings settings(durable, autodelete);
settings.populate(arguments, settings.storeSettings);
- std::pair<boost::shared_ptr<Queue>, bool> result =
+ CreateQueueResult result =
broker.createQueue(
name,
settings,
@@ -579,24 +816,23 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue(
string(), // Set alternate exchange below
userId,
remoteHost);
- if (result.second) {
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
- }
- return result.first;
+ boost::shared_ptr<QueueReplicator> qr;
+ if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first);
+ if (result.second && !alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1));
}
- else return boost::shared_ptr<Queue>();
+ return qr;
}
-boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
+BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange)
{
- std::pair<boost::shared_ptr<Exchange>, bool> result =
+ CreateExchangeResult result =
broker.createExchange(
name,
type,
@@ -605,15 +841,12 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange(
args,
userId,
remoteHost);
- if (result.second) {
- alternates.addExchange(result.first);
- if (!alternateExchange.empty()) {
- alternates.setAlternate(
- alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
- }
- return result.first;
+ alternates.addExchange(result.first);
+ if (!alternateExchange.empty()) {
+ alternates.setAlternate(
+ alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1));
}
- else return boost::shared_ptr<Exchange>();
+ return result;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; }
@@ -626,4 +859,61 @@ void BrokerReplicator::write(char* /*target*/) {}
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) {
+ boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
+ if (!qr) return;
+ assert(qr);
+ if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
+ if (qr->getQueue()->getSettings().autoDeleteDelay) {
+ // Start the auto-delete timer
+ Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId);
+ }
+ else {
+ // Delete immediately. Don't purge, the primary is gone so we need
+ // to reroute the deleted messages.
+ deleteQueue(qr->getQueue()->getName(), false);
+ }
+ }
+}
+
+// Callback function for accumulating exchange candidates
+namespace {
+ void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) {
+ c.push_back(i);
+ }
+}
+
+void BrokerReplicator::disconnected() {
+ QPID_LOG(info, logPrefix << "Disconnected from " << primary);
+ connection = 0;
+ // Clean up auto-delete queues
+ vector<boost::shared_ptr<Exchange> > collect;
+ // Make a copy so we can work outside the ExchangeRegistry lock
+ exchanges.eachExchange(
+ boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1));
+ for_each(collect.begin(), collect.end(),
+ boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1));
+}
+
+void BrokerReplicator::setMembership(const Variant::List& brokers) {
+ Membership& membership(haBroker.getMembership());
+ membership.assign(brokers);
+ // Check if the primary has signalled a change in my status:
+ // from CATCHUP to READY when we are caught up.
+ // from READY TO CATCHUP if we are timed out during fail-over.
+ BrokerInfo info;
+ if (membership.get(membership.getSelf(), info)) {
+ BrokerStatus oldStatus = haBroker.getStatus();
+ BrokerStatus newStatus = info.getStatus();
+ if (oldStatus == CATCHUP && newStatus == READY) {
+ QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready");
+ haBroker.getMembership().setStatus(READY);
+ }
+ else if (oldStatus == READY && newStatus == CATCHUP) {
+ QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up");
+ haBroker.getMembership().setStatus(CATCHUP);
+ }
+ }
+}
+
}} // namespace broker
diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h
index f6983e8719..a42d607263 100644
--- a/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/cpp/src/qpid/ha/BrokerReplicator.h
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementObject.h"
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
+#include <set>
namespace qpid {
@@ -40,6 +41,9 @@ class Broker;
class Link;
class Bridge;
class SessionHandler;
+class Connection;
+class QueueRegistry;
+class ExchangeRegistry;
}
namespace framing {
@@ -58,7 +62,9 @@ class QueueReplicator;
* exchanges and bindings to replicate the primary.
* It also creates QueueReplicators for newly replicated queues.
*
- * THREAD UNSAFE: Only called in Link connection thread, no need for locking.
+ * THREAD UNSAFE:
+ * All members except shutdown are only called in the Link's connection thread context.
+ * shutdown() does not use any mutable state.
*
*/
class BrokerReplicator : public broker::Exchange,
@@ -76,6 +82,7 @@ class BrokerReplicator : public broker::Exchange,
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store);
void route(broker::Deliverable&);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
+ void shutdown();
// DataSource interface - used to write persistence data to async store
uint64_t getSize();
@@ -83,8 +90,20 @@ class BrokerReplicator : public broker::Exchange,
private:
typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
+ typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult;
+ typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult;
- void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ typedef std::pair<std::string,std::string> EventKey;
+ typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&);
+ typedef std::map<EventKey, DispatchFunction> EventDispatchMap;
+
+ typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap;
+
+ class UpdateTracker;
+ class ErrorListener;
+ class ConnectionObserver;
+
+ void connected(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -93,6 +112,7 @@ class BrokerReplicator : public broker::Exchange,
void doEventBind(types::Variant::Map&);
void doEventUnbind(types::Variant::Map&);
void doEventMembersUpdate(types::Variant::Map&);
+ void doEventSubscribe(types::Variant::Map&);
void doResponseQueue(types::Variant::Map& values);
void doResponseExchange(types::Variant::Map& values);
@@ -100,32 +120,50 @@ class BrokerReplicator : public broker::Exchange,
void doResponseHaBroker(types::Variant::Map& values);
QueueReplicatorPtr findQueueReplicator(const std::string& qname);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- void stopQueueReplicator(const std::string& name);
+ QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
- boost::shared_ptr<broker::Queue> createQueue(
+ QueueReplicatorPtr replicateQueue(
const std::string& name,
bool durable,
bool autodelete,
const qpid::framing::FieldTable& arguments,
const std::string& alternateExchange);
- boost::shared_ptr<broker::Exchange> createExchange(
+ CreateExchangeResult createExchange(
const std::string& name,
const std::string& type,
bool durable,
const qpid::framing::FieldTable& args,
const std::string& alternateExchange);
+ bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy);
+ void deleteQueue(const std::string& name, bool purge=true);
+ void deleteExchange(const std::string& name);
+
+ void autoDeleteCheck(boost::shared_ptr<broker::Exchange>);
+
+ void disconnected();
+
+ void setMembership(const types::Variant::List&); // Set membership from list.
+
std::string logPrefix;
- std::string userId, remoteHost;
ReplicationTest replicationTest;
+ std::string userId, remoteHost;
HaBroker& haBroker;
broker::Broker& broker;
+ broker::ExchangeRegistry& exchanges;
+ broker::QueueRegistry& queues;
boost::shared_ptr<broker::Link> link;
bool initialized;
AlternateExchangeSetter alternates;
qpid::Address primary;
+ typedef std::set<std::string> StringSet;
+ StringSet replicatedExchanges; // exchanges that have been replicated.
+ broker::Connection* connection;
+ EventDispatchMap dispatch;
+ std::auto_ptr<UpdateTracker> queueTracker;
+ std::auto_ptr<UpdateTracker> exchangeTracker;
+ boost::shared_ptr<ConnectionObserver> connectionObserver;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp
index 81ba3e4301..775222efd3 100644
--- a/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -30,7 +30,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("Connections: "), self(uuid) {}
+ : haBroker(hb), logPrefix("Backup: "), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
framing::FieldTable ft;
@@ -41,9 +41,11 @@ bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, Bro
return false;
}
-void ConnectionObserver::setObserver(const ObserverPtr& o){
+void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix)
+{
sys::Mutex::ScopedLock l(lock);
observer = o;
+ logPrefix = newlogPrefix;
}
ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() {
diff --git a/cpp/src/qpid/ha/ConnectionObserver.h b/cpp/src/qpid/ha/ConnectionObserver.h
index e3a6d1154a..5374660dbe 100644
--- a/cpp/src/qpid/ha/ConnectionObserver.h
+++ b/cpp/src/qpid/ha/ConnectionObserver.h
@@ -55,7 +55,7 @@ class ConnectionObserver : public broker::ConnectionObserver
ConnectionObserver(HaBroker& haBroker, const types::Uuid& self);
- void setObserver(const ObserverPtr&);
+ void setObserver(const ObserverPtr&, const std::string& logPrefix);
ObserverPtr getObserver();
void opened(broker::Connection& connection);
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp
index ffbcb684bc..590db7efa5 100644
--- a/cpp/src/qpid/ha/HaBroker.cpp
+++ b/cpp/src/qpid/ha/HaBroker.cpp
@@ -26,6 +26,7 @@
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
+#include "StandAlone.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
@@ -41,7 +42,6 @@
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h"
-#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
@@ -54,134 +54,100 @@ using namespace std;
using types::Variant;
using types::Uuid;
using sys::Mutex;
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
// Called in Plugin::earlyInitialize
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : logPrefix("Broker: "),
- broker(b),
- systemId(broker.getSystem()->getSystemId().data()),
+ : systemId(b.getSystem()->getSystemId().data()),
settings(s),
+ broker(b),
observer(new ConnectionObserver(*this, systemId)),
- mgmtObject(0),
- status(STANDALONE),
- membership(systemId),
- replicationTest(s.replicateDefault.get())
+ role(new StandAlone),
+ membership(BrokerInfo(systemId, STANDALONE), *this)
{
// If we are joining a cluster we must start excluding clients now,
// otherwise there's a window for a client to connect before we get to
// initialize()
if (settings.cluster) {
- QPID_LOG(debug, logPrefix << "Rejecting client connections.");
- observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>(
- new BackupConnectionExcluder));
+ QPID_LOG(debug, "Broker startup, rejecting client connections.");
+ shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder);
+ observer->setObserver(excluder, "Backup: ");
broker.getConnectionObservers().add(observer);
}
}
+namespace {
+const std::string NONE("none");
+bool isNone(const std::string& x) { return x.empty() || x == NONE; }
+}
+
// Called in Plugin::initialize
void HaBroker::initialize() {
-
// FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port.
- brokerInfo = BrokerInfo(
- broker.getSystem()->getNodeName(),
- broker.getPort(broker::Broker::TCP_TRANSPORT),
- systemId);
-
- QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo);
+ membership.add(
+ BrokerInfo(
+ membership.getSelf(),
+ settings.cluster ? JOINING : membership.getStatus(),
+ broker.getSystem()->getNodeName(),
+ broker.getPort(broker::Broker::TCP_TRANSPORT)
+ )
+ );
+ QPID_LOG(notice, "Initializing: " << membership.getInfo());
// Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
- mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
+ mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker"));
mgmtObject->set_replicateDefault(settings.replicateDefault.str());
mgmtObject->set_systemId(systemId);
ma->addObject(mgmtObject);
+ membership.setMgmtObject(mgmtObject);
// Register a factory for replicating subscriptions.
broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
+ shared_ptr<ReplicatingSubscription::Factory>(
new ReplicatingSubscription::Factory()));
// If we are in a cluster, start as backup in joining state.
if (settings.cluster) {
- status = JOINING;
- backup.reset(new Backup(*this, settings));
+ assert(membership.getStatus() == JOINING);
+ role.reset(new Backup(*this, settings));
broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl));
+ if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl));
}
-
- if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl));
- if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl));
-
-
- // NOTE: lock is not needed in a constructor, but create one
- // to pass to functions that have a ScopedLock parameter.
- Mutex::ScopedLock l(lock);
- statusChanged(l);
}
HaBroker::~HaBroker() {
- QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo);
+ QPID_LOG(notice, role->getLogPrefix() << "Shut down");
broker.getConnectionObservers().remove(observer);
}
-void HaBroker::recover() {
- auto_ptr<Backup> b;
- {
- Mutex::ScopedLock l(lock);
- // No longer replicating, close link. Note: link must be closed before we
- // setStatus(RECOVERING) as that will remove our broker info from the
- // outgoing link properties so we won't recognize self-connects.
- b = backup;
- }
- b.reset(); // Call destructor outside of lock.
- BrokerInfo::Set backups;
- {
- Mutex::ScopedLock l(lock);
- setStatus(RECOVERING, l);
- backups = membership.otherBackups();
- membership.reset(brokerInfo);
- // Drop the lock, new Primary may call back on activate.
- }
- // Outside of lock, may call back on activate()
- primary.reset(new Primary(*this, backups)); // Starts primary-ready check.
-}
-
-// Called back from Primary active check.
-void HaBroker::activate() { setStatus(ACTIVE); }
-
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- switch (getStatus()) {
- case JOINING: recover(); break;
- case CATCHUP:
- QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
- throw Exception("Still catching up, cannot be promoted.");
- break;
- case READY: recover(); break;
- case RECOVERING: break;
- case ACTIVE: break;
- case STANDALONE: break;
- }
- break;
+ Role* r = role->promote();
+ if (r) role.reset(r);
+ break;
}
case _qmf::HaBroker::METHOD_SETBROKERSURL: {
setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_SETPUBLICURL: {
- setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
+ setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url));
break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, logPrefix << "Replicate individual queue "
+ QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue "
<< bq_args.i_queue << " from " << bq_args.i_broker);
- boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
Uuid uuid(true);
@@ -191,10 +157,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
false, // durable
settings.mechanism, settings.username, settings.password,
false); // no amq.failover - don't want to use client URL.
- boost::shared_ptr<broker::Link> link = result.first;
+ shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(
+ shared_ptr<QueueReplicator> qr(
new QueueReplicator(*this, queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
@@ -207,31 +173,23 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
return Manageable::STATUS_OK;
}
-void HaBroker::setClientUrl(const Url& url) {
+void HaBroker::setPublicUrl(const Url& url) {
Mutex::ScopedLock l(lock);
- if (url.empty()) throw Exception("Invalid empty URL for HA client failover");
- clientUrl = url;
- updateClientUrl(l);
-}
-
-void HaBroker::updateClientUrl(Mutex::ScopedLock&) {
- Url url = clientUrl.empty() ? brokerUrl : clientUrl;
- if (url.empty()) throw Url::Invalid("HA client URL is empty");
+ publicUrl = url;
mgmtObject->set_publicUrl(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
+ QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url) {
- Mutex::ScopedLock l(lock);
- if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- brokerUrl = url;
- mgmtObject->set_brokersUrl(brokerUrl.str());
- QPID_LOG(info, logPrefix << "Broker URL set to: " << url);
- if (backup.get()) backup->setBrokerUrl(brokerUrl);
- // Updating broker URL also updates defaulted client URL:
- if (clientUrl.empty()) updateClientUrl(l);
+ {
+ Mutex::ScopedLock l(lock);
+ brokerUrl = url;
+ mgmtObject->set_brokersUrl(brokerUrl.str());
+ QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url);
+ }
+ role->setBrokerUrl(url); // Oustside lock
}
std::vector<Url> HaBroker::getKnownBrokers() const {
@@ -239,116 +197,14 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
-void HaBroker::shutdown() {
- QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
+void HaBroker::shutdown(const std::string& message) {
+ QPID_LOG(critical, message);
broker.shutdown();
+ throw Exception(message);
}
BrokerStatus HaBroker::getStatus() const {
- Mutex::ScopedLock l(lock);
- return status;
-}
-
-void HaBroker::setStatus(BrokerStatus newStatus) {
- Mutex::ScopedLock l(lock);
- setStatus(newStatus, l);
-}
-
-namespace {
-bool checkTransition(BrokerStatus from, BrokerStatus to) {
- // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
- static const BrokerStatus TRANSITIONS[][2] = {
- { JOINING, CATCHUP }, // Connected to primary
- { JOINING, RECOVERING }, // Chosen as initial primary.
- { CATCHUP, READY }, // Caught up all queues, ready to take over.
- { READY, RECOVERING }, // Chosen as new primary
- { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
- { RECOVERING, ACTIVE } // All expected backups are ready
- };
- static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
- for (size_t i = 0; i < N; ++i) {
- if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
- return true;
- }
- return false;
-}
-} // namespace
-
-void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) {
- QPID_LOG(info, logPrefix << "Status change: "
- << printable(status) << " -> " << printable(newStatus));
- bool legal = checkTransition(status, newStatus);
- assert(legal);
- if (!legal) {
- QPID_LOG(critical, logPrefix << "Illegal state transition: "
- << printable(status) << " -> " << printable(newStatus));
- shutdown();
- }
- status = newStatus;
- statusChanged(l);
-}
-
-void HaBroker::statusChanged(Mutex::ScopedLock& l) {
- mgmtObject->set_status(printable(status).str());
- brokerInfo.setStatus(status);
- setLinkProperties(l);
-}
-
-void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
- QPID_LOG(info, logPrefix << "Membership changed: " << membership);
- Variant::List brokers = membership.asList();
- mgmtObject->set_members(brokers);
- broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
-}
-
-void HaBroker::setMembership(const Variant::List& brokers) {
- Mutex::ScopedLock l(lock);
- membership.assign(brokers);
- QPID_LOG(info, logPrefix << "Membership update: " << membership);
- BrokerInfo info;
- // Update my status to what the primary says it is. The primary can toggle
- // status between READY and CATCHUP based on the state of our subscriptions.
- if (membership.get(systemId, info) && status != info.getStatus()) {
- setStatus(info.getStatus(), l);
- if (backup.get()) backup->setStatus(status);
- }
- membershipUpdated(l);
-}
-
-void HaBroker::resetMembership(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
- membershipUpdated(l);
-}
-
-void HaBroker::addBroker(const BrokerInfo& b) {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- QPID_LOG(debug, logPrefix << "Membership add: " << b);
- membershipUpdated(l);
-}
-
-void HaBroker::removeBroker(const Uuid& id) {
- Mutex::ScopedLock l(lock);
- membership.remove(id);
- QPID_LOG(debug, logPrefix << "Membership remove: " << id);
- membershipUpdated(l);
-}
-
-void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
- framing::FieldTable linkProperties = broker.getLinkClientProperties();
- if (isBackup(status)) {
- // If this is a backup then any outgoing links are backup
- // links and need to be tagged.
- linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
- }
- else {
- // If this is a primary then any outgoing links are federation links
- // and should not be tagged.
- linkProperties.erase(ConnectionObserver::BACKUP_TAG);
- }
- broker.setLinkClientProperties(linkProperties);
+ return membership.getStatus();
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h
index 7dabe6e35b..6b15c88e0a 100644
--- a/cpp/src/qpid/ha/HaBroker.h
+++ b/cpp/src/qpid/ha/HaBroker.h
@@ -25,14 +25,12 @@
#include "BrokerInfo.h"
#include "Membership.h"
#include "types.h"
-#include "ReplicationTest.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
#include "qpid/types/Variant.h"
-#include <memory>
#include <set>
#include <boost/shared_ptr.hpp>
@@ -54,11 +52,15 @@ namespace ha {
class Backup;
class ConnectionObserver;
class Primary;
-
+class Role;
/**
* HA state and actions associated with a HA broker. Holds all the management info.
*
* THREAD SAFE: may be called in arbitrary broker IO or timer threads.
+
+ * NOTE: HaBroker and Role subclasses follow this lock hierarchy:
+ * - HaBroker MUST NOT hold its own lock across calls Role subclasses.
+ * - Role subclasses MAY hold their locks accross calls to HaBroker.
*/
class HaBroker : public management::Manageable
{
@@ -71,66 +73,46 @@ class HaBroker : public management::Manageable
void initialize();
// Implement Manageable.
- qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; }
+ qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; }
management::Manageable::status_t ManagementMethod (
uint32_t methodId, management::Args& args, std::string& text);
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
- /** Shut down the broker. Caller should log a critical error message. */
- void shutdown();
+ /** Shut down the broker because of a critical error. */
+ void shutdown(const std::string& message);
BrokerStatus getStatus() const;
- void setStatus(BrokerStatus);
- void activate();
-
- Backup* getBackup() { return backup.get(); }
- ReplicationTest getReplicationTest() const { return replicationTest; }
-
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
- const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
-
- void setMembership(const types::Variant::List&); // Set membership from list.
- void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
- void addBroker(const BrokerInfo& b); // Add a broker to the membership.
- void removeBroker(const types::Uuid& id); // Remove a broker from membership.
-
+ BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
+ Membership& getMembership() { return membership; }
types::Uuid getSystemId() const { return systemId; }
private:
- void setClientUrl(const Url&);
+
+ void setPublicUrl(const Url&);
void setBrokerUrl(const Url&);
void updateClientUrl(sys::Mutex::ScopedLock&);
- bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); }
-
- void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
- void recover();
- void statusChanged(sys::Mutex::ScopedLock&);
- void setLinkProperties(sys::Mutex::ScopedLock&);
-
std::vector<Url> getKnownBrokers() const;
- void membershipUpdated(sys::Mutex::ScopedLock&);
-
- std::string logPrefix;
- broker::Broker& broker;
- types::Uuid systemId;
+ // Immutable members
+ const types::Uuid systemId;
const Settings settings;
+ // Member variables protected by lock
mutable sys::Mutex lock;
- boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
- std::auto_ptr<Backup> backup;
- std::auto_ptr<Primary> primary;
- qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
- Url clientUrl, brokerUrl;
+ Url publicUrl, brokerUrl;
std::vector<Url> knownBrokers;
- BrokerStatus status;
- BrokerInfo brokerInfo;
+
+ // Independently thread-safe member variables
+ broker::Broker& broker;
+ qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject;
+ boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary
+ boost::shared_ptr<Role> role;
Membership membership;
- ReplicationTest replicationTest;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp
index f7fe553d9b..d26b466847 100644
--- a/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/cpp/src/qpid/ha/HaPlugin.cpp
@@ -33,9 +33,11 @@ struct Options : public qpid::Options {
addOptions()
("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
+ ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"),
+ "Enable replication of specific queues without joining a cluster")
("ha-brokers-url", optValue(settings.brokerUrl,"URL"),
"URL with address of each broker in the cluster.")
- ("ha-public-url", optValue(settings.clientUrl,"URL"),
+ ("ha-public-url", optValue(settings.publicUrl,"URL"),
"URL advertized to clients to connect to the cluster.")
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
@@ -48,6 +50,10 @@ struct Options : public qpid::Options {
"Authentication mechanism for connections between HA brokers")
("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
"Maximum time to wait for an expected backup to connect and become ready.")
+ ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+ "Flow control message count limit for replication, 0 means no limit")
+ ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+ "Flow control byte limit for replication, 0 means no limit")
;
}
};
@@ -64,17 +70,23 @@ struct HaPlugin : public Plugin {
void earlyInitialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) {
- // Must create the HaBroker in earlyInitialize so it can set up its
- // connection observer before clients start connecting.
- haBroker.reset(new ha::HaBroker(*broker, settings));
- broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+ if (broker && (settings.cluster || settings.queueReplication)) {
+ if (!broker->getManagementAgent()) {
+ QPID_LOG(info, "HA plugin disabled because management is disabled");
+ if (settings.cluster)
+ throw Exception("Cannot start HA: management is disabled");
+ } else {
+ // Must create the HaBroker in earlyInitialize so it can set up its
+ // connection observer before clients start connecting.
+ haBroker.reset(new ha::HaBroker(*broker, settings));
+ broker->addFinalizer(boost::bind(&HaPlugin::finalize, this));
+ }
}
}
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker) haBroker->initialize();
+ if (broker && haBroker.get()) haBroker->initialize();
}
void finalize() {
diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp
index 74580f9b1e..6c64d86fd7 100644
--- a/cpp/src/qpid/ha/Membership.cpp
+++ b/cpp/src/qpid/ha/Membership.cpp
@@ -19,6 +19,12 @@
*
*/
#include "Membership.h"
+#include "HaBroker.h"
+#include "qpid/broker/Broker.h"
+#include "qpid/management/ManagementAgent.h"
+#include "qpid/types/Variant.h"
+#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h"
+#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include <boost/bind.hpp>
#include <iostream>
#include <iterator>
@@ -26,37 +32,58 @@
namespace qpid {
namespace ha {
+namespace _qmf = ::qmf::org::apache::qpid::ha;
-void Membership::reset(const BrokerInfo& b) {
+using sys::Mutex;
+using types::Variant;
+
+Membership::Membership(const BrokerInfo& info, HaBroker& b)
+ : haBroker(b), self(info.getSystemId())
+{
+ brokers[self] = info;
+}
+
+void Membership::clear() {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo me = brokers[self];
brokers.clear();
- brokers[b.getSystemId()] = b;
+ brokers[self] = me;
}
void Membership::add(const BrokerInfo& b) {
+ Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
+ update(l);
}
void Membership::remove(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
+ if (id == self) return; // Never remove myself
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- }
+ update(l);
+ }
}
bool Membership::contains(const types::Uuid& id) {
+ Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
- brokers.clear();
+ Mutex::ScopedLock l(lock);
+ clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
+ update(l);
}
types::Variant::List Membership::asList() const {
+ Mutex::ScopedLock l(lock);
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
@@ -64,6 +91,7 @@ types::Variant::List Membership::asList() const {
}
BrokerInfo::Set Membership::otherBackups() const {
+ Mutex::ScopedLock l(lock);
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (i->second.getStatus() == READY && i->second.getSystemId() != self)
@@ -71,15 +99,84 @@ BrokerInfo::Set Membership::otherBackups() const {
return result;
}
-bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- BrokerInfo::Map::iterator i = brokers.find(id);
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
return true;
}
-std::ostream& operator<<(std::ostream& o, const Membership& members) {
- return o << members.brokers;
+void Membership::update(Mutex::ScopedLock& l) {
+ QPID_LOG(info, "Membership: " << brokers);
+ Variant::List brokers = asList();
+ if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str());
+ if (mgmtObject) mgmtObject->set_members(brokers);
+ haBroker.getBroker().getManagementAgent()->raiseEvent(
+ _qmf::EventMembersUpdate(brokers));
+}
+
+void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) {
+ Mutex::ScopedLock l(lock);
+ mgmtObject = mo;
+ update(l);
+}
+
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { STANDALONE, JOINING }, // Initialization of backup broker
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, RECOVERING }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, RECOVERING }, // Chosen as new primary
+ { READY, CATCHUP }, // Timed out failing over, demoted to catch-up.
+ { RECOVERING, ACTIVE } // All expected backups are ready
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void Membership::setStatus(BrokerStatus newStatus) {
+ BrokerStatus status = getStatus();
+ QPID_LOG(info, "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ if (!legal) {
+ haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status)
+ << " -> " << printable(newStatus)));
+ }
+
+ Mutex::ScopedLock l(lock);
+ brokers[self].setStatus(newStatus);
+ if (mgmtObject) mgmtObject->set_status(printable(newStatus).str());
+ update(l);
+}
+
+BrokerStatus Membership::getStatus() const {
+ Mutex::ScopedLock l(lock);
+ return getStatus(l);
+}
+
+BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const {
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second.getStatus();
+}
+
+BrokerInfo Membership::getInfo() const {
+ Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::const_iterator i = brokers.find(self);
+ assert(i != brokers.end());
+ return i->second;
}
+// FIXME aconway 2013-01-23: move to .h?
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h
index 8406dccd5d..956569fbd8 100644
--- a/cpp/src/qpid/ha/Membership.h
+++ b/cpp/src/qpid/ha/Membership.h
@@ -24,45 +24,72 @@
#include "BrokerInfo.h"
#include "types.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/types/Variant.h"
#include <boost/function.hpp>
#include <set>
#include <vector>
#include <iosfwd>
+
+namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha {
+class HaBroker;
+}}}}}
+
namespace qpid {
+
+namespace broker {
+class Broker;
+}
+
+namespace types {
+class Uuid;
+}
+
namespace ha {
+class HaBroker;
/**
* Keep track of the brokers in the membership.
- * THREAD UNSAFE: caller must serialize
+ * Send management when events on membership changes.
+ * THREAD SAFE
*/
class Membership
{
public:
- Membership(const types::Uuid& self_) : self(self_) {}
+ Membership(const BrokerInfo& info, HaBroker&);
- void reset(const BrokerInfo& b); ///< Reset to contain just one member.
+ void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>);
+
+ void clear(); ///< Clear all but self.
void add(const BrokerInfo& b);
void remove(const types::Uuid& id);
bool contains(const types::Uuid& id);
+
/** Return IDs of all READY backups other than self */
BrokerInfo::Set otherBackups() const;
void assign(const types::Variant::List&);
types::Variant::List asList() const;
- bool get(const types::Uuid& id, BrokerInfo& result);
+ bool get(const types::Uuid& id, BrokerInfo& result) const;
+
+ types::Uuid getSelf() const { return self; }
+ BrokerInfo getInfo() const;
+ BrokerStatus getStatus() const;
+ void setStatus(BrokerStatus s);
private:
- types::Uuid self;
+ void update(sys::Mutex::ScopedLock&);
+ BrokerStatus getStatus(sys::Mutex::ScopedLock&) const;
+
+ mutable sys::Mutex lock;
+ HaBroker& haBroker;
+ boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject;
+ const types::Uuid self;
BrokerInfo::Map brokers;
- friend std::ostream& operator<<(std::ostream&, const Membership&);
};
-std::ostream& operator<<(std::ostream&, const Membership&);
-
}} // namespace qpid::ha
#endif /*!QPID_HA_MEMBERSHIP_H*/
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp
index e4bf9671b8..93dbbbea85 100644
--- a/cpp/src/qpid/ha/Primary.cpp
+++ b/cpp/src/qpid/ha/Primary.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/FieldValue.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
@@ -39,6 +41,8 @@ namespace qpid {
namespace ha {
using sys::Mutex;
+using namespace std;
+using namespace framing;
namespace {
@@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
PrimaryConfigurationObserver(Primary& p) : primary(p) {}
void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); }
void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
+ void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
+ void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
private:
Primary& primary;
};
@@ -76,8 +82,11 @@ class ExpectedBackupTimerTask : public sys::TimerTask {
Primary* Primary::instance = 0;
Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
- haBroker(hb), logPrefix("Primary: "), active(false)
+ haBroker(hb), membership(hb.getMembership()),
+ logPrefix("Primary: "), active(false),
+ replicationTest(hb.getSettings().replicateDefault.get())
{
+ hb.getMembership().setStatus(RECOVERING);
assert(instance == 0);
instance = this; // Let queue replicators find us.
if (expect.empty()) {
@@ -89,11 +98,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
// the QueueGuards are created.
QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getReplicationTest(), false));
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) expectedBackups.insert(backup);
- backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards
+ backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
}
// Set timeout for expected brokers to connect and become ready.
sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC));
@@ -102,14 +110,21 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
hb.getBroker().getTimer().add(timerTask);
}
+
+ // Remove backup tag property from outgoing link properties.
+ framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties();
+ linkProperties.erase(ConnectionObserver::BACKUP_TAG);
+ hb.getBroker().setLinkClientProperties(linkProperties);
+
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
haBroker.getBroker().getConfigurationObservers().add(configurationObserver);
Mutex::ScopedLock l(lock); // We are now active as a configurationObserver
checkReady(l);
+
// Allow client connections
connectionObserver.reset(new PrimaryConnectionObserver(*this));
- haBroker.getObserver()->setObserver(connectionObserver);
+ haBroker.getObserver()->setObserver(connectionObserver, logPrefix);
}
Primary::~Primary() {
@@ -122,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) {
active = true;
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
- haBroker.activate();
+ membership.setStatus(ACTIVE);
}
}
@@ -130,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.addBroker(info);
+ membership.add(info);
if (expectedBackups.erase(i->second)) {
QPID_LOG(info, logPrefix << "Expected backup is ready: " << info);
checkReady(l);
@@ -155,9 +170,10 @@ void Primary::timeoutExpectedBackups() {
expectedBackups.erase(i++);
backups.erase(info.getSystemId());
rb->cancel();
- // Downgrade the broker to CATCHUP
+ // Downgrade the broker's status to CATCHUP
+ // The broker will get this status change when it eventually connects.
info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ membership.add(info);
}
else ++i;
}
@@ -178,46 +194,78 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) {
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueCreate(const QueuePtr& q) {
- // Throw if there is an invalid replication level in the queue settings.
- haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings);
- Mutex::ScopedLock l(lock);
- for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
- i->second->queueCreate(q);
- checkReady(i, l);
+ // Set replication argument.
+ ReplicateLevel level = replicationTest.useLevel(*q);
+ QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+ << " replication: " << printable(level));
+ q->addArgument(QPID_REPLICATE, printable(level).str());
+ if (level) {
+ // Give each queue a unique id to avoid confusion of same-named queues.
+ q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
+ Mutex::ScopedLock l(lock);
+ for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+ i->second->queueCreate(q);
+ checkReady(i, l);
+ }
}
}
+// NOTE: Called with queue registry lock held.
void Primary::queueDestroy(const QueuePtr& q) {
+ QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
Mutex::ScopedLock l(lock);
for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
i->second->queueDestroy(q);
checkReady(l);
}
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeCreate(const ExchangePtr& ex) {
+ ReplicateLevel level = replicationTest.useLevel(*ex);
+ QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+ << " replication: " << printable(level));
+ FieldTable args = ex->getArgs();
+ args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
+ if (level) {
+ // Give each exchange a unique id to avoid confusion of same-named exchanges.
+ args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
+ }
+ ex->setArgs(args);
+}
+
+// NOTE: Called with exchange registry lock held.
+void Primary::exchangeDestroy(const ExchangePtr& ex) {
+ QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
+ // Do nothing
+ }
+
void Primary::opened(broker::Connection& connection) {
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
- QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(info, haBroker.getReplicationTest(), true));
+ QPID_LOG(info, logPrefix << "New backup connected: " << info);
+ boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
{
// Avoid deadlock with queue registry lock.
Mutex::ScopedUnlock u(lock);
- backup->setInitialQueues(haBroker.getBroker().getQueues(), false);
+ backup->setCatchupQueues(haBroker.getBroker().getQueues(), false);
}
backups[info.getSystemId()] = backup;
+ i = backups.find(info.getSystemId());
}
else {
- QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
- i->second->setConnected(true);
- checkReady(i, l);
+ QPID_LOG(info, logPrefix << "Known backup connected: " << info);
+ i->second->setConnection(&connection);
}
- if (info.getStatus() == JOINING) info.setStatus(CATCHUP);
- haBroker.addBroker(info);
+ if (info.getStatus() == JOINING) {
+ info.setStatus(CATCHUP);
+ membership.add(info);
+ }
+ if (i != backups.end()) checkReady(i, l);
}
else
QPID_LOG(debug, logPrefix << "Accepted client connection "
@@ -225,19 +273,20 @@ void Primary::opened(broker::Connection& connection) {
}
void Primary::closed(broker::Connection& connection) {
- // NOTE: It is possible for a backup connection to be rejected while we are
- // a backup, but closed() is called after we have become primary.
- //
- // For this reason we do not remove from the backups map here, the backups
- // map holds all the backups we know about whether connected or not.
- //
- Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
- haBroker.removeBroker(info.getSystemId());
+ Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(info.getSystemId());
- if (i != backups.end()) i->second->setConnected(false);
+ // NOTE: It is possible for a backup connection to be rejected while we
+ // are a backup, but closed() is called after we have become primary.
+ // Checking isConnected() lets us ignore such spurious closes.
+ if (i != backups.end() && i->second->isConnected()) {
+ QPID_LOG(info, logPrefix << "Backup disconnected: " << info);
+ membership.remove(info.getSystemId());
+ expectedBackups.erase(i->second);
+ backups.erase(i);
+ checkReady(l);
+ }
}
}
@@ -249,4 +298,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI
return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q);
}
+Role* Primary::promote() {
+ QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo());
+ return 0;
+}
+
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h
index 26883f4416..ff85837882 100644
--- a/cpp/src/qpid/ha/Primary.h
+++ b/cpp/src/qpid/ha/Primary.h
@@ -24,6 +24,8 @@
#include "types.h"
#include "BrokerInfo.h"
+#include "ReplicationTest.h"
+#include "Role.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
#include <boost/intrusive_ptr.hpp>
@@ -48,6 +50,7 @@ class HaBroker;
class ReplicatingSubscription;
class RemoteBackup;
class QueueGuard;
+class Membership;
/**
* State associated with a primary broker:
@@ -56,22 +59,30 @@ class QueueGuard;
*
* THREAD SAFE: called concurrently in arbitrary connection threads.
*/
-class Primary
+class Primary : public Role
{
public:
typedef boost::shared_ptr<broker::Queue> QueuePtr;
+ typedef boost::shared_ptr<broker::Exchange> ExchangePtr;
static Primary* get() { return instance; }
Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups);
~Primary();
+ // Role implementation
+ std::string getLogPrefix() const { return logPrefix; }
+ Role* promote();
+ void setBrokerUrl(const Url&) {}
+
void readyReplica(const ReplicatingSubscription&);
void removeReplica(const std::string& q);
// Called via ConfigurationObserver
void queueCreate(const QueuePtr&);
void queueDestroy(const QueuePtr&);
+ void exchangeCreate(const ExchangePtr&);
+ void exchangeDestroy(const ExchangePtr&);
// Called via ConnectionObserver
void opened(broker::Connection& connection);
@@ -91,17 +102,19 @@ class Primary
sys::Mutex lock;
HaBroker& haBroker;
+ Membership& membership;
std::string logPrefix;
bool active;
+ ReplicationTest replicationTest;
+
/**
* Set of expected backups that must be ready before we declare ourselves
- * active
+ * active. These are backups that were known and ready before the primary
+ * crashed. As new primary we expect them to re-connect.
*/
BackupSet expectedBackups;
/**
- * Map of all the remote backups we know about: any expected backups plus
- * all actual backups that have connected. We do not remove entries when a
- * backup disconnects. @see Primary::closed()
+ * Map of all the expected backups plus all connected backups.
*/
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp
index 77e1f81a38..d06d88ca29 100644
--- a/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/cpp/src/qpid/ha/QueueGuard.cpp
@@ -50,10 +50,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : cancelled(false), queue(q), subscription(0)
{
std::ostringstream os;
- os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
+ os << "Primary guard " << queue.getName() << "@" << info << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
queue.addObserver(observer);
@@ -66,45 +66,31 @@ QueueGuard::~QueueGuard() { cancel(); }
// NOTE: Called with message lock held.
void QueueGuard::enqueued(const Message& m) {
// Delay completion
- QPID_LOG(trace, logPrefix << "Delayed completion of " << m);
+ QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence());
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return; // Don't record enqueues after we are cancelled.
+ assert(delayed.find(m.getSequence()) == delayed.end());
+ delayed[m.getSequence()] = m.getIngressCompletion();
m.getIngressCompletion()->startCompleter();
- {
- Mutex::ScopedLock l(lock);
- if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) {
- QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence());
- assert(false);
- }
- }
}
// NOTE: Called with message lock held.
void QueueGuard::dequeued(const Message& m) {
QPID_LOG(trace, logPrefix << "Dequeued " << m);
- ReplicatingSubscription* rs=0;
- {
- Mutex::ScopedLock l(lock);
- rs = subscription;
- }
- if (rs) rs->dequeued(m);
- complete(m.getSequence());
-}
-
-void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) {
- for (Delayed::iterator i = begin; i != end; ++i) {
- QPID_LOG(trace, logPrefix << "Completed " << i->first);
- i->second->finishCompleter();
- }
+ Mutex::ScopedLock l(lock);
+ if (subscription) subscription->dequeued(m);
+ complete(m.getSequence(), l);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- if (delayed.empty()) return; // No need if no delayed messages.
- delayed.swap(removed);
+ Mutex::ScopedLock l(lock);
+ if (cancelled) return;
+ cancelled = true;
+ for (Delayed::iterator i = delayed.begin(); i != delayed.end();) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
@@ -113,38 +99,36 @@ void QueueGuard::attach(ReplicatingSubscription& rs) {
}
bool QueueGuard::subscriptionStart(SequenceNumber position) {
- Delayed removed;
- {
- Mutex::ScopedLock l(lock);
- // Complete any messages before or at the ReplicatingSubscription start position.
- // Those messages are already on the backup.
- for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) {
- removed.insert(*i);
- delayed.erase(i++);
- }
+ // Complete any messages before or at the ReplicatingSubscription start position.
+ // Those messages are already on the backup.
+ Mutex::ScopedLock l(lock);
+ Delayed::iterator i = delayed.begin();
+ while(i != delayed.end() && i->first <= position) {
+ complete(i, l);
+ delayed.erase(i++);
}
- completeRange(removed.begin(), removed.end());
return position >= range.back;
}
void QueueGuard::complete(SequenceNumber sequence) {
- boost::intrusive_ptr<broker::AsyncCompletion> m;
- {
- Mutex::ScopedLock l(lock);
- // The same message can be completed twice, by
- // ReplicatingSubscription::acknowledged and dequeued. Remove it
- // from the map so we only call finishCompleter() once
- Delayed::iterator i = delayed.find(sequence);
- if (i != delayed.end()) {
- m = i->second;
- delayed.erase(i);
- }
+ Mutex::ScopedLock l(lock);
+ complete(sequence, l);
+}
+void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) {
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the map so we only call finishCompleter() once
+ Delayed::iterator i = delayed.find(sequence);
+ if (i != delayed.end()) {
+ complete(i, l);
+ delayed.erase(i);
}
- if (m) {
- QPID_LOG(trace, logPrefix << "Completed " << sequence);
- m->finishCompleter();
- }
+}
+
+void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) {
+ QPID_LOG(trace, logPrefix << "Completed " << i->first);
+ i->second->finishCompleter();
}
}} // namespaces qpid::ha
diff --git a/cpp/src/qpid/ha/QueueGuard.h b/cpp/src/qpid/ha/QueueGuard.h
index 3904b3bd3f..e7ceb351e8 100644
--- a/cpp/src/qpid/ha/QueueGuard.h
+++ b/cpp/src/qpid/ha/QueueGuard.h
@@ -54,6 +54,9 @@ class ReplicatingSubscription;
* THREAD SAFE: Concurrent calls:
* - enqueued() via QueueObserver in arbitrary connection threads.
* - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
+ *
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class QueueGuard {
public:
@@ -104,17 +107,20 @@ class QueueGuard {
private:
class QueueObserver;
+ typedef std::map<framing::SequenceNumber,
+ boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
+
+ void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &);
+ void complete(Delayed::iterator, sys::Mutex::ScopedLock &);
sys::Mutex lock;
+ bool cancelled;
std::string logPrefix;
broker::Queue& queue;
- typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed;
Delayed delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
QueueRange range;
-
- void completeRange(Delayed::iterator begin, Delayed::iterator end);
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/QueueRange.h b/cpp/src/qpid/ha/QueueRange.h
index d734326910..f67ac146e6 100644
--- a/cpp/src/qpid/ha/QueueRange.h
+++ b/cpp/src/qpid/ha/QueueRange.h
@@ -24,6 +24,7 @@
#include "ReplicatingSubscription.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueCursor.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/SequenceNumber.h"
#include <iostream>
@@ -51,15 +52,7 @@ struct QueueRange {
QueueRange() : front(1), back(0) { } // Empty range.
- QueueRange(broker::Queue& q) {
- if (ReplicatingSubscription::getFront(q, front))
- back = q.getPosition();
- else {
- back = q.getPosition();
- front = back+1; // empty
- }
- assert(front <= back + 1);
- }
+ QueueRange(broker::Queue& q) { q.getRange(front, back, broker::REPLICATOR); }
QueueRange(const framing::FieldTable& args) {
back = args.getAsInt(ReplicatingSubscription::QPID_BACK);
diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp
index cac1fdac29..98220b2098 100644
--- a/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -22,12 +22,15 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
+#include "Settings.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/SequenceSet.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
@@ -37,43 +40,89 @@
namespace {
const std::string QPID_REPLICATOR_("qpid.replicator-");
const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
+const std::string QPID_HA("qpid.ha-");
}
namespace qpid {
namespace ha {
using namespace broker;
using namespace framing;
+using namespace std;
+using sys::Mutex;
-const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:");
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
-const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue");
+const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position");
+const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
+bool QueueReplicator::isReplicatorName(const std::string& name) {
+ return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0;
+}
+
bool QueueReplicator::isEventKey(const std::string key) {
- const std::string& prefix = QPID_HA_EVENT_PREFIX;
+ const std::string& prefix = QPID_HA;
bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
return ret;
}
+class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
+ public:
+ ErrorListener(const std::string& prefix) : logPrefix(prefix) {}
+ void connectionException(framing::connection::CloseCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Connection error: " << msg);
+ }
+ void channelException(framing::session::DetachCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Channel error: " << msg);
+ }
+ void executionException(framing::execution::ErrorCode, const std::string& msg) {
+ QPID_LOG(error, logPrefix << "Execution error: " << msg);
+ }
+ void detach() {
+ QPID_LOG(debug, logPrefix << "Session detached");
+ }
+ private:
+ std::string logPrefix;
+};
+
+class QueueReplicator::QueueObserver : public broker::QueueObserver {
+ public:
+ QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {}
+ void enqueued(const Message&) {}
+ void dequeued(const Message&) {}
+ void acquired(const Message&) {}
+ void requeued(const Message&) {}
+ void consumerAdded( const Consumer& ) {}
+ void consumerRemoved( const Consumer& ) {}
+ // Queue observer is destroyed when the queue is.
+ void destroy() { queueReplicator->destroy(); }
+ private:
+ boost::shared_ptr<QueueReplicator> queueReplicator;
+};
+
QueueReplicator::QueueReplicator(HaBroker& hb,
boost::shared_ptr<Queue> q,
boost::shared_ptr<Link> l)
: Exchange(replicatorName(q->getName()), 0, q->getBroker()),
haBroker(hb),
logPrefix("Backup queue "+q->getName()+": "),
- queue(q), link(l), brokerInfo(hb.getBrokerInfo())
+ queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+ settings(hb.getSettings())
{
+ args.setString(QPID_REPLICATE, printable(NONE).str());
Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+ framing::FieldTable args = getArgs();
+ args.setString(QPID_REPLICATE, printable(NONE).str());
+ setArgs(args);
}
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -93,48 +142,57 @@ void QueueReplicator::activate() {
boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
);
bridge = result.first;
+ bridge->setErrorListener(
+ boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix)));
+ boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this()));
+ queue->addObserver(observer);
}
-QueueReplicator::~QueueReplicator() { deactivate(); }
+QueueReplicator::~QueueReplicator() {}
-void QueueReplicator::deactivate() {
- // destroy the route
- sys::Mutex::ScopedLock l(lock);
- if (bridge) {
- bridge->close();
- bridge.reset();
- QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName);
- }
+void QueueReplicator::destroy() {
+ // Called from Queue::destroyed()
+ Mutex::ScopedLock l(lock);
+ if (!bridge) return;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ bridge->close();
+ // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
+ queue.reset();
+ link.reset();
+ bridge.reset();
+ getBroker()->getExchanges().destroy(getName());
}
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
- FieldTable settings;
- settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
- settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
- settings.setInt(ReplicatingSubscription::QPID_BACK,
- queue->getPosition());
- settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
- brokerInfo.asFieldTable());
- SequenceNumber front;
- if (ReplicatingSubscription::getFront(*queue, front)) {
- settings.setInt(ReplicatingSubscription::QPID_FRONT, front);
- QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front);
+ FieldTable arguments;
+ arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
+ arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize?
+ arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
+ arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
+ SequenceNumber front, back;
+ queue->getRange(front, back, broker::REPLICATOR);
+ if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front);
+ try {
+ peer.getMessage().subscribe(
+ args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
+ false/*exclusive*/, "", 0, arguments);
+ peer.getMessage().setFlowMode(getName(), 1); // Window
+ peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
+ peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
+ }
+ catch(const exception& e) {
+ QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what()));
+ throw;
}
- peer.getMessage().subscribe(
- args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
- false/*exclusive*/, "", 0, settings);
- // FIXME aconway 2012-05-22: use a finite credit window?
- peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
- peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-
qpid::Address primary;
link->getRemoteAddress(primary);
QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")");
- QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
+ QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
}
namespace {
@@ -147,17 +205,35 @@ template <class T> T decodeContent(Message& m) {
}
}
-void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
+ boost::shared_ptr<Queue> q;
+ {
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
+ q = queue;
+ }
// Thread safe: only calls thread safe Queue functions.
queue->dequeueMessageAt(n);
}
+namespace {
+bool getSequence(const Message& message, SequenceNumber& result) {
+ result = message.getSequence();
+ return true;
+}
+bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) {
+ QueueCursor cursor(REPLICATOR);
+ return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1);
+}
+} // namespace
+
// Called in connection thread of the queues bridge to primary.
void QueueReplicator::route(Deliverable& msg)
{
try {
const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.
@@ -176,16 +252,15 @@ void QueueReplicator::route(Deliverable& msg)
<< " to " << position);
// Verify that there are no messages after the new position in the queue.
SequenceNumber next;
- if (ReplicatingSubscription::getNext(*queue, position, next))
- throw Exception("Invalid position move, preceeds messages");
+ if (getNext(*queue, position, next))
+ throw Exception(QPID_MSG(logPrefix << "Invalid position " << position
+ << " preceeds message at " << next));
queue->setPosition(position);
}
// Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
- haBroker.shutdown();
- throw;
+ haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what()));
}
}
diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h
index 8d8a41a5ba..e8a793f611 100644
--- a/cpp/src/qpid/ha/QueueReplicator.h
+++ b/cpp/src/qpid/ha/QueueReplicator.h
@@ -41,6 +41,7 @@ class Deliverable;
namespace ha {
class HaBroker;
+class Settings;
/**
* Exchange created on a backup broker to replicate a queue on the primary.
@@ -57,7 +58,11 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
+ static const std::string QPID_SYNC_FREQUENCY;
+
static std::string replicatorName(const std::string& queueName);
+ static bool isReplicatorName(const std::string&);
+
/** Test if a string is an event key */
static bool isEventKey(const std::string key);
@@ -68,7 +73,6 @@ class QueueReplicator : public broker::Exchange,
~QueueReplicator();
void activate(); // Call after ctor
- void deactivate(); // Call before dtor
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const);
@@ -80,8 +84,18 @@ class QueueReplicator : public broker::Exchange,
uint64_t getSize();
void write(char* target);
+ // Set if the queue has ever been subscribed to, used for auto-delete cleanup.
+ void setSubscribed() { subscribed = true; }
+ bool isSubscribed() { return subscribed; }
+
+ boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
+
private:
+ class ErrorListener;
+ class QueueObserver;
+
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ void destroy(); // Called when the queue is destroyed.
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
HaBroker& haBroker;
@@ -92,6 +106,8 @@ class QueueReplicator : public broker::Exchange,
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
BrokerInfo brokerInfo;
+ bool subscribed;
+ const Settings& settings;
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp
index 3421380940..798ade3f73 100644
--- a/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -21,6 +21,7 @@
#include "RemoteBackup.h"
#include "QueueGuard.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueRegistry.h"
#include "qpid/log/Statement.h"
@@ -32,32 +33,45 @@ namespace ha {
using sys::Mutex;
using boost::bind;
-RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
- logPrefix("Primary: Remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
-{}
+RemoteBackup::RemoteBackup(
+ const BrokerInfo& info, broker::Connection* c
+) : brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
+{
+ std::ostringstream oss;
+ oss << "Primary: Remote backup " << info << ": ";
+ logPrefix = oss.str();
+}
-void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
+void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
{
- QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : ""));
- queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards));
+ queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
+ QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
+ << (createGuards ? " and guards" : ""));
}
RemoteBackup::~RemoteBackup() { cancel(); }
void RemoteBackup::cancel() {
+ QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected")
+ << " backup: " << brokerInfo);
for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
i->second->cancel();
guards.clear();
+ if (connection) {
+ connection->abort();
+ connection = 0;
+ }
}
bool RemoteBackup::isReady() {
- return connected && initialQueues.empty();
+ return connection && catchupQueues.empty();
}
-void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) {
- if (replicationTest.isReplicated(ALL, *q)) {
- initialQueues.insert(q);
+void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
+ if (replicationTest.getLevel(*q) == ALL) {
+ QPID_LOG(debug, logPrefix << "Catch-up queue"
+ << (createGuard ? " and guard" : "") << ": " << q->getName());
+ catchupQueues.insert(q);
if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo));
}
}
@@ -88,21 +102,24 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
}
void RemoteBackup::ready(const QueuePtr& q) {
- initialQueues.erase(q);
- QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()
- << QueueSetPrinter(", waiting for: ", initialQueues));
- if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
+ catchupQueues.erase(q);
+ if (catchupQueues.size()) {
+ QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", "
+ << catchupQueues.size() << " remain to catch up");
+ }
+ else
+ QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() );
}
-// Called via ConfigurationObserver::queueCreate and from initialQueue
+// Called via ConfigurationObserver::queueCreate and from catchupQueue
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q))
+ if (replicationTest.getLevel(*q) == ALL)
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
// Called via ConfigurationObserver
void RemoteBackup::queueDestroy(const QueuePtr& q) {
- initialQueues.erase(q);
+ catchupQueues.erase(q);
GuardMap::iterator i = guards.find(q);
if (i != guards.end()) {
i->second->cancel();
diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h
index 8ee776e90b..769c50457e 100644
--- a/cpp/src/qpid/ha/RemoteBackup.h
+++ b/cpp/src/qpid/ha/RemoteBackup.h
@@ -33,6 +33,7 @@ namespace qpid {
namespace broker {
class Queue;
class QueueRegistry;
+class Connection;
}
namespace ha {
@@ -54,20 +55,20 @@ class RemoteBackup
/** Note: isReady() can be true after construction
*@param connected true if the backup is already connected.
*/
- RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected);
+ RemoteBackup(const BrokerInfo&, broker::Connection*);
~RemoteBackup();
- /** Set the initial queues for all queues in the registry.
- *@createGuards if true create guards also, if false guards will be created on demand.
+ /** Set all queues in the registry as catch-up queues.
+ *@createGuards if true create guards also, if false guards are created on demand.
*/
- void setInitialQueues(broker::QueueRegistry&, bool createGuards);
+ void setCatchupQueues(broker::QueueRegistry&, bool createGuards);
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
/** Is the remote backup connected? */
- void setConnected(bool b) { connected=b; }
- bool isConnected() const { return connected; }
+ void setConnection(broker::Connection* c) { connection = c; }
+ bool isConnected() const { return connection; }
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
@@ -80,7 +81,7 @@ class RemoteBackup
/** Called via ConfigurationObserver. Note: may set isReady() */
void queueDestroy(const QueuePtr&);
- /**@return true when all initial queues for this backup are ready. */
+ /**@return true when all catch-up queues for this backup are ready. */
bool isReady();
/**@return true if isReady() and this is the first call to reportReady */
@@ -94,15 +95,14 @@ class RemoteBackup
typedef std::map<QueuePtr, GuardPtr> GuardMap;
typedef std::set<QueuePtr> QueueSet;
- /** Add queue to guard as an initial queue */
- void initialQueue(const QueuePtr&, bool createGuard);
+ void catchupQueue(const QueuePtr&, bool createGuard);
std::string logPrefix;
BrokerInfo brokerInfo;
ReplicationTest replicationTest;
GuardMap guards;
- QueueSet initialQueues;
- bool connected;
+ QueueSet catchupQueues;
+ broker::Connection* connection;
bool reportedReady;
};
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 6f7519cd1f..933716e8fa 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -91,25 +91,6 @@ string mask(const string& in)
return DOLLAR + in + INTERNAL;
}
-namespace {
-bool getSequence(const Message& message, SequenceNumber& result)
-{
- result = message.getSequence();
- return true;
-}
-}
-bool ReplicatingSubscription::getNext(
- broker::Queue& q, SequenceNumber from, SequenceNumber& result)
-{
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from);
-}
-
-bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) {
- QueueCursor cursor(REPLICATOR);
- return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front)));
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -157,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "Primary " << queue->getName() << "@" << info.getLogId() << ": ";
+ os << "Primary " << queue->getName() << "@" << info << ": ";
logPrefix = os.str();
// NOTE: Once the guard is attached we can have concurrent
@@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription(
guard->attach(*this);
QueueRange backup(arguments); // Remote backup range.
+ QueueRange backupOriginal(backup);
QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
@@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription(
// queue and hasn't been tampered with then that will be the case.
QPID_LOG(debug, logPrefix << "Subscribed:"
- << " backup:" << backup
+ << " backup:" << backupOriginal << " adjusted backup:" << backup
<< " primary:" << primary
<< " catch-up: " << position << "-" << primary.back
<< "(" << primary.back-position << ")");
@@ -222,9 +204,7 @@ ReplicatingSubscription::ReplicatingSubscription(
}
}
-ReplicatingSubscription::~ReplicatingSubscription() {
- QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
-}
+ReplicatingSubscription::~ReplicatingSubscription() {}
// Called in subscription's connection thread when the subscription is created.
// Called separate from ctor because sending events requires
@@ -248,19 +228,20 @@ void ReplicatingSubscription::initialize() {
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) {
- position = m.getSequence();
+bool ReplicatingSubscription::deliver(
+ const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
+{
try {
- QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
{
Mutex::ScopedLock l(lock);
- //FIXME GRS: position is no longer set//assert(position == m.getSequence());
+ position = m.getSequence();
- // m.getSequence() is the position of the newly enqueued message on local queue.
+ // m.getSequence() is the position of the new message on local queue.
// backupPosition is latest position on backup queue before enqueueing
if (m.getSequence() <= backupPosition)
throw Exception(
- QPID_MSG("Expected position > " << backupPosition
+ QPID_MSG(logPrefix << "Expected position > " << backupPosition
<< " but got " << m.getSequence()));
if (m.getSequence() - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
@@ -272,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const
}
return ConsumerImpl::deliver(c, m);
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"
+ QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
<< ": " << e.what());
throw;
}
@@ -292,6 +273,7 @@ void ReplicatingSubscription::setReady() {
// Called in the subscription's connection thread.
void ReplicatingSubscription::cancel()
{
+ QPID_LOG(debug, logPrefix << "Cancelled");
guard->cancel();
ConsumerImpl::cancel();
}
@@ -299,7 +281,7 @@ void ReplicatingSubscription::cancel()
// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
// Finish completion of message, it has been acknowledged by the backup.
- QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]");
+ QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
guard->complete(r.getMessageId());
// If next message is protected by the guard then we are ready
if (r.getMessageId() >= guard->getRange().back) setReady();
@@ -328,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const Message& m)
{
- QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]");
+ QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
{
Mutex::ScopedLock l(lock);
dequeues.add(m.getSequence());
@@ -396,7 +378,14 @@ bool ReplicatingSubscription::doDispatch()
Mutex::ScopedLock l(lock);
if (!dequeues.empty()) sendDequeueEvent(l);
}
- return ConsumerImpl::doDispatch();
+ try {
+ return ConsumerImpl::doDispatch();
+ }
+ catch (const std::exception& e) {
+ // FIXME aconway 2012-10-05: detect queue deletion, no warning.
+ QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
+ return false;
+ }
}
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h
index 8a2984846e..7fcb4ccf13 100644
--- a/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -61,10 +61,14 @@ class QueueGuard;
*
* Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
*
+ * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held
+ * QueueGuard MAY call ReplicatingSubscription with it's lock held.
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
{
public:
+ typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
+
struct Factory : public broker::ConsumerFactory {
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
@@ -80,17 +84,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
static const std::string QPID_FRONT;
static const std::string QPID_BROKER_INFO;
- // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
- /** Get position of front message on queue.
- *@return false if queue is empty.
- */
- static bool getFront(broker::Queue&, framing::SequenceNumber& result);
- /** Get next message after from in queue.
- *@return false if none found.
- */
- static bool getNext(broker::Queue&, framing::SequenceNumber from,
- framing::SequenceNumber& result);
-
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
@@ -114,6 +107,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
bool hideDeletedError() { return true; }
+ // Not counted for auto deletion and immediate message purposes.
+ bool isCounted() { return false; }
/** Initialization that must be done separately from construction
* because it requires a shared_ptr to this to exist.
diff --git a/cpp/src/qpid/ha/ReplicationTest.cpp b/cpp/src/qpid/ha/ReplicationTest.cpp
index 88a969dbfd..647523ef2c 100644
--- a/cpp/src/qpid/ha/ReplicationTest.cpp
+++ b/cpp/src/qpid/ha/ReplicationTest.cpp
@@ -19,7 +19,9 @@
*
*/
#include "ReplicationTest.h"
+#include "qpid/log/Statement.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Exchange.h"
#include "qpid/framing/FieldTable.h"
namespace qpid {
@@ -27,48 +29,47 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
+ return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
- return replicateLevel(i->second.asString());
+ return getLevel(i->second.asString());
else
return replicateDefault;
}
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ const Variant::Map& qmap(q.getSettings().original);
+ Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
+ if (i != qmap.end())
+ return getLevel(i->second.asString());
+ else
+ return getLevel(q.getSettings().storeSettings);
}
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive)
-{
- bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
- return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ return getLevel(ex.getArgs());
}
-bool ReplicationTest::isReplicated(
- ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive)
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
{
- bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
- return !ignore && replicateLevel(args) >= level;
+ return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
}
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
-{
- return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ return ReplicationTest::getLevel(ex);
}
diff --git a/cpp/src/qpid/ha/ReplicationTest.h b/cpp/src/qpid/ha/ReplicationTest.h
index 9f6976a8e4..7d44d82a21 100644
--- a/cpp/src/qpid/ha/ReplicationTest.h
+++ b/cpp/src/qpid/ha/ReplicationTest.h
@@ -30,6 +30,7 @@ namespace qpid {
namespace broker {
class Queue;
+class Exchange;
}
namespace framing {
@@ -47,21 +48,24 @@ class ReplicationTest
ReplicationTest(ReplicateLevel replicateDefault_) :
replicateDefault(replicateDefault_) {}
- // Return the simple replication level, accounting for defaults.
- ReplicateLevel replicateLevel(const std::string& str);
- ReplicateLevel replicateLevel(const framing::FieldTable& f);
- ReplicateLevel replicateLevel(const types::Variant::Map& m);
+ // Get the replication level set on an object, or default if not set.
+ ReplicateLevel getLevel(const std::string& str);
+ ReplicateLevel getLevel(const framing::FieldTable& f);
+ ReplicateLevel getLevel(const types::Variant::Map& m);
+ ReplicateLevel getLevel(const broker::Queue&);
+ ReplicateLevel getLevel(const broker::Exchange&);
+
+ // Calculate level for objects that may not have replication set,
+ // including auto-delete/exclusive settings.
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
+ ReplicateLevel useLevel(const broker::Queue&);
+ ReplicateLevel useLevel(const broker::Exchange&);
- // Return true if replication for a queue is enabled at level or higher,
- // taking account of default level and queue settings.
- bool isReplicated(ReplicateLevel level,
- const types::Variant::Map& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level,
- const framing::FieldTable& args, bool autodelete, bool exclusive);
- bool isReplicated(ReplicateLevel level, const broker::Queue&);
private:
ReplicateLevel replicateDefault;
};
+
}} // namespace qpid::ha
#endif /*!QPID_HA_REPLICATIONTEST_H*/
diff --git a/cpp/src/qpid/ha/Role.h b/cpp/src/qpid/ha/Role.h
new file mode 100644
index 0000000000..9d6f7cd123
--- /dev/null
+++ b/cpp/src/qpid/ha/Role.h
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_ROLE_H
+#define QPID_HA_ROLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <string>
+
+namespace qpid {
+struct Url;
+
+namespace ha {
+
+/**
+ * A HaBroker has a role, e.g. Primary, Backup, StandAlone.
+ * Role subclasses define the actions of the broker in each role.
+ * The Role interface allows the HaBroker to pass management actions
+ * to be implemented by the role.
+ */
+class Role
+{
+ public:
+ /** Log prefix appropriate to the role */
+ virtual std::string getLogPrefix() const = 0;
+
+ /** QMF promote method handler.
+ * @return The new role if promoted, 0 if not. Caller takes ownership.
+ */
+ virtual Role* promote() = 0;
+
+ virtual void setBrokerUrl(const Url& url) = 0;
+
+ private:
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_ROLE_H*/
diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h
index 37235b5c79..53b61415cf 100644
--- a/cpp/src/qpid/ha/Settings.h
+++ b/cpp/src/qpid/ha/Settings.h
@@ -23,6 +23,7 @@
*/
#include "types.h"
+#include "qpid/sys/IntegerTypes.h"
#include <string>
namespace qpid {
@@ -34,16 +35,25 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+ Settings() : cluster(false), queueReplication(false),
+ replicateDefault(NONE), backupTimeout(5),
+ flowMessages(100), flowBytes(0)
{}
bool cluster; // True if we are a cluster member.
- std::string clientUrl;
+ bool queueReplication; // True if enabled.
+ std::string publicUrl;
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
double backupTimeout;
- private:
+
+ uint32_t flowMessages, flowBytes;
+
+ static const uint32_t NO_LIMIT=0xFFFFFFFF;
+ static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; }
+ uint32_t getFlowMessages() const { return flowValue(flowMessages); }
+ uint32_t getFlowBytes() const { return flowValue(flowBytes); }
};
}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/StandAlone.h b/cpp/src/qpid/ha/StandAlone.h
new file mode 100644
index 0000000000..d052996d40
--- /dev/null
+++ b/cpp/src/qpid/ha/StandAlone.h
@@ -0,0 +1,45 @@
+#ifndef QPID_HA_STANDALONE_H
+#define QPID_HA_STANDALONE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+struct Url;
+
+namespace ha {
+
+/**
+ * Stand-alone role: acts as a stand-alone broker, no clustering.
+ * HA module needed to setting up replication via QMF methods.
+ */
+class StandAlone : public Role
+{
+ public:
+ std::string getLogPrefix() const { return logPrefix; }
+ Role* promote() { return 0; }
+ void setBrokerUrl(const Url&) {}
+
+ private:
+ std::string logPrefix;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_STANDALONE_H*/
diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp
new file mode 100644
index 0000000000..17613ce3dd
--- /dev/null
+++ b/cpp/src/qpid/ha/StatusCheck.cpp
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "StatusCheck.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Address.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace ha {
+
+using namespace qpid::messaging;
+using namespace qpid::types;
+using namespace std;
+using namespace sys;
+
+const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker";
+
+class StatusCheckThread : public sys::Runnable {
+ public:
+ StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self)
+ : url(addr), statusCheck(sc), brokerInfo(self) {}
+ void run();
+ private:
+ Url url;
+ StatusCheck& statusCheck;
+ uint16_t linkHeartbeatInterval;
+ BrokerInfo brokerInfo;
+};
+
+void StatusCheckThread::run() {
+ QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url);
+ Variant::Map options, clientProperties;
+ clientProperties = brokerInfo.asMap(); // Detect self connections.
+ clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups.
+
+ options["client-properties"] = clientProperties;
+ options["heartbeat"] = statusCheck.linkHeartbeatInterval;
+ Connection c(url.str(), options);
+
+ try {
+ c.open();
+ Session session = c.createSession();
+ messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}");
+ Receiver r = session.createReceiver(responses);
+ Sender s = session.createSender("qmf.default.direct/broker");
+ Message request;
+ request.setReplyTo(responses);
+ request.setContentType("amqp/map");
+ request.setProperty("x-amqp-0-10.app-id", "qmf2");
+ request.setProperty("qmf.opcode", "_query_request");
+ Variant::Map oid;
+ oid["_object_name"] = HA_BROKER;
+ Variant::Map content;
+ content["_what"] = "OBJECT";
+ content["_object_id"] = oid;
+ encode(content, request);
+ s.send(request);
+ Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND);
+ session.acknowledge();
+ Variant::List contentIn;
+ decode(response, contentIn);
+ if (contentIn.size() == 1) {
+ Variant::Map details = contentIn.front().asMap()["_values"].asMap();
+ string status = details["status"].getString();
+ if (status != "joining") {
+ statusCheck.setPromote(false);
+ QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is "
+ << status << ", this broker will refuse promotion.");
+ }
+ QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
+ }
+ } catch(const exception& error) {
+ QPID_LOG(info, "Checking status of " << url << ": " << error.what());
+ }
+ delete this;
+}
+
+StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self)
+ : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self)
+{}
+
+StatusCheck::~StatusCheck() {
+ // Join any leftovers
+ for (size_t i = 0; i < threads.size(); ++i) threads[i].join();
+}
+
+void StatusCheck::setUrl(const Url& url) {
+ Mutex::ScopedLock l(lock);
+ for (size_t i = 0; i < url.size(); ++i)
+ threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo)));
+}
+
+bool StatusCheck::canPromote() {
+ Mutex::ScopedLock l(lock);
+ while (!threads.empty()) {
+ Thread t = threads.back();
+ threads.pop_back();
+ Mutex::ScopedUnlock u(lock);
+ t.join();
+ }
+ return promote;
+}
+
+void StatusCheck::setPromote(bool p) {
+ Mutex::ScopedLock l(lock);
+ promote = p;
+}
+
+}} // namespace qpid::ha
diff --git a/cpp/src/qpid/ha/StatusCheck.h b/cpp/src/qpid/ha/StatusCheck.h
new file mode 100644
index 0000000000..997ced4159
--- /dev/null
+++ b/cpp/src/qpid/ha/StatusCheck.h
@@ -0,0 +1,71 @@
+#ifndef QPID_HA_STATUSCHECK_H
+#define QPID_HA_STATUSCHECK_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "BrokerInfo.h"
+#include "qpid/Url.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
+#include <vector>
+
+namespace qpid {
+namespace ha {
+
+// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect
+// against bad promotion if there are READY brokers when this broker starts.
+// It will not help the situation where brokers became READY after this one starts.
+//
+
+/**
+ * Check whether a JOINING broker can be promoted .
+ *
+ * A JOINING broker can be promoted as long as all the other brokers are also
+ * JOINING. If there are READY brokers in the cluster the JOINING broker should
+ * refuse to promote so that one of the READY brokers can. This situation
+ * only comes about if the primary is dead and no new primary has been promoted.
+ *
+ * THREAD SAFE: setUrl and canPromote are called in arbitrary management threads.
+ */
+class StatusCheck
+{
+ public:
+ StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self);
+ ~StatusCheck();
+ void setUrl(const Url&);
+ bool canPromote();
+
+ private:
+ void setPromote(bool p);
+
+ std::string logPrefix;
+ sys::Mutex lock;
+ std::vector<sys::Thread> threads;
+ bool promote;
+ uint16_t linkHeartbeatInterval;
+ BrokerInfo brokerInfo;
+ friend class StatusCheckThread;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_STATUSCHECK_H*/
diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp
index 53e2056213..bb4bf83574 100644
--- a/cpp/src/qpid/ha/types.cpp
+++ b/cpp/src/qpid/ha/types.cpp
@@ -33,6 +33,7 @@ namespace ha {
using namespace std;
const string QPID_REPLICATE("qpid.replicate");
+const string QPID_HA_UUID("qpid.ha-uuid");
string EnumBase::str() const {
assert(value < count);
@@ -55,6 +56,11 @@ template <> const char* Enum<ReplicateLevel>::NAMES[] = { "none", "configuration
template <> const size_t Enum<ReplicateLevel>::N = 3;
template <> const char* Enum<BrokerStatus>::NAME = "HA broker status";
+
+// NOTE: Changing status names will have an impact on qpid-ha and
+// the qpidd-primary init script.
+// Don't change them unless you are going to update all dependent code.
+//
template <> const char* Enum<BrokerStatus>::NAMES[] = {
"joining", "catchup", "ready", "recovering", "active", "standalone"
};
diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h
index 35faf9f624..f8c48afc5a 100644
--- a/cpp/src/qpid/ha/types.h
+++ b/cpp/src/qpid/ha/types.h
@@ -99,6 +99,7 @@ inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
// String constants.
extern const std::string QPID_REPLICATE;
+extern const std::string QPID_HA_UUID;
/** Define IdSet type, not a typedef so we can overload operator << */
class IdSet : public std::set<types::Uuid> {};