diff options
Diffstat (limited to 'cpp/src/qpid/ha')
34 files changed, 1559 insertions, 748 deletions
diff --git a/cpp/src/qpid/ha/AlternateExchangeSetter.h b/cpp/src/qpid/ha/AlternateExchangeSetter.h index 08690e68bc..2386a01084 100644 --- a/cpp/src/qpid/ha/AlternateExchangeSetter.h +++ b/cpp/src/qpid/ha/AlternateExchangeSetter.h @@ -43,12 +43,14 @@ class AlternateExchangeSetter AlternateExchangeSetter(broker::ExchangeRegistry& er) : exchanges(er) {} + /** If altEx is already known, call setter(altEx) now else save for later */ void setAlternate(const std::string& altEx, const SetFunction& setter) { - broker::Exchange::shared_ptr ex = exchanges.find(altEx); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(altEx); if (ex) setter(ex); // Set immediately. else setters.insert(Setters::value_type(altEx, setter)); // Save for later. } + /** Add an exchange and call any setters that are waiting for it. */ void addExchange(boost::shared_ptr<broker::Exchange> exchange) { // Update the setters for this exchange std::pair<Setters::iterator, Setters::iterator> range = setters.equal_range(exchange->getName()); diff --git a/cpp/src/qpid/ha/Backup.cpp b/cpp/src/qpid/ha/Backup.cpp index 6852a58b0c..2affc12bf6 100644 --- a/cpp/src/qpid/ha/Backup.cpp +++ b/cpp/src/qpid/ha/Backup.cpp @@ -20,9 +20,12 @@ */ #include "Backup.h" #include "BrokerReplicator.h" +#include "ConnectionObserver.h" #include "HaBroker.h" +#include "Primary.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "StatusCheck.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" @@ -44,28 +47,38 @@ using namespace framing; using namespace broker; using types::Variant; using std::string; +using sys::Mutex; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix("Backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) + logPrefix("Backup: "), membership(hb.getMembership()), stopped(false), + haBroker(hb), broker(hb.getBroker()), settings(s), + statusCheck( + new StatusCheck( + logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo())) { - // Empty brokerUrl means delay initialization until seBrokertUrl() is called. - if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); + // Set link properties to tag outgoing links. + framing::FieldTable linkProperties = broker.getLinkClientProperties(); + linkProperties.setTable( + ConnectionObserver::BACKUP_TAG, hb.getBrokerInfo().asFieldTable()); + broker.setLinkClientProperties(linkProperties); } -void Backup::initialize(const Url& brokers) { - if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); - string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; - types::Uuid uuid(true); - // Declare the link - std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( - broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), - brokers[0].host, brokers[0].port, protocol, - false, // durable - settings.mechanism, settings.username, settings.password, - false); // no amq.failover - don't want to use client URL. - { - sys::Mutex::ScopedLock l(lock); +void Backup::setBrokerUrl(const Url& brokers) { + if (brokers.empty()) return; + Mutex::ScopedLock l(lock); + if (stopped) return; + if (haBroker.getStatus() == JOINING) statusCheck->setUrl(brokers); + if (!link) { // Not yet initialized + QPID_LOG(info, logPrefix << "Connecting to cluster, broker URL: " << brokers); + string protocol = brokers[0].protocol.empty() ? "tcp" : brokers[0].protocol; + types::Uuid uuid(true); + std::pair<Link::shared_ptr, bool> result; + result = broker.getLinks().declare( + broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), + brokers[0].host, brokers[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password, + false); // no amq.failover - don't want to use client URL. link = result.first; replicator.reset(new BrokerReplicator(haBroker, link)); replicator->initialize(); @@ -74,36 +87,57 @@ void Backup::initialize(const Url& brokers) { link->setUrl(brokers); // Outside the lock, once set link doesn't change. } -Backup::~Backup() { +void Backup::stop(Mutex::ScopedLock&) { + if (stopped) return; + stopped = true; + QPID_LOG(debug, logPrefix << "Leaving backup role."); if (link) link->close(); - if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); + if (replicator.get()) { + replicator->shutdown(); + replicator.reset(); + } } -// Called via management. -void Backup::setBrokerUrl(const Url& url) { - // Ignore empty URLs seen during start-up for some tests. - if (url.empty()) return; - bool linkSet = false; +Role* Backup::recover(Mutex::ScopedLock&) { + BrokerInfo::Set backups; { - sys::Mutex::ScopedLock l(lock); - linkSet = link; + Mutex::ScopedLock l(lock); + if (stopped) return 0; + stop(l); // Stop backup activity before starting primary. + QPID_LOG(notice, "Promoting to primary: " << haBroker.getBrokerInfo()); + // Reset membership before allowing backups to connect. + backups = membership.otherBackups(); + membership.clear(); + return new Primary(haBroker, backups); } - if (linkSet) - link->setUrl(url); // Outside lock, once set link doesn't change - else - initialize(url); // Deferred initialization } -void Backup::setStatus(BrokerStatus status) { - switch (status) { - case READY: - QPID_LOG(notice, logPrefix << "Ready to become primary."); +Role* Backup::promote() { + Mutex::ScopedLock l(lock); + if (stopped) return 0; + switch (haBroker.getStatus()) { + case JOINING: + if (statusCheck->canPromote()) return recover(l); + else { + QPID_LOG(error, + logPrefix << "Joining active cluster, cannot be promoted."); + throw Exception("Joining active cluster, cannot be promoted."); + } break; case CATCHUP: - QPID_LOG(notice, logPrefix << "Catching up on primary, cannot be promoted."); + QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + throw Exception("Still catching up, cannot be promoted."); + break; + case READY: return recover(l); break; default: - assert(0); + assert(0); // Not a valid state for the Backup role.. } + return 0; // Keep compiler happy +} + +Backup::~Backup() { + Mutex::ScopedLock l(lock); + stop(l); } }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Backup.h b/cpp/src/qpid/ha/Backup.h index 4f2d5babde..4943ca5e2e 100644 --- a/cpp/src/qpid/ha/Backup.h +++ b/cpp/src/qpid/ha/Backup.h @@ -22,6 +22,7 @@ * */ +#include "Role.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -38,30 +39,41 @@ namespace ha { class Settings; class BrokerReplicator; class HaBroker; +class StatusCheck; +class Membership; /** - * State associated with a backup broker. Manages connections to primary. + * Backup role: Manages connections to primary, replicates management events and queue contents. * * THREAD SAFE */ -class Backup +class Backup : public Role { public: Backup(HaBroker&, const Settings&); ~Backup(); + + std::string getLogPrefix() const { return logPrefix; } + void setBrokerUrl(const Url&); - void setStatus(BrokerStatus); + + Role* promote(); private: - void initialize(const Url&); + void stop(sys::Mutex::ScopedLock&); + Role* recover(sys::Mutex::ScopedLock&); + std::string logPrefix; + Membership& membership; sys::Mutex lock; + bool stopped; HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; boost::shared_ptr<BrokerReplicator> replicator; + std::auto_ptr<StatusCheck> statusCheck; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/BrokerInfo.cpp b/cpp/src/qpid/ha/BrokerInfo.cpp index c8bd1a14be..8efed91b17 100644 --- a/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/cpp/src/qpid/ha/BrokerInfo.cpp @@ -33,27 +33,22 @@ namespace qpid { namespace ha { namespace { -std::string SYSTEM_ID="system-id"; -std::string HOST_NAME="host-name"; -std::string PORT="port"; -std::string STATUS="status"; +const std::string SYSTEM_ID="system-id"; +const std::string HOST_NAME="host-name"; +const std::string PORT="port"; +const std::string STATUS="status"; } using types::Uuid; using types::Variant; using framing::FieldTable; -BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : - hostName(host), port(port_), systemId(id) -{ - updateLogId(); -} +BrokerInfo::BrokerInfo() : port(0), status(JOINING) {} -void BrokerInfo::updateLogId() { - std::ostringstream o; - o << hostName << ":" << port; - logId = o.str(); -} +BrokerInfo::BrokerInfo(const types::Uuid& id, BrokerStatus s, + const std::string& host, uint16_t port_) : + hostName(host), port(port_), systemId(id), status(s) +{} FieldTable BrokerInfo::asFieldTable() const { Variant::Map m = asMap(); @@ -91,7 +86,6 @@ void BrokerInfo::assign(const Variant::Map& m) { hostName = get(m, HOST_NAME).asString(); port = get(m, PORT).asUint16(); status = BrokerStatus(get(m, STATUS).asUint8()); - updateLogId(); } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { diff --git a/cpp/src/qpid/ha/BrokerInfo.h b/cpp/src/qpid/ha/BrokerInfo.h index 642f7c1361..40358336b0 100644 --- a/cpp/src/qpid/ha/BrokerInfo.h +++ b/cpp/src/qpid/ha/BrokerInfo.h @@ -43,8 +43,9 @@ class BrokerInfo typedef std::set<BrokerInfo> Set; typedef std::map<types::Uuid, BrokerInfo> Map; - BrokerInfo() {} - BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id); + BrokerInfo(); + BrokerInfo(const types::Uuid& id, BrokerStatus, + const std::string& host=std::string(), uint16_t port=0); BrokerInfo(const framing::FieldTable& ft) { assign(ft); } BrokerInfo(const types::Variant::Map& m) { assign(m); } @@ -52,7 +53,6 @@ class BrokerInfo std::string getHostName() const { return hostName; } BrokerStatus getStatus() const { return status; } uint16_t getPort() const { return port; } - std::string getLogId() const { return logId; } void setStatus(BrokerStatus s) { status = s; } @@ -66,8 +66,6 @@ class BrokerInfo bool operator<(const BrokerInfo x) const { return systemId < x.systemId; } private: - void updateLogId(); - std::string logId; std::string hostName; uint16_t port; types::Uuid systemId; diff --git a/cpp/src/qpid/ha/BrokerReplicator.cpp b/cpp/src/qpid/ha/BrokerReplicator.cpp index 3a3c9c2954..983b976d76 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -23,16 +23,19 @@ #include "QueueReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Connection.h" +#include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Link.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/reply_exceptions.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventUnbind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" @@ -41,6 +44,7 @@ #include "qmf/org/apache/qpid/broker/EventQueueDelete.h" #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include <boost/bind.hpp> #include <algorithm> #include <sstream> #include <iostream> @@ -57,23 +61,25 @@ using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; using qmf::org::apache::qpid::ha::EventMembersUpdate; +using qpid::broker::amqp_0_10::MessageTransfer; using namespace framing; -using std::string; +using namespace std; using std::ostream; using types::Variant; using namespace broker; namespace { -const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); +const string QPID_CONFIGURATION_REPLICATOR("qpid.broker-replicator"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); const string OBJECT_NAME("_object_name"); const string PACKAGE_NAME("_package_name"); const string QUERY_RESPONSE("_query_response"); -const string SCHEMA_ID("_schema_id"); const string VALUES("_values"); +const string SCHEMA_ID("_schema_id"); +const string WHAT("_what"); const string ALTEX("altEx"); const string ALTEXCHANGE("altExchange"); @@ -81,24 +87,27 @@ const string ARGS("args"); const string ARGUMENTS("arguments"); const string AUTODEL("autoDel"); const string AUTODELETE("autoDelete"); -const string EXCL("excl"); -const string EXCLUSIVE("exclusive"); const string BIND("bind"); -const string UNBIND("unbind"); const string BINDING("binding"); +const string BINDING_KEY("bindingKey"); const string CREATED("created"); const string DISP("disp"); +const string DEST("dest"); const string DURABLE("durable"); const string EXCHANGE("exchange"); +const string EXCL("excl"); +const string EXCLUSIVE("exclusive"); const string EXNAME("exName"); const string EXTYPE("exType"); +const string HA_BROKER("habroker"); const string KEY("key"); const string NAME("name"); +const string PARTIAL("partial"); const string QNAME("qName"); const string QUEUE("queue"); const string TYPE("type"); -const string HA_BROKER("habroker"); -const string PARTIAL("partial"); +const string UNBIND("unbind"); +const string CONSUMER_COUNT("consumerCount"); const string AGENT_EVENT_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string AGENT_EVENT_HA("agent.ind.event.org_apache_qpid_ha.#"); @@ -107,10 +116,6 @@ const string QMF_CONTENT("qmf.content"); const string QMF_DEFAULT_TOPIC("qmf.default.topic"); const string QMF_OPCODE("qmf.opcode"); -const string _WHAT("_what"); -const string _CLASS_NAME("_class_name"); -const string _PACKAGE_NAME("_package_name"); -const string _SCHEMA_ID("_schema_id"); const string OBJECT("OBJECT"); const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); const string ORG_APACHE_QPID_HA("org.apache.qpid.ha"); @@ -118,21 +123,18 @@ const string QMF_DEFAULT_DIRECT("qmf.default.direct"); const string _QUERY_REQUEST("_query_request"); const string BROKER("broker"); const string MEMBERS("members"); - -template <class T> bool match(Variant::Map& schema) { - return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); -} +const string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); void sendQuery(const string& packageName, const string& className, const string& queueName, SessionHandler& sessionHandler) { framing::AMQP_ServerProxy peer(sessionHandler.out); Variant::Map request; - request[_WHAT] = OBJECT; + request[WHAT] = OBJECT; Variant::Map schema; - schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = packageName; - request[_SCHEMA_ID] = schema; + schema[CLASS_NAME] = className; + schema[PACKAGE_NAME] = packageName; + request[SCHEMA_ID] = schema; AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); method.setBof(true); @@ -170,19 +172,144 @@ Variant::Map asMapVoid(const Variant& value) { } } // namespace +// Listens for errors on the bridge session. +class BrokerReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& lp, BrokerReplicator& br) : + logPrefix(lp), brokerReplicator(br) {} + + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached."); + } + + private: + std::string logPrefix; + BrokerReplicator& brokerReplicator; +}; + +class BrokerReplicator::ConnectionObserver : public broker::ConnectionObserver +{ + public: + ConnectionObserver(BrokerReplicator& br) : brokerReplicator(br) {} + virtual void connection(Connection&) {} + virtual void opened(Connection&) {} + + virtual void closed(Connection& c) { + if (brokerReplicator.link && &c == brokerReplicator.connection) + brokerReplicator.disconnected(); + } + virtual void forced(Connection& c, const std::string& /*message*/) { closed(c); } + private: + BrokerReplicator& brokerReplicator; +}; + +/** Keep track of queues or exchanges during the update process to solve 2 + * problems. + * + * 1. Once all responses are processed, remove any queues/exchanges + * that were not mentioned as they no longer exist on the primary. + * + * 2. During the update if we see an event for an object we should + * ignore any subsequent responses for that object as they are out + * of date. + */ +class BrokerReplicator::UpdateTracker { + public: + typedef std::set<std::string> Names; + typedef boost::function<void (const std::string&)> CleanFn; + + UpdateTracker(const std::string& type_, // "queue" or "exchange" + CleanFn f, const ReplicationTest& rt) + : type(type_), cleanFn(f), repTest(rt) {} + + /** Destructor cleans up remaining initial queues. */ + ~UpdateTracker() { + // Don't throw in a destructor. + try { for_each(initial.begin(), initial.end(), cleanFn); } + catch (const std::exception& e) { + QPID_LOG(error, "Error in cleanup of lost objects: " << e.what()); + } + } + + /** Add an exchange name */ + void addExchange(Exchange::shared_ptr ex) { + if (repTest.getLevel(*ex)) + initial.insert(ex->getName()); + } + + /** Add a queue name. */ + void addQueue(Queue::shared_ptr q) { + if (repTest.getLevel(*q)) + initial.insert(q->getName()); + } + + /** Received an event for name */ + void event(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + events.insert(name); // we have seen an event for this name + } + + /** Received a response for name. + *@return true if this response should be processed, false if we have + *already seen an event for this object. + */ + bool response(const std::string& name) { + initial.erase(name); // no longer a candidate for deleting + return events.find(name) == events.end(); // true if no event seen yet. + } + + private: + void clean(const std::string& name) { + QPID_LOG(info, "Backup updated, deleting " << type << " " << name); + cleanFn(name); + } + + std::string type; + Names initial, events; + CleanFn cleanFn; + ReplicationTest repTest; +}; + BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix("Backup: "), replicationTest(hb.getReplicationTest()), - haBroker(hb), broker(hb.getBroker()), link(l), + logPrefix("Backup: "), replicationTest(NONE), + haBroker(hb), broker(hb.getBroker()), + exchanges(broker.getExchanges()), queues(broker.getQueues()), + link(l), initialized(false), - alternates(hb.getBroker().getExchanges()) -{} + alternates(hb.getBroker().getExchanges()), + connection(0) +{ + connectionObserver.reset(new ConnectionObserver(*this)); + broker.getConnectionObservers().add(connectionObserver); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); + + dispatch[EventQueueDeclare::getFullName()] = &BrokerReplicator::doEventQueueDeclare; + dispatch[EventQueueDelete::getFullName()] = &BrokerReplicator::doEventQueueDelete; + dispatch[EventExchangeDeclare::getFullName()] = &BrokerReplicator::doEventExchangeDeclare; + dispatch[EventExchangeDelete::getFullName()] = &BrokerReplicator::doEventExchangeDelete; + dispatch[EventBind::getFullName()] = &BrokerReplicator::doEventBind; + dispatch[EventUnbind::getFullName()] = &BrokerReplicator::doEventUnbind; + dispatch[EventMembersUpdate::getFullName()] = &BrokerReplicator::doEventMembersUpdate; + dispatch[EventSubscribe::getFullName()] = &BrokerReplicator::doEventSubscribe; +} void BrokerReplicator::initialize() { // Can't do this in the constructor because we need a shared_ptr to this. types::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); - broker.getLinks().declare( + std::pair<Bridge::shared_ptr, bool> result = broker.getLinks().declare( name, // name for bridge *link, // parent false, // durable @@ -195,21 +322,47 @@ void BrokerReplicator::initialize() { "", // excludes false, // dynamic 0, // sync? - // shared_ptr keeps this in memory until outstanding initializeBridge + // shared_ptr keeps this in memory until outstanding connected // calls are run. - boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) + boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) ); + assert(result.second); + result.first->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); } -BrokerReplicator::~BrokerReplicator() { } +BrokerReplicator::~BrokerReplicator() { shutdown(); } + +namespace { +void collectQueueReplicators( + const boost::shared_ptr<Exchange> ex, set<boost::shared_ptr<QueueReplicator> >& collect) +{ + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (qr) collect.insert(qr); +} +} // namespace + +void BrokerReplicator::shutdown() { + // NOTE: this is called in a QMF dispatch thread, not the Link's connection + // thread. It's OK to be unlocked because it doesn't use any mutable state, + // it only calls thread safe functions objects belonging to the Broker. + + // Unregister with broker objects: + if (connectionObserver) { + broker.getConnectionObservers().remove(connectionObserver); + connectionObserver.reset(); + } + broker.getExchanges().destroy(getName()); +} // This is called in the connection IO thread when the bridge is started. -void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { +void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) { // Use the credentials of the outgoing Link connection for creating queues, // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. // - assert(link->getConnection()); + connection = link->getConnection(); + assert(connection); userId = link->getConnection()->getUserId(); remoteHost = link->getConnection()->getUrl(); @@ -221,6 +374,19 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH << " status:" << printable(haBroker.getStatus())); initialized = true; + exchangeTracker.reset( + new UpdateTracker("exchange", + boost::bind(&BrokerReplicator::deleteExchange, this, _1), + replicationTest)); + exchanges.eachExchange( + boost::bind(&UpdateTracker::addExchange, exchangeTracker.get(), _1)); + + queueTracker.reset( + new UpdateTracker("queue", + boost::bind(&BrokerReplicator::deleteQueue, this, _1, true), + replicationTest)); + queues.eachQueue(boost::bind(&UpdateTracker::addQueue, queueTracker.get(), _1)); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); @@ -231,9 +397,14 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, FieldTable()); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, FieldTable()); //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + FieldTable arguments; + arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + peer.getMessage().subscribe( + queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/, + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(args.i_dest, 1); // Window + peer.getMessage().flow(args.i_dest, 0, haBroker.getSettings().getFlowMessages()); + peer.getMessage().flow(args.i_dest, 1, haBroker.getSettings().getFlowBytes()); // Issue a query request for queues, exchanges, bindings and the habroker // using event queue as the reply-to address @@ -247,12 +418,12 @@ void BrokerReplicator::route(Deliverable& msg) { // We transition from JOINING->CATCHUP on the first message received from the primary. // Until now we couldn't be sure if we had a good connection to the primary. if (haBroker.getStatus() == JOINING) { - haBroker.setStatus(CATCHUP); + haBroker.getMembership().setStatus(CATCHUP); QPID_LOG(notice, logPrefix << "Connected to primary " << primary); } Variant::List list; try { - if (!qpid::broker::amqp_0_10::MessageTransfer::isQMFv2(msg.getMessage())) + if (!MessageTransfer::isQMFv2(msg.getMessage())) throw Exception("Unexpected message, not QMF2 event or query response."); // decode as list string content = msg.getMessage().getContent(); @@ -264,13 +435,9 @@ void BrokerReplicator::route(Deliverable& msg) { QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); - if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); - else if (match<EventQueueDelete>(schema)) doEventQueueDelete(values); - else if (match<EventExchangeDeclare>(schema)) doEventExchangeDeclare(values); - else if (match<EventExchangeDelete>(schema)) doEventExchangeDelete(values); - else if (match<EventBind>(schema)) doEventBind(values); - else if (match<EventUnbind>(schema)) doEventUnbind(values); - else if (match<EventMembersUpdate>(schema)) doEventMembersUpdate(values); + EventKey key(schema[PACKAGE_NAME], schema[CLASS_NAME]); + EventDispatchMap::iterator j = dispatch.find(key); + if (j != dispatch.end()) (this->*(j->second))(values); } } else if (msg.getMessage().getPropertyAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { @@ -285,15 +452,21 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == BINDING) doResponseBind(values); else if (type == HA_BROKER) doResponseHaBroker(values); } - if (qpid::broker::amqp_0_10::MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { - // We have received all of the exchange response. + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), EXCHANGE)) { + QPID_LOG(debug, logPrefix << "All exchange responses received.") + exchangeTracker.reset(); // Clean up exchanges that no longer exist in the primary alternates.clear(); } + if (MessageTransfer::isLastQMFResponse(msg.getMessage(), QUEUE)) { + QPID_LOG(debug, logPrefix << "All queue responses received."); + queueTracker.reset(); // Clean up queues that no longer exist in the primary + } } } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what() - << ": while handling: " << list); - haBroker.shutdown(); +; + haBroker.shutdown( + QPID_MSG(logPrefix << "Configuration replication failed: " + << e.what() << ": while handling: " << list)); throw; } } @@ -301,31 +474,22 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { Variant::Map argsMap = asMapVoid(values[ARGS]); - bool autoDel = values[AUTODEL].asBool(); - bool excl = values[EXCL].asBool(); - if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[QNAME].asString(); QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool()); + QPID_LOG(debug, logPrefix << "Queue declare event: " << name); + if (queueTracker.get()) queueTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. // The queue was definitely created on the primary. - if (broker.getQueues().find(name)) { - QPID_LOG(warning, logPrefix << "Replacing exsiting queue: " << name); - broker.getQueues().destroy(name); - stopQueueReplicator(name); + if (queues.find(name)) { + QPID_LOG(warning, logPrefix << "Declare event, replacing exsiting queue: " + << name); + deleteQueue(name); } - settings.populate(args, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = - broker.createQueue( - name, - settings, - 0 /*i.e. no owner regardless of exclusivity on master*/, - values[ALTEX].asString(), - userId, - remoteHost); - assert(result.second); // Should be true since we destroyed existing queue above - startQueueReplicator(result.first); + replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args, + values[ALTEX].asString()); } } @@ -333,7 +497,7 @@ boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator( const std::string& qname) { string rname = QueueReplicator::replicatorName(qname); - boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + boost::shared_ptr<broker::Exchange> ex = exchanges.find(rname); return boost::dynamic_pointer_cast<QueueReplicator>(ex); } @@ -341,79 +505,85 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); - boost::shared_ptr<Queue> queue = broker.getQueues().find(name); - if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) { + boost::shared_ptr<Queue> queue = queues.find(name); + if (queue && replicationTest.getLevel(*queue)) { QPID_LOG(debug, logPrefix << "Queue delete event: " << name); - stopQueueReplicator(name); - broker.deleteQueue(name, userId, remoteHost); + if (queueTracker.get()) queueTracker->event(name); + deleteQueue(name); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. - if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { + if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) { string name = values[EXNAME].asString(); QPID_LOG(debug, logPrefix << "Exchange declare event: " << name); + if (exchangeTracker.get()) exchangeTracker->event(name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); // If we already have a exchange with this name, replace it. // The exchange was definitely created on the primary. - if (broker.getExchanges().find(name)) { - broker.getExchanges().destroy(name); - QPID_LOG(warning, logPrefix << "Replaced exsiting exchange: " << name); + if (exchanges.find(name)) { + deleteExchange(name); + QPID_LOG(warning, logPrefix << "Declare event, replacing existing exchange: " + << name); } - boost::shared_ptr<Exchange> exchange = - createExchange(name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, values[ALTEX].asString()); - assert(exchange); + CreateExchangeResult result = createExchange( + name, values[EXTYPE].asString(), values[DURABLE].asBool(), args, + values[ALTEX].asString()); + replicatedExchanges.insert(name); + assert(result.second); } } void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); + boost::shared_ptr<Exchange> exchange = exchanges.find(name); if (!exchange) { - QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); - } else if (!replicationTest.replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name); + } else if (!replicationTest.getLevel(*exchange)) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); - broker.deleteExchange(name, userId, remoteHost); + if (exchangeTracker.get()) exchangeTracker->event(name); + deleteExchange(name); + replicatedExchanges.erase(name); } } void BrokerReplicator::doEventBind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); - // We only replicate binds for a replicated queue to replicated - // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + queues.find(values[QNAME].asString()); + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); + // We only replicate binds for a replicated queue to replicated exchange + // that both exist locally. Respect the replication level set in the + // bind arguments, but replicate by default. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() - << " key=" << key); + << " key=" << key + << " args=" << args); exchange->bind(queue, key, &args, 0); } } void BrokerReplicator::doEventUnbind(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = - broker.getExchanges().find(values[EXNAME].asString()); + exchanges.find(values[EXNAME].asString()); boost::shared_ptr<Queue> queue = - broker.getQueues().find(values[QNAME].asString()); + queues.find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue)) { - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() @@ -424,7 +594,17 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { Variant::List members = values[MEMBERS].asList(); - haBroker.setMembership(members); + setMembership(members); +} + +void BrokerReplicator::doEventSubscribe(Variant::Map& values) { + // Ignore queue replicator subscriptions. + if (QueueReplicator::isReplicatorName(values[DEST].asString())) return; + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(values[QNAME]); + if (qr) { + qr->setSubscribed(); + QPID_LOG(debug, logPrefix << "Subscribe event: " << values[QNAME]); + } } namespace { @@ -441,40 +621,68 @@ string getAltExchange(const types::Variant& var) { } else return string(); } + +Variant getHaUuid(const Variant::Map& map) { + Variant::Map::const_iterator i = map.find(QPID_HA_UUID); + return i == map.end() ? Variant() : i->second; } +} // namespace + + void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.isReplicated( - CONFIGURATION, - values[ARGUMENTS].asMap(), - values[AUTODELETE].asBool(), - values[EXCLUSIVE].asBool())) - return; + if (!replicationTest.getLevel(argsMap)) return; string name(values[NAME].asString()); + if (!queueTracker.get()) + throw Exception(QPID_MSG("Unexpected queue response: " << values)); + if (!queueTracker->response(name)) return; // Response is out-of-date QPID_LOG(debug, logPrefix << "Queue response: " << name); + // If we see a queue with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Queue> queue = queues.find(name); + if (queue && getHaUuid(queue->getSettings().original) != getHaUuid(argsMap)) { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing queue: " + << name); + deleteQueue(name); + } framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Queue> queue = - createQueue(name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, - getAltExchange(values[ALTEXCHANGE])); - // It is normal for the queue to already exist if we are failing over. - if (queue) startQueueReplicator(queue); - else QPID_LOG(debug, logPrefix << "Queue already replicated: " << name); + boost::shared_ptr<QueueReplicator> qr = replicateQueue( + name, values[DURABLE].asBool(), values[AUTODELETE].asBool(), args, + getAltExchange(values[ALTEXCHANGE])); + if (qr) { + Variant::Map::const_iterator i = values.find(CONSUMER_COUNT); + if (i != values.end() && isIntegerType(i->second.getType())) { + if (i->second.asInt64()) qr->setSubscribed(); + } + } } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.replicateLevel(argsMap)) return; + if (!replicationTest.getLevel(argsMap)) return; string name = values[NAME].asString(); + if (!exchangeTracker.get()) + throw Exception(QPID_MSG("Unexpected exchange response: " << values)); + if (!exchangeTracker->response(name)) return; // Response is out of date. QPID_LOG(debug, logPrefix << "Exchange response: " << name); framing::FieldTable args; qpid::amqp_0_10::translate(argsMap, args); - boost::shared_ptr<Exchange> exchange = createExchange( + // If we see an exchange with the same name as one we have, but not the same UUID, + // then replace the one we have. + boost::shared_ptr<Exchange> exchange = exchanges.find(name); + if (exchange && + exchange->getArgs().getAsString(QPID_HA_UUID) != args.getAsString(QPID_HA_UUID)) + { + QPID_LOG(warning, logPrefix << "UUID mismatch, replacing exchange: " + << name); + deleteExchange(name); + } + CreateExchangeResult result = createExchange( name, values[TYPE].asString(), values[DURABLE].asBool(), args, getAltExchange(values[ALTEXCHANGE])); - // It is normal for the exchange to already exist if we are failing over. - QPID_LOG_IF(debug, !exchange, logPrefix << "Exchange already replicated: " << name); + replicatedExchanges.insert(name); } namespace { @@ -501,19 +709,25 @@ const std::string QUEUE_REF("queueRef"); void BrokerReplicator::doResponseBind(Variant::Map& values) { std::string exName = getRefName(EXCHANGE_REF_PREFIX, values[EXCHANGE_REF]); std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]); - boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName); - boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); + boost::shared_ptr<Exchange> exchange = exchanges.find(exName); + boost::shared_ptr<Queue> queue = queues.find(qName); - // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && - queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) + framing::FieldTable args; + qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + + // Automatically replicate binding if queue and exchange exist and are replicated. + // Respect replicate setting in binding args but default to replicated. + if (exchange && replicationTest.getLevel(*exchange) && + queue && replicationTest.getLevel(*queue) && + ReplicationTest(ALL).getLevel(args)) { - string key = values[KEY].asString(); + string key = values[BINDING_KEY].asString(); QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName << " queue:" << qName - << " key:" << key); - framing::FieldTable args; - qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); + << " key:" << key + << " args:" << args); +// framing::FieldTable args; +// qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); exchange->bind(queue, key, &args, 0); } } @@ -527,42 +741,65 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { QPID_LOG(trace, logPrefix << "HA Broker response: " << values); ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); - ReplicateLevel primary = replicationTest.replicateLevel( - values[REPLICATE_DEFAULT].asString()); + ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine << ") does not match primary (" << primary << ")")); - haBroker.setMembership(values[MEMBERS].asList()); + setMembership(values[MEMBERS].asList()); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what() - << ": " << values); - haBroker.shutdown(); + haBroker.shutdown( + QPID_MSG(logPrefix << "Invalid HA Broker response: " << e.what() + << ": " << values)); + throw; } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) +boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator( + const boost::shared_ptr<Queue>& queue) { - if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) { + if (replicationTest.getLevel(*queue) == ALL) { boost::shared_ptr<QueueReplicator> qr( new QueueReplicator(haBroker, queue, link)); - if (!broker.getExchanges().registerExchange(qr)) + if (!exchanges.registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); + return qr; } + return boost::shared_ptr<QueueReplicator>(); } -void BrokerReplicator::stopQueueReplicator(const std::string& name) { - boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); - if (qr) { - qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed. - broker.getExchanges().destroy(qr->getName()); +void BrokerReplicator::deleteQueue(const std::string& name, bool purge) { + Queue::shared_ptr queue = queues.find(name); + if (queue) { + // Purge before deleting to ensure that we don't reroute any + // messages. Any reroutes will be done at the primary and + // replicated as normal. + if (purge) queue->purge(0, boost::shared_ptr<Exchange>()); + broker.deleteQueue(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Queue deleted: " << name); + } +} + +void BrokerReplicator::deleteExchange(const std::string& name) { + try { + boost::shared_ptr<broker::Exchange> exchange = exchanges.find(name); + if (!exchange) { + QPID_LOG(warning, logPrefix << "Cannot delete exchange, not found: " << name); + return; + } + if (exchange->inUseAsAlternate()) { + QPID_LOG(warning, "Cannot delete exchange, in use as alternate: " << name); + return; + } + broker.deleteExchange(name, userId, remoteHost); + QPID_LOG(debug, logPrefix << "Exchange deleted: " << name); + } catch (const framing::NotFoundException&) { + QPID_LOG(debug, logPrefix << "Exchange not found for deletion: " << name); } } -boost::shared_ptr<Queue> BrokerReplicator::createQueue( +boost::shared_ptr<QueueReplicator> BrokerReplicator::replicateQueue( const std::string& name, bool durable, bool autodelete, @@ -571,7 +808,7 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( { QueueSettings settings(durable, autodelete); settings.populate(arguments, settings.storeSettings); - std::pair<boost::shared_ptr<Queue>, bool> result = + CreateQueueResult result = broker.createQueue( name, settings, @@ -579,24 +816,23 @@ boost::shared_ptr<Queue> BrokerReplicator::createQueue( string(), // Set alternate exchange below userId, remoteHost); - if (result.second) { - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); - } - return result.first; + boost::shared_ptr<QueueReplicator> qr; + if (!findQueueReplicator(name)) qr = startQueueReplicator(result.first); + if (result.second && !alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Queue::setAlternateExchange, result.first, _1)); } - else return boost::shared_ptr<Queue>(); + return qr; } -boost::shared_ptr<Exchange> BrokerReplicator::createExchange( +BrokerReplicator::CreateExchangeResult BrokerReplicator::createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange) { - std::pair<boost::shared_ptr<Exchange>, bool> result = + CreateExchangeResult result = broker.createExchange( name, type, @@ -605,15 +841,12 @@ boost::shared_ptr<Exchange> BrokerReplicator::createExchange( args, userId, remoteHost); - if (result.second) { - alternates.addExchange(result.first); - if (!alternateExchange.empty()) { - alternates.setAlternate( - alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); - } - return result.first; + alternates.addExchange(result.first); + if (!alternateExchange.empty()) { + alternates.setAlternate( + alternateExchange, boost::bind(&Exchange::setAlternate, result.first, _1)); } - else return boost::shared_ptr<Exchange>(); + return result; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*, qpid::broker::AsyncStore* const) { return false; } @@ -626,4 +859,61 @@ void BrokerReplicator::write(char* /*target*/) {} string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } +void BrokerReplicator::autoDeleteCheck(boost::shared_ptr<Exchange> ex) { + boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex)); + if (!qr) return; + assert(qr); + if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) { + if (qr->getQueue()->getSettings().autoDeleteDelay) { + // Start the auto-delete timer + Queue::tryAutoDelete(broker, qr->getQueue(), remoteHost, userId); + } + else { + // Delete immediately. Don't purge, the primary is gone so we need + // to reroute the deleted messages. + deleteQueue(qr->getQueue()->getName(), false); + } + } +} + +// Callback function for accumulating exchange candidates +namespace { + void exchangeAccumulatorCallback(vector<boost::shared_ptr<Exchange> >& c, const Exchange::shared_ptr& i) { + c.push_back(i); + } +} + +void BrokerReplicator::disconnected() { + QPID_LOG(info, logPrefix << "Disconnected from " << primary); + connection = 0; + // Clean up auto-delete queues + vector<boost::shared_ptr<Exchange> > collect; + // Make a copy so we can work outside the ExchangeRegistry lock + exchanges.eachExchange( + boost::bind(&exchangeAccumulatorCallback, boost::ref(collect), _1)); + for_each(collect.begin(), collect.end(), + boost::bind(&BrokerReplicator::autoDeleteCheck, this, _1)); +} + +void BrokerReplicator::setMembership(const Variant::List& brokers) { + Membership& membership(haBroker.getMembership()); + membership.assign(brokers); + // Check if the primary has signalled a change in my status: + // from CATCHUP to READY when we are caught up. + // from READY TO CATCHUP if we are timed out during fail-over. + BrokerInfo info; + if (membership.get(membership.getSelf(), info)) { + BrokerStatus oldStatus = haBroker.getStatus(); + BrokerStatus newStatus = info.getStatus(); + if (oldStatus == CATCHUP && newStatus == READY) { + QPID_LOG(info, logPrefix << logPrefix << "Caught-up and ready"); + haBroker.getMembership().setStatus(READY); + } + else if (oldStatus == READY && newStatus == CATCHUP) { + QPID_LOG(info, logPrefix << logPrefix << "No longer ready, catching up"); + haBroker.getMembership().setStatus(CATCHUP); + } + } +} + }} // namespace broker diff --git a/cpp/src/qpid/ha/BrokerReplicator.h b/cpp/src/qpid/ha/BrokerReplicator.h index f6983e8719..a42d607263 100644 --- a/cpp/src/qpid/ha/BrokerReplicator.h +++ b/cpp/src/qpid/ha/BrokerReplicator.h @@ -31,6 +31,7 @@ #include "qpid/management/ManagementObject.h" #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> +#include <set> namespace qpid { @@ -40,6 +41,9 @@ class Broker; class Link; class Bridge; class SessionHandler; +class Connection; +class QueueRegistry; +class ExchangeRegistry; } namespace framing { @@ -58,7 +62,9 @@ class QueueReplicator; * exchanges and bindings to replicate the primary. * It also creates QueueReplicators for newly replicated queues. * - * THREAD UNSAFE: Only called in Link connection thread, no need for locking. + * THREAD UNSAFE: + * All members except shutdown are only called in the Link's connection thread context. + * shutdown() does not use any mutable state. * */ class BrokerReplicator : public broker::Exchange, @@ -76,6 +82,7 @@ class BrokerReplicator : public broker::Exchange, bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const store); void route(broker::Deliverable&); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); + void shutdown(); // DataSource interface - used to write persistence data to async store uint64_t getSize(); @@ -83,8 +90,20 @@ class BrokerReplicator : public broker::Exchange, private: typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; + typedef std::pair<boost::shared_ptr<broker::Queue>, bool> CreateQueueResult; + typedef std::pair<boost::shared_ptr<broker::Exchange>, bool> CreateExchangeResult; - void initializeBridge(broker::Bridge&, broker::SessionHandler&); + typedef std::pair<std::string,std::string> EventKey; + typedef void (BrokerReplicator::*DispatchFunction)(types::Variant::Map&); + typedef std::map<EventKey, DispatchFunction> EventDispatchMap; + + typedef std::map<std::string, QueueReplicatorPtr> QueueReplicatorMap; + + class UpdateTracker; + class ErrorListener; + class ConnectionObserver; + + void connected(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); @@ -93,6 +112,7 @@ class BrokerReplicator : public broker::Exchange, void doEventBind(types::Variant::Map&); void doEventUnbind(types::Variant::Map&); void doEventMembersUpdate(types::Variant::Map&); + void doEventSubscribe(types::Variant::Map&); void doResponseQueue(types::Variant::Map& values); void doResponseExchange(types::Variant::Map& values); @@ -100,32 +120,50 @@ class BrokerReplicator : public broker::Exchange, void doResponseHaBroker(types::Variant::Map& values); QueueReplicatorPtr findQueueReplicator(const std::string& qname); - void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - void stopQueueReplicator(const std::string& name); + QueueReplicatorPtr startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - boost::shared_ptr<broker::Queue> createQueue( + QueueReplicatorPtr replicateQueue( const std::string& name, bool durable, bool autodelete, const qpid::framing::FieldTable& arguments, const std::string& alternateExchange); - boost::shared_ptr<broker::Exchange> createExchange( + CreateExchangeResult createExchange( const std::string& name, const std::string& type, bool durable, const qpid::framing::FieldTable& args, const std::string& alternateExchange); + bool deactivate(boost::shared_ptr<broker::Exchange> ex, bool destroy); + void deleteQueue(const std::string& name, bool purge=true); + void deleteExchange(const std::string& name); + + void autoDeleteCheck(boost::shared_ptr<broker::Exchange>); + + void disconnected(); + + void setMembership(const types::Variant::List&); // Set membership from list. + std::string logPrefix; - std::string userId, remoteHost; ReplicationTest replicationTest; + std::string userId, remoteHost; HaBroker& haBroker; broker::Broker& broker; + broker::ExchangeRegistry& exchanges; + broker::QueueRegistry& queues; boost::shared_ptr<broker::Link> link; bool initialized; AlternateExchangeSetter alternates; qpid::Address primary; + typedef std::set<std::string> StringSet; + StringSet replicatedExchanges; // exchanges that have been replicated. + broker::Connection* connection; + EventDispatchMap dispatch; + std::auto_ptr<UpdateTracker> queueTracker; + std::auto_ptr<UpdateTracker> exchangeTracker; + boost::shared_ptr<ConnectionObserver> connectionObserver; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/ha/ConnectionObserver.cpp b/cpp/src/qpid/ha/ConnectionObserver.cpp index 81ba3e4301..775222efd3 100644 --- a/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -30,7 +30,7 @@ namespace qpid { namespace ha { ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) - : haBroker(hb), logPrefix("Connections: "), self(uuid) {} + : haBroker(hb), logPrefix("Backup: "), self(uuid) {} bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { framing::FieldTable ft; @@ -41,9 +41,11 @@ bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, Bro return false; } -void ConnectionObserver::setObserver(const ObserverPtr& o){ +void ConnectionObserver::setObserver(const ObserverPtr& o, const std::string& newlogPrefix) +{ sys::Mutex::ScopedLock l(lock); observer = o; + logPrefix = newlogPrefix; } ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { diff --git a/cpp/src/qpid/ha/ConnectionObserver.h b/cpp/src/qpid/ha/ConnectionObserver.h index e3a6d1154a..5374660dbe 100644 --- a/cpp/src/qpid/ha/ConnectionObserver.h +++ b/cpp/src/qpid/ha/ConnectionObserver.h @@ -55,7 +55,7 @@ class ConnectionObserver : public broker::ConnectionObserver ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); - void setObserver(const ObserverPtr&); + void setObserver(const ObserverPtr&, const std::string& logPrefix); ObserverPtr getObserver(); void opened(broker::Connection& connection); diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index ffbcb684bc..590db7efa5 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -26,6 +26,7 @@ #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "Settings.h" +#include "StandAlone.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" @@ -41,7 +42,6 @@ #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h" -#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> @@ -54,134 +54,100 @@ using namespace std; using types::Variant; using types::Uuid; using sys::Mutex; +using boost::shared_ptr; +using boost::dynamic_pointer_cast; // Called in Plugin::earlyInitialize HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : logPrefix("Broker: "), - broker(b), - systemId(broker.getSystem()->getSystemId().data()), + : systemId(b.getSystem()->getSystemId().data()), settings(s), + broker(b), observer(new ConnectionObserver(*this, systemId)), - mgmtObject(0), - status(STANDALONE), - membership(systemId), - replicationTest(s.replicateDefault.get()) + role(new StandAlone), + membership(BrokerInfo(systemId, STANDALONE), *this) { // If we are joining a cluster we must start excluding clients now, // otherwise there's a window for a client to connect before we get to // initialize() if (settings.cluster) { - QPID_LOG(debug, logPrefix << "Rejecting client connections."); - observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>( - new BackupConnectionExcluder)); + QPID_LOG(debug, "Broker startup, rejecting client connections."); + shared_ptr<broker::ConnectionObserver> excluder(new BackupConnectionExcluder); + observer->setObserver(excluder, "Backup: "); broker.getConnectionObservers().add(observer); } } +namespace { +const std::string NONE("none"); +bool isNone(const std::string& x) { return x.empty() || x == NONE; } +} + // Called in Plugin::initialize void HaBroker::initialize() { - // FIXME aconway 2012-07-19: assumes there's a TCP transport with a meaningful port. - brokerInfo = BrokerInfo( - broker.getSystem()->getNodeName(), - broker.getPort(broker::Broker::TCP_TRANSPORT), - systemId); - - QPID_LOG(notice, logPrefix << "Initializing: " << brokerInfo); + membership.add( + BrokerInfo( + membership.getSelf(), + settings.cluster ? JOINING : membership.getStatus(), + broker.getSystem()->getNodeName(), + broker.getPort(broker::Broker::TCP_TRANSPORT) + ) + ); + QPID_LOG(notice, "Initializing: " << membership.getInfo()); // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); if (settings.cluster && !ma) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); - mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); + mgmtObject = _qmf::HaBroker::shared_ptr(new _qmf::HaBroker(ma, this, "ha-broker")); mgmtObject->set_replicateDefault(settings.replicateDefault.str()); mgmtObject->set_systemId(systemId); ma->addObject(mgmtObject); + membership.setMgmtObject(mgmtObject); // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( - boost::shared_ptr<ReplicatingSubscription::Factory>( + shared_ptr<ReplicatingSubscription::Factory>( new ReplicatingSubscription::Factory())); // If we are in a cluster, start as backup in joining state. if (settings.cluster) { - status = JOINING; - backup.reset(new Backup(*this, settings)); + assert(membership.getStatus() == JOINING); + role.reset(new Backup(*this, settings)); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); + if (!isNone(settings.publicUrl)) setPublicUrl(Url(settings.publicUrl)); + if (!isNone(settings.brokerUrl)) setBrokerUrl(Url(settings.brokerUrl)); } - - if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl)); - if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl)); - - - // NOTE: lock is not needed in a constructor, but create one - // to pass to functions that have a ScopedLock parameter. - Mutex::ScopedLock l(lock); - statusChanged(l); } HaBroker::~HaBroker() { - QPID_LOG(notice, logPrefix << "Shut down: " << brokerInfo); + QPID_LOG(notice, role->getLogPrefix() << "Shut down"); broker.getConnectionObservers().remove(observer); } -void HaBroker::recover() { - auto_ptr<Backup> b; - { - Mutex::ScopedLock l(lock); - // No longer replicating, close link. Note: link must be closed before we - // setStatus(RECOVERING) as that will remove our broker info from the - // outgoing link properties so we won't recognize self-connects. - b = backup; - } - b.reset(); // Call destructor outside of lock. - BrokerInfo::Set backups; - { - Mutex::ScopedLock l(lock); - setStatus(RECOVERING, l); - backups = membership.otherBackups(); - membership.reset(brokerInfo); - // Drop the lock, new Primary may call back on activate. - } - // Outside of lock, may call back on activate() - primary.reset(new Primary(*this, backups)); // Starts primary-ready check. -} - -// Called back from Primary active check. -void HaBroker::activate() { setStatus(ACTIVE); } - Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { - switch (getStatus()) { - case JOINING: recover(); break; - case CATCHUP: - QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); - throw Exception("Still catching up, cannot be promoted."); - break; - case READY: recover(); break; - case RECOVERING: break; - case ACTIVE: break; - case STANDALONE: break; - } - break; + Role* r = role->promote(); + if (r) role.reset(r); + break; } case _qmf::HaBroker::METHOD_SETBROKERSURL: { setBrokerUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetBrokersUrl&>(args).i_url)); break; } case _qmf::HaBroker::METHOD_SETPUBLICURL: { - setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url)); + setPublicUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url)); break; } case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, logPrefix << "Replicate individual queue " + QPID_LOG(debug, role->getLogPrefix() << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); - boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; Uuid uuid(true); @@ -191,10 +157,10 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, false, // durable settings.mechanism, settings.username, settings.password, false); // no amq.failover - don't want to use client URL. - boost::shared_ptr<broker::Link> link = result.first; + shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr( + shared_ptr<QueueReplicator> qr( new QueueReplicator(*this, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); @@ -207,31 +173,23 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -void HaBroker::setClientUrl(const Url& url) { +void HaBroker::setPublicUrl(const Url& url) { Mutex::ScopedLock l(lock); - if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); - clientUrl = url; - updateClientUrl(l); -} - -void HaBroker::updateClientUrl(Mutex::ScopedLock&) { - Url url = clientUrl.empty() ? brokerUrl : clientUrl; - if (url.empty()) throw Url::Invalid("HA client URL is empty"); + publicUrl = url; mgmtObject->set_publicUrl(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); + QPID_LOG(debug, role->getLogPrefix() << "Setting public URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url) { - Mutex::ScopedLock l(lock); - if (url.empty()) throw Url::Invalid("HA broker URL is empty"); - brokerUrl = url; - mgmtObject->set_brokersUrl(brokerUrl.str()); - QPID_LOG(info, logPrefix << "Broker URL set to: " << url); - if (backup.get()) backup->setBrokerUrl(brokerUrl); - // Updating broker URL also updates defaulted client URL: - if (clientUrl.empty()) updateClientUrl(l); + { + Mutex::ScopedLock l(lock); + brokerUrl = url; + mgmtObject->set_brokersUrl(brokerUrl.str()); + QPID_LOG(info, role->getLogPrefix() << "Brokers URL set to: " << url); + } + role->setBrokerUrl(url); // Oustside lock } std::vector<Url> HaBroker::getKnownBrokers() const { @@ -239,116 +197,14 @@ std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } -void HaBroker::shutdown() { - QPID_LOG(critical, logPrefix << "Critical error, shutting down."); +void HaBroker::shutdown(const std::string& message) { + QPID_LOG(critical, message); broker.shutdown(); + throw Exception(message); } BrokerStatus HaBroker::getStatus() const { - Mutex::ScopedLock l(lock); - return status; -} - -void HaBroker::setStatus(BrokerStatus newStatus) { - Mutex::ScopedLock l(lock); - setStatus(newStatus, l); -} - -namespace { -bool checkTransition(BrokerStatus from, BrokerStatus to) { - // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. - static const BrokerStatus TRANSITIONS[][2] = { - { JOINING, CATCHUP }, // Connected to primary - { JOINING, RECOVERING }, // Chosen as initial primary. - { CATCHUP, READY }, // Caught up all queues, ready to take over. - { READY, RECOVERING }, // Chosen as new primary - { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. - { RECOVERING, ACTIVE } // All expected backups are ready - }; - static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); - for (size_t i = 0; i < N; ++i) { - if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) - return true; - } - return false; -} -} // namespace - -void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { - QPID_LOG(info, logPrefix << "Status change: " - << printable(status) << " -> " << printable(newStatus)); - bool legal = checkTransition(status, newStatus); - assert(legal); - if (!legal) { - QPID_LOG(critical, logPrefix << "Illegal state transition: " - << printable(status) << " -> " << printable(newStatus)); - shutdown(); - } - status = newStatus; - statusChanged(l); -} - -void HaBroker::statusChanged(Mutex::ScopedLock& l) { - mgmtObject->set_status(printable(status).str()); - brokerInfo.setStatus(status); - setLinkProperties(l); -} - -void HaBroker::membershipUpdated(Mutex::ScopedLock&) { - QPID_LOG(info, logPrefix << "Membership changed: " << membership); - Variant::List brokers = membership.asList(); - mgmtObject->set_members(brokers); - broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); -} - -void HaBroker::setMembership(const Variant::List& brokers) { - Mutex::ScopedLock l(lock); - membership.assign(brokers); - QPID_LOG(info, logPrefix << "Membership update: " << membership); - BrokerInfo info; - // Update my status to what the primary says it is. The primary can toggle - // status between READY and CATCHUP based on the state of our subscriptions. - if (membership.get(systemId, info) && status != info.getStatus()) { - setStatus(info.getStatus(), l); - if (backup.get()) backup->setStatus(status); - } - membershipUpdated(l); -} - -void HaBroker::resetMembership(const BrokerInfo& b) { - Mutex::ScopedLock l(lock); - membership.reset(b); - QPID_LOG(debug, logPrefix << "Membership reset to: " << membership); - membershipUpdated(l); -} - -void HaBroker::addBroker(const BrokerInfo& b) { - Mutex::ScopedLock l(lock); - membership.add(b); - QPID_LOG(debug, logPrefix << "Membership add: " << b); - membershipUpdated(l); -} - -void HaBroker::removeBroker(const Uuid& id) { - Mutex::ScopedLock l(lock); - membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << id); - membershipUpdated(l); -} - -void HaBroker::setLinkProperties(Mutex::ScopedLock&) { - framing::FieldTable linkProperties = broker.getLinkClientProperties(); - if (isBackup(status)) { - // If this is a backup then any outgoing links are backup - // links and need to be tagged. - linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); - } - else { - // If this is a primary then any outgoing links are federation links - // and should not be tagged. - linkProperties.erase(ConnectionObserver::BACKUP_TAG); - } - broker.setLinkClientProperties(linkProperties); + return membership.getStatus(); } }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaBroker.h b/cpp/src/qpid/ha/HaBroker.h index 7dabe6e35b..6b15c88e0a 100644 --- a/cpp/src/qpid/ha/HaBroker.h +++ b/cpp/src/qpid/ha/HaBroker.h @@ -25,14 +25,12 @@ #include "BrokerInfo.h" #include "Membership.h" #include "types.h" -#include "ReplicationTest.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qpid/management/Manageable.h" #include "qpid/types/Variant.h" -#include <memory> #include <set> #include <boost/shared_ptr.hpp> @@ -54,11 +52,15 @@ namespace ha { class Backup; class ConnectionObserver; class Primary; - +class Role; /** * HA state and actions associated with a HA broker. Holds all the management info. * * THREAD SAFE: may be called in arbitrary broker IO or timer threads. + + * NOTE: HaBroker and Role subclasses follow this lock hierarchy: + * - HaBroker MUST NOT hold its own lock across calls Role subclasses. + * - Role subclasses MAY hold their locks accross calls to HaBroker. */ class HaBroker : public management::Manageable { @@ -71,66 +73,46 @@ class HaBroker : public management::Manageable void initialize(); // Implement Manageable. - qpid::management::ManagementObject* GetManagementObject() const { return mgmtObject; } + qpid::management::ManagementObject::shared_ptr GetManagementObject() const { return mgmtObject; } management::Manageable::status_t ManagementMethod ( uint32_t methodId, management::Args& args, std::string& text); broker::Broker& getBroker() { return broker; } const Settings& getSettings() const { return settings; } - /** Shut down the broker. Caller should log a critical error message. */ - void shutdown(); + /** Shut down the broker because of a critical error. */ + void shutdown(const std::string& message); BrokerStatus getStatus() const; - void setStatus(BrokerStatus); - void activate(); - - Backup* getBackup() { return backup.get(); } - ReplicationTest getReplicationTest() const { return replicationTest; } - boost::shared_ptr<ConnectionObserver> getObserver() { return observer; } - const BrokerInfo& getBrokerInfo() const { return brokerInfo; } - - void setMembership(const types::Variant::List&); // Set membership from list. - void resetMembership(const BrokerInfo& b); // Reset to contain just one member. - void addBroker(const BrokerInfo& b); // Add a broker to the membership. - void removeBroker(const types::Uuid& id); // Remove a broker from membership. - + BrokerInfo getBrokerInfo() const { return membership.getInfo(); } + Membership& getMembership() { return membership; } types::Uuid getSystemId() const { return systemId; } private: - void setClientUrl(const Url&); + + void setPublicUrl(const Url&); void setBrokerUrl(const Url&); void updateClientUrl(sys::Mutex::ScopedLock&); - bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); } - - void setStatus(BrokerStatus, sys::Mutex::ScopedLock&); - void recover(); - void statusChanged(sys::Mutex::ScopedLock&); - void setLinkProperties(sys::Mutex::ScopedLock&); - std::vector<Url> getKnownBrokers() const; - void membershipUpdated(sys::Mutex::ScopedLock&); - - std::string logPrefix; - broker::Broker& broker; - types::Uuid systemId; + // Immutable members + const types::Uuid systemId; const Settings settings; + // Member variables protected by lock mutable sys::Mutex lock; - boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary - std::auto_ptr<Backup> backup; - std::auto_ptr<Primary> primary; - qmf::org::apache::qpid::ha::HaBroker* mgmtObject; - Url clientUrl, brokerUrl; + Url publicUrl, brokerUrl; std::vector<Url> knownBrokers; - BrokerStatus status; - BrokerInfo brokerInfo; + + // Independently thread-safe member variables + broker::Broker& broker; + qmf::org::apache::qpid::ha::HaBroker::shared_ptr mgmtObject; + boost::shared_ptr<ConnectionObserver> observer; // Used by Backup and Primary + boost::shared_ptr<Role> role; Membership membership; - ReplicationTest replicationTest; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/HaPlugin.cpp b/cpp/src/qpid/ha/HaPlugin.cpp index f7fe553d9b..d26b466847 100644 --- a/cpp/src/qpid/ha/HaPlugin.cpp +++ b/cpp/src/qpid/ha/HaPlugin.cpp @@ -33,9 +33,11 @@ struct Options : public qpid::Options { addOptions() ("ha-cluster", optValue(settings.cluster, "yes|no"), "Join a HA active/passive cluster.") + ("ha-queue-replication", optValue(settings.queueReplication, "yes|no"), + "Enable replication of specific queues without joining a cluster") ("ha-brokers-url", optValue(settings.brokerUrl,"URL"), "URL with address of each broker in the cluster.") - ("ha-public-url", optValue(settings.clientUrl,"URL"), + ("ha-public-url", optValue(settings.publicUrl,"URL"), "URL advertized to clients to connect to the cluster.") ("ha-replicate", optValue(settings.replicateDefault, "LEVEL"), @@ -48,6 +50,10 @@ struct Options : public qpid::Options { "Authentication mechanism for connections between HA brokers") ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"), "Maximum time to wait for an expected backup to connect and become ready.") + ("ha-flow-messages", optValue(settings.flowMessages, "N"), + "Flow control message count limit for replication, 0 means no limit") + ("ha-flow-bytes", optValue(settings.flowBytes, "N"), + "Flow control byte limit for replication, 0 means no limit") ; } }; @@ -64,17 +70,23 @@ struct HaPlugin : public Plugin { void earlyInitialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker) { - // Must create the HaBroker in earlyInitialize so it can set up its - // connection observer before clients start connecting. - haBroker.reset(new ha::HaBroker(*broker, settings)); - broker->addFinalizer(boost::bind(&HaPlugin::finalize, this)); + if (broker && (settings.cluster || settings.queueReplication)) { + if (!broker->getManagementAgent()) { + QPID_LOG(info, "HA plugin disabled because management is disabled"); + if (settings.cluster) + throw Exception("Cannot start HA: management is disabled"); + } else { + // Must create the HaBroker in earlyInitialize so it can set up its + // connection observer before clients start connecting. + haBroker.reset(new ha::HaBroker(*broker, settings)); + broker->addFinalizer(boost::bind(&HaPlugin::finalize, this)); + } } } void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker) haBroker->initialize(); + if (broker && haBroker.get()) haBroker->initialize(); } void finalize() { diff --git a/cpp/src/qpid/ha/Membership.cpp b/cpp/src/qpid/ha/Membership.cpp index 74580f9b1e..6c64d86fd7 100644 --- a/cpp/src/qpid/ha/Membership.cpp +++ b/cpp/src/qpid/ha/Membership.cpp @@ -19,6 +19,12 @@ * */ #include "Membership.h" +#include "HaBroker.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" +#include "qpid/types/Variant.h" +#include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" +#include "qmf/org/apache/qpid/ha/HaBroker.h" #include <boost/bind.hpp> #include <iostream> #include <iterator> @@ -26,37 +32,58 @@ namespace qpid { namespace ha { +namespace _qmf = ::qmf::org::apache::qpid::ha; -void Membership::reset(const BrokerInfo& b) { +using sys::Mutex; +using types::Variant; + +Membership::Membership(const BrokerInfo& info, HaBroker& b) + : haBroker(b), self(info.getSystemId()) +{ + brokers[self] = info; +} + +void Membership::clear() { + Mutex::ScopedLock l(lock); + BrokerInfo me = brokers[self]; brokers.clear(); - brokers[b.getSystemId()] = b; + brokers[self] = me; } void Membership::add(const BrokerInfo& b) { + Mutex::ScopedLock l(lock); brokers[b.getSystemId()] = b; + update(l); } void Membership::remove(const types::Uuid& id) { + Mutex::ScopedLock l(lock); + if (id == self) return; // Never remove myself BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); - } + update(l); + } } bool Membership::contains(const types::Uuid& id) { + Mutex::ScopedLock l(lock); return brokers.find(id) != brokers.end(); } void Membership::assign(const types::Variant::List& list) { - brokers.clear(); + Mutex::ScopedLock l(lock); + clear(); for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { BrokerInfo b(i->asMap()); brokers[b.getSystemId()] = b; } + update(l); } types::Variant::List Membership::asList() const { + Mutex::ScopedLock l(lock); types::Variant::List list; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); @@ -64,6 +91,7 @@ types::Variant::List Membership::asList() const { } BrokerInfo::Set Membership::otherBackups() const { + Mutex::ScopedLock l(lock); BrokerInfo::Set result; for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (i->second.getStatus() == READY && i->second.getSystemId() != self) @@ -71,15 +99,84 @@ BrokerInfo::Set Membership::otherBackups() const { return result; } -bool Membership::get(const types::Uuid& id, BrokerInfo& result) { - BrokerInfo::Map::iterator i = brokers.find(id); +bool Membership::get(const types::Uuid& id, BrokerInfo& result) const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(id); if (i == brokers.end()) return false; result = i->second; return true; } -std::ostream& operator<<(std::ostream& o, const Membership& members) { - return o << members.brokers; +void Membership::update(Mutex::ScopedLock& l) { + QPID_LOG(info, "Membership: " << brokers); + Variant::List brokers = asList(); + if (mgmtObject) mgmtObject->set_status(printable(getStatus(l)).str()); + if (mgmtObject) mgmtObject->set_members(brokers); + haBroker.getBroker().getManagementAgent()->raiseEvent( + _qmf::EventMembersUpdate(brokers)); +} + +void Membership::setMgmtObject(boost::shared_ptr<_qmf::HaBroker> mo) { + Mutex::ScopedLock l(lock); + mgmtObject = mo; + update(l); +} + + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { STANDALONE, JOINING }, // Initialization of backup broker + { JOINING, CATCHUP }, // Connected to primary + { JOINING, RECOVERING }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, RECOVERING }, // Chosen as new primary + { READY, CATCHUP }, // Timed out failing over, demoted to catch-up. + { RECOVERING, ACTIVE } // All expected backups are ready + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void Membership::setStatus(BrokerStatus newStatus) { + BrokerStatus status = getStatus(); + QPID_LOG(info, "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + if (!legal) { + haBroker.shutdown(QPID_MSG("Illegal state transition: " << printable(status) + << " -> " << printable(newStatus))); + } + + Mutex::ScopedLock l(lock); + brokers[self].setStatus(newStatus); + if (mgmtObject) mgmtObject->set_status(printable(newStatus).str()); + update(l); +} + +BrokerStatus Membership::getStatus() const { + Mutex::ScopedLock l(lock); + return getStatus(l); +} + +BrokerStatus Membership::getStatus(sys::Mutex::ScopedLock&) const { + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second.getStatus(); +} + +BrokerInfo Membership::getInfo() const { + Mutex::ScopedLock l(lock); + BrokerInfo::Map::const_iterator i = brokers.find(self); + assert(i != brokers.end()); + return i->second; } +// FIXME aconway 2013-01-23: move to .h? }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Membership.h b/cpp/src/qpid/ha/Membership.h index 8406dccd5d..956569fbd8 100644 --- a/cpp/src/qpid/ha/Membership.h +++ b/cpp/src/qpid/ha/Membership.h @@ -24,45 +24,72 @@ #include "BrokerInfo.h" #include "types.h" -#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Mutex.h" #include "qpid/types/Variant.h" #include <boost/function.hpp> #include <set> #include <vector> #include <iosfwd> + +namespace qmf { namespace org { namespace apache { namespace qpid { namespace ha { +class HaBroker; +}}}}} + namespace qpid { + +namespace broker { +class Broker; +} + +namespace types { +class Uuid; +} + namespace ha { +class HaBroker; /** * Keep track of the brokers in the membership. - * THREAD UNSAFE: caller must serialize + * Send management when events on membership changes. + * THREAD SAFE */ class Membership { public: - Membership(const types::Uuid& self_) : self(self_) {} + Membership(const BrokerInfo& info, HaBroker&); - void reset(const BrokerInfo& b); ///< Reset to contain just one member. + void setMgmtObject(boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker>); + + void clear(); ///< Clear all but self. void add(const BrokerInfo& b); void remove(const types::Uuid& id); bool contains(const types::Uuid& id); + /** Return IDs of all READY backups other than self */ BrokerInfo::Set otherBackups() const; void assign(const types::Variant::List&); types::Variant::List asList() const; - bool get(const types::Uuid& id, BrokerInfo& result); + bool get(const types::Uuid& id, BrokerInfo& result) const; + + types::Uuid getSelf() const { return self; } + BrokerInfo getInfo() const; + BrokerStatus getStatus() const; + void setStatus(BrokerStatus s); private: - types::Uuid self; + void update(sys::Mutex::ScopedLock&); + BrokerStatus getStatus(sys::Mutex::ScopedLock&) const; + + mutable sys::Mutex lock; + HaBroker& haBroker; + boost::shared_ptr<qmf::org::apache::qpid::ha::HaBroker> mgmtObject; + const types::Uuid self; BrokerInfo::Map brokers; - friend std::ostream& operator<<(std::ostream&, const Membership&); }; -std::ostream& operator<<(std::ostream&, const Membership&); - }} // namespace qpid::ha #endif /*!QPID_HA_MEMBERSHIP_H*/ diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index e4bf9671b8..93dbbbea85 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/sys/Timer.h" #include <boost/bind.hpp> @@ -39,6 +41,8 @@ namespace qpid { namespace ha { using sys::Mutex; +using namespace std; +using namespace framing; namespace { @@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver PrimaryConfigurationObserver(Primary& p) : primary(p) {} void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } + void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } + void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } private: Primary& primary; }; @@ -76,8 +82,11 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("Primary: "), active(false) + haBroker(hb), membership(hb.getMembership()), + logPrefix("Primary: "), active(false), + replicationTest(hb.getSettings().replicateDefault.get()) { + hb.getMembership().setStatus(RECOVERING); assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { @@ -89,11 +98,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // the QueueGuards are created. QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getReplicationTest(), false)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); - backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards + backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards } // Set timeout for expected brokers to connect and become ready. sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); @@ -102,14 +110,21 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getBroker().getTimer().add(timerTask); } + + // Remove backup tag property from outgoing link properties. + framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties(); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + hb.getBroker().setLinkClientProperties(linkProperties); + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); haBroker.getBroker().getConfigurationObservers().add(configurationObserver); Mutex::ScopedLock l(lock); // We are now active as a configurationObserver checkReady(l); + // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); - haBroker.getObserver()->setObserver(connectionObserver); + haBroker.getObserver()->setObserver(connectionObserver, logPrefix); } Primary::~Primary() { @@ -122,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { active = true; Mutex::ScopedUnlock u(lock); // Don't hold lock across callback QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); - haBroker.activate(); + membership.setStatus(ACTIVE); } } @@ -130,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.addBroker(info); + membership.add(info); if (expectedBackups.erase(i->second)) { QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); checkReady(l); @@ -155,9 +170,10 @@ void Primary::timeoutExpectedBackups() { expectedBackups.erase(i++); backups.erase(info.getSystemId()); rb->cancel(); - // Downgrade the broker to CATCHUP + // Downgrade the broker's status to CATCHUP + // The broker will get this status change when it eventually connects. info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else ++i; } @@ -178,46 +194,78 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } } +// NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { - // Throw if there is an invalid replication level in the queue settings. - haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings); - Mutex::ScopedLock l(lock); - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { - i->second->queueCreate(q); - checkReady(i, l); + // Set replication argument. + ReplicateLevel level = replicationTest.useLevel(*q); + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); + q->addArgument(QPID_REPLICATE, printable(level).str()); + if (level) { + // Give each queue a unique id to avoid confusion of same-named queues. + q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } } } +// NOTE: Called with queue registry lock held. void Primary::queueDestroy(const QueuePtr& q) { + QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); checkReady(l); } +// NOTE: Called with exchange registry lock held. +void Primary::exchangeCreate(const ExchangePtr& ex) { + ReplicateLevel level = replicationTest.useLevel(*ex); + QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() + << " replication: " << printable(level)); + FieldTable args = ex->getArgs(); + args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. + if (level) { + // Give each exchange a unique id to avoid confusion of same-named exchanges. + args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0]))); + } + ex->setArgs(args); +} + +// NOTE: Called with exchange registry lock held. +void Primary::exchangeDestroy(const ExchangePtr& ex) { + QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); + // Do nothing + } + void Primary::opened(broker::Connection& connection) { BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { - QPID_LOG(debug, logPrefix << "New backup connected: " << info); - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(info, haBroker.getReplicationTest(), true)); + QPID_LOG(info, logPrefix << "New backup connected: " << info); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); { // Avoid deadlock with queue registry lock. Mutex::ScopedUnlock u(lock); - backup->setInitialQueues(haBroker.getBroker().getQueues(), false); + backup->setCatchupQueues(haBroker.getBroker().getQueues(), false); } backups[info.getSystemId()] = backup; + i = backups.find(info.getSystemId()); } else { - QPID_LOG(debug, logPrefix << "Known backup connected: " << info); - i->second->setConnected(true); - checkReady(i, l); + QPID_LOG(info, logPrefix << "Known backup connected: " << info); + i->second->setConnection(&connection); } - if (info.getStatus() == JOINING) info.setStatus(CATCHUP); - haBroker.addBroker(info); + if (info.getStatus() == JOINING) { + info.setStatus(CATCHUP); + membership.add(info); + } + if (i != backups.end()) checkReady(i, l); } else QPID_LOG(debug, logPrefix << "Accepted client connection " @@ -225,19 +273,20 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { - // NOTE: It is possible for a backup connection to be rejected while we are - // a backup, but closed() is called after we have become primary. - // - // For this reason we do not remove from the backups map here, the backups - // map holds all the backups we know about whether connected or not. - // - Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); - if (i != backups.end()) i->second->setConnected(false); + // NOTE: It is possible for a backup connection to be rejected while we + // are a backup, but closed() is called after we have become primary. + // Checking isConnected() lets us ignore such spurious closes. + if (i != backups.end() && i->second->isConnected()) { + QPID_LOG(info, logPrefix << "Backup disconnected: " << info); + membership.remove(info.getSystemId()); + expectedBackups.erase(i->second); + backups.erase(i); + checkReady(l); + } } } @@ -249,4 +298,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); } +Role* Primary::promote() { + QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + return 0; +} + }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 26883f4416..ff85837882 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -24,6 +24,8 @@ #include "types.h" #include "BrokerInfo.h" +#include "ReplicationTest.h" +#include "Role.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> #include <boost/intrusive_ptr.hpp> @@ -48,6 +50,7 @@ class HaBroker; class ReplicatingSubscription; class RemoteBackup; class QueueGuard; +class Membership; /** * State associated with a primary broker: @@ -56,22 +59,30 @@ class QueueGuard; * * THREAD SAFE: called concurrently in arbitrary connection threads. */ -class Primary +class Primary : public Role { public: typedef boost::shared_ptr<broker::Queue> QueuePtr; + typedef boost::shared_ptr<broker::Exchange> ExchangePtr; static Primary* get() { return instance; } Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups); ~Primary(); + // Role implementation + std::string getLogPrefix() const { return logPrefix; } + Role* promote(); + void setBrokerUrl(const Url&) {} + void readyReplica(const ReplicatingSubscription&); void removeReplica(const std::string& q); // Called via ConfigurationObserver void queueCreate(const QueuePtr&); void queueDestroy(const QueuePtr&); + void exchangeCreate(const ExchangePtr&); + void exchangeDestroy(const ExchangePtr&); // Called via ConnectionObserver void opened(broker::Connection& connection); @@ -91,17 +102,19 @@ class Primary sys::Mutex lock; HaBroker& haBroker; + Membership& membership; std::string logPrefix; bool active; + ReplicationTest replicationTest; + /** * Set of expected backups that must be ready before we declare ourselves - * active + * active. These are backups that were known and ready before the primary + * crashed. As new primary we expect them to re-connect. */ BackupSet expectedBackups; /** - * Map of all the remote backups we know about: any expected backups plus - * all actual backups that have connected. We do not remove entries when a - * backup disconnects. @see Primary::closed() + * Map of all the expected backups plus all connected backups. */ BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; diff --git a/cpp/src/qpid/ha/QueueGuard.cpp b/cpp/src/qpid/ha/QueueGuard.cpp index 77e1f81a38..d06d88ca29 100644 --- a/cpp/src/qpid/ha/QueueGuard.cpp +++ b/cpp/src/qpid/ha/QueueGuard.cpp @@ -50,10 +50,10 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0) + : cancelled(false), queue(q), subscription(0) { std::ostringstream os; - os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": "; + os << "Primary guard " << queue.getName() << "@" << info << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); queue.addObserver(observer); @@ -66,45 +66,31 @@ QueueGuard::~QueueGuard() { cancel(); } // NOTE: Called with message lock held. void QueueGuard::enqueued(const Message& m) { // Delay completion - QPID_LOG(trace, logPrefix << "Delayed completion of " << m); + QPID_LOG(trace, logPrefix << "Delayed completion of " << m.getSequence()); + Mutex::ScopedLock l(lock); + if (cancelled) return; // Don't record enqueues after we are cancelled. + assert(delayed.find(m.getSequence()) == delayed.end()); + delayed[m.getSequence()] = m.getIngressCompletion(); m.getIngressCompletion()->startCompleter(); - { - Mutex::ScopedLock l(lock); - if (!delayed.insert(Delayed::value_type(m.getSequence(), m.getIngressCompletion())).second) { - QPID_LOG(critical, logPrefix << "Second enqueue for message with sequence " << m.getSequence()); - assert(false); - } - } } // NOTE: Called with message lock held. void QueueGuard::dequeued(const Message& m) { QPID_LOG(trace, logPrefix << "Dequeued " << m); - ReplicatingSubscription* rs=0; - { - Mutex::ScopedLock l(lock); - rs = subscription; - } - if (rs) rs->dequeued(m); - complete(m.getSequence()); -} - -void QueueGuard::completeRange(Delayed::iterator begin, Delayed::iterator end) { - for (Delayed::iterator i = begin; i != end; ++i) { - QPID_LOG(trace, logPrefix << "Completed " << i->first); - i->second->finishCompleter(); - } + Mutex::ScopedLock l(lock); + if (subscription) subscription->dequeued(m); + complete(m.getSequence(), l); } void QueueGuard::cancel() { queue.removeObserver(observer); - Delayed removed; - { - Mutex::ScopedLock l(lock); - if (delayed.empty()) return; // No need if no delayed messages. - delayed.swap(removed); + Mutex::ScopedLock l(lock); + if (cancelled) return; + cancelled = true; + for (Delayed::iterator i = delayed.begin(); i != delayed.end();) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); } void QueueGuard::attach(ReplicatingSubscription& rs) { @@ -113,38 +99,36 @@ void QueueGuard::attach(ReplicatingSubscription& rs) { } bool QueueGuard::subscriptionStart(SequenceNumber position) { - Delayed removed; - { - Mutex::ScopedLock l(lock); - // Complete any messages before or at the ReplicatingSubscription start position. - // Those messages are already on the backup. - for (Delayed::iterator i = delayed.begin(); i != delayed.end() && i->first <= position;) { - removed.insert(*i); - delayed.erase(i++); - } + // Complete any messages before or at the ReplicatingSubscription start position. + // Those messages are already on the backup. + Mutex::ScopedLock l(lock); + Delayed::iterator i = delayed.begin(); + while(i != delayed.end() && i->first <= position) { + complete(i, l); + delayed.erase(i++); } - completeRange(removed.begin(), removed.end()); return position >= range.back; } void QueueGuard::complete(SequenceNumber sequence) { - boost::intrusive_ptr<broker::AsyncCompletion> m; - { - Mutex::ScopedLock l(lock); - // The same message can be completed twice, by - // ReplicatingSubscription::acknowledged and dequeued. Remove it - // from the map so we only call finishCompleter() once - Delayed::iterator i = delayed.find(sequence); - if (i != delayed.end()) { - m = i->second; - delayed.erase(i); - } + Mutex::ScopedLock l(lock); + complete(sequence, l); +} +void QueueGuard::complete(SequenceNumber sequence, Mutex::ScopedLock& l) { + // The same message can be completed twice, by + // ReplicatingSubscription::acknowledged and dequeued. Remove it + // from the map so we only call finishCompleter() once + Delayed::iterator i = delayed.find(sequence); + if (i != delayed.end()) { + complete(i, l); + delayed.erase(i); } - if (m) { - QPID_LOG(trace, logPrefix << "Completed " << sequence); - m->finishCompleter(); - } +} + +void QueueGuard::complete(Delayed::iterator i, Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Completed " << i->first); + i->second->finishCompleter(); } }} // namespaces qpid::ha diff --git a/cpp/src/qpid/ha/QueueGuard.h b/cpp/src/qpid/ha/QueueGuard.h index 3904b3bd3f..e7ceb351e8 100644 --- a/cpp/src/qpid/ha/QueueGuard.h +++ b/cpp/src/qpid/ha/QueueGuard.h @@ -54,6 +54,9 @@ class ReplicatingSubscription; * THREAD SAFE: Concurrent calls: * - enqueued() via QueueObserver in arbitrary connection threads. * - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread. + * + * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held + * QueueGuard MAY call ReplicatingSubscription with it's lock held. */ class QueueGuard { public: @@ -104,17 +107,20 @@ class QueueGuard { private: class QueueObserver; + typedef std::map<framing::SequenceNumber, + boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; + + void complete(framing::SequenceNumber, sys::Mutex::ScopedLock &); + void complete(Delayed::iterator, sys::Mutex::ScopedLock &); sys::Mutex lock; + bool cancelled; std::string logPrefix; broker::Queue& queue; - typedef std::map<framing::SequenceNumber, boost::intrusive_ptr<broker::AsyncCompletion> > Delayed; Delayed delayed; ReplicatingSubscription* subscription; boost::shared_ptr<QueueObserver> observer; QueueRange range; - - void completeRange(Delayed::iterator begin, Delayed::iterator end); }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/QueueRange.h b/cpp/src/qpid/ha/QueueRange.h index d734326910..f67ac146e6 100644 --- a/cpp/src/qpid/ha/QueueRange.h +++ b/cpp/src/qpid/ha/QueueRange.h @@ -24,6 +24,7 @@ #include "ReplicatingSubscription.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueCursor.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/SequenceNumber.h" #include <iostream> @@ -51,15 +52,7 @@ struct QueueRange { QueueRange() : front(1), back(0) { } // Empty range. - QueueRange(broker::Queue& q) { - if (ReplicatingSubscription::getFront(q, front)) - back = q.getPosition(); - else { - back = q.getPosition(); - front = back+1; // empty - } - assert(front <= back + 1); - } + QueueRange(broker::Queue& q) { q.getRange(front, back, broker::REPLICATOR); } QueueRange(const framing::FieldTable& args) { back = args.getAsInt(ReplicatingSubscription::QPID_BACK); diff --git a/cpp/src/qpid/ha/QueueReplicator.cpp b/cpp/src/qpid/ha/QueueReplicator.cpp index cac1fdac29..98220b2098 100644 --- a/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/cpp/src/qpid/ha/QueueReplicator.cpp @@ -22,12 +22,15 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/SequenceSet.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -37,43 +40,89 @@ namespace { const std::string QPID_REPLICATOR_("qpid.replicator-"); const std::string TYPE_NAME("qpid.queue-replicator"); -const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +const std::string QPID_HA("qpid.ha-"); } namespace qpid { namespace ha { using namespace broker; using namespace framing; +using namespace std; +using sys::Mutex; -const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:"); -const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); -const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA+"dequeue"); +const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA+"position"); +const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } +bool QueueReplicator::isReplicatorName(const std::string& name) { + return name.compare(0, QPID_REPLICATOR_.size(), QPID_REPLICATOR_) == 0; +} + bool QueueReplicator::isEventKey(const std::string key) { - const std::string& prefix = QPID_HA_EVENT_PREFIX; + const std::string& prefix = QPID_HA; bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; return ret; } +class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener { + public: + ErrorListener(const std::string& prefix) : logPrefix(prefix) {} + void connectionException(framing::connection::CloseCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Connection error: " << msg); + } + void channelException(framing::session::DetachCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Channel error: " << msg); + } + void executionException(framing::execution::ErrorCode, const std::string& msg) { + QPID_LOG(error, logPrefix << "Execution error: " << msg); + } + void detach() { + QPID_LOG(debug, logPrefix << "Session detached"); + } + private: + std::string logPrefix; +}; + +class QueueReplicator::QueueObserver : public broker::QueueObserver { + public: + QueueObserver(boost::shared_ptr<QueueReplicator> qr) : queueReplicator(qr) {} + void enqueued(const Message&) {} + void dequeued(const Message&) {} + void acquired(const Message&) {} + void requeued(const Message&) {} + void consumerAdded( const Consumer& ) {} + void consumerRemoved( const Consumer& ) {} + // Queue observer is destroyed when the queue is. + void destroy() { queueReplicator->destroy(); } + private: + boost::shared_ptr<QueueReplicator> queueReplicator; +}; + QueueReplicator::QueueReplicator(HaBroker& hb, boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), haBroker(hb), logPrefix("Backup queue "+q->getName()+": "), - queue(q), link(l), brokerInfo(hb.getBrokerInfo()) + queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false), + settings(hb.getSettings()) { + args.setString(QPID_REPLICATE, printable(NONE).str()); Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); + framing::FieldTable args = getArgs(); + args.setString(QPID_REPLICATE, printable(NONE).str()); + setArgs(args); } // This must be separate from the constructor so we can call shared_from_this. void QueueReplicator::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed std::pair<Bridge::shared_ptr, bool> result = queue->getBroker()->getLinks().declare( bridgeName, @@ -93,48 +142,57 @@ void QueueReplicator::activate() { boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2) ); bridge = result.first; + bridge->setErrorListener( + boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix))); + boost::shared_ptr<QueueObserver> observer(new QueueObserver(shared_from_this())); + queue->addObserver(observer); } -QueueReplicator::~QueueReplicator() { deactivate(); } +QueueReplicator::~QueueReplicator() {} -void QueueReplicator::deactivate() { - // destroy the route - sys::Mutex::ScopedLock l(lock); - if (bridge) { - bridge->close(); - bridge.reset(); - QPID_LOG(debug, logPrefix << "Deactivated bridge " << bridgeName); - } +void QueueReplicator::destroy() { + // Called from Queue::destroyed() + Mutex::ScopedLock l(lock); + if (!bridge) return; + QPID_LOG(debug, logPrefix << "Destroyed."); + bridge->close(); + // Need to drop shared pointers to avoid pointer cycles keeping this in memory. + queue.reset(); + link.reset(); + bridge.reset(); + getBroker()->getExchanges().destroy(getName()); } // Called in a broker connection thread when the bridge is created. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - FieldTable settings; - settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); - settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_BACK, - queue->getPosition()); - settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, - brokerInfo.asFieldTable()); - SequenceNumber front; - if (ReplicatingSubscription::getFront(*queue, front)) { - settings.setInt(ReplicatingSubscription::QPID_FRONT, front); - QPID_LOG(debug, "QPID_FRONT for " << queue->getName() << " is " << front); + FieldTable arguments; + arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); + arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? + arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition()); + arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable()); + SequenceNumber front, back; + queue->getRange(front, back, broker::REPLICATOR); + if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, front); + try { + peer.getMessage().subscribe( + args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, + false/*exclusive*/, "", 0, arguments); + peer.getMessage().setFlowMode(getName(), 1); // Window + peer.getMessage().flow(getName(), 0, settings.getFlowMessages()); + peer.getMessage().flow(getName(), 1, settings.getFlowBytes()); + } + catch(const exception& e) { + QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << e.what())); + throw; } - peer.getMessage().subscribe( - args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, - false/*exclusive*/, "", 0, settings); - // FIXME aconway 2012-05-22: use a finite credit window? - peer.getMessage().flow(getName(), 0, 0xFFFFFFFF); - peer.getMessage().flow(getName(), 1, 0xFFFFFFFF); - qpid::Address primary; link->getRemoteAddress(primary); QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << bridgeName << ")"); - QPID_LOG(trace, logPrefix << "Subscription settings: " << settings); + QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments); } namespace { @@ -147,17 +205,35 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) { + boost::shared_ptr<Queue> q; + { + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed + q = queue; + } // Thread safe: only calls thread safe Queue functions. queue->dequeueMessageAt(n); } +namespace { +bool getSequence(const Message& message, SequenceNumber& result) { + result = message.getSequence(); + return true; +} +bool getNext(broker::Queue& q, SequenceNumber position, SequenceNumber& result) { + QueueCursor cursor(REPLICATOR); + return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), position+1); +} +} // namespace + // Called in connection thread of the queues bridge to primary. void QueueReplicator::route(Deliverable& msg) { try { const std::string& key = msg.getMessage().getRoutingKey(); - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); + if (!queue) return; // Already destroyed if (!isEventKey(key)) { msg.deliverTo(queue); // We are on a backup so the queue is not modified except via this. @@ -176,16 +252,15 @@ void QueueReplicator::route(Deliverable& msg) << " to " << position); // Verify that there are no messages after the new position in the queue. SequenceNumber next; - if (ReplicatingSubscription::getNext(*queue, position, next)) - throw Exception("Invalid position move, preceeds messages"); + if (getNext(*queue, position, next)) + throw Exception(QPID_MSG(logPrefix << "Invalid position " << position + << " preceeds message at " << next)); queue->setPosition(position); } // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); - haBroker.shutdown(); - throw; + haBroker.shutdown(QPID_MSG(logPrefix << "Replication failed: " << e.what())); } } diff --git a/cpp/src/qpid/ha/QueueReplicator.h b/cpp/src/qpid/ha/QueueReplicator.h index 8d8a41a5ba..e8a793f611 100644 --- a/cpp/src/qpid/ha/QueueReplicator.h +++ b/cpp/src/qpid/ha/QueueReplicator.h @@ -41,6 +41,7 @@ class Deliverable; namespace ha { class HaBroker; +class Settings; /** * Exchange created on a backup broker to replicate a queue on the primary. @@ -57,7 +58,11 @@ class QueueReplicator : public broker::Exchange, public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; + static const std::string QPID_SYNC_FREQUENCY; + static std::string replicatorName(const std::string& queueName); + static bool isReplicatorName(const std::string&); + /** Test if a string is an event key */ static bool isEventKey(const std::string key); @@ -68,7 +73,6 @@ class QueueReplicator : public broker::Exchange, ~QueueReplicator(); void activate(); // Call after ctor - void deactivate(); // Call before dtor std::string getType() const; bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*, qpid::broker::AsyncStore* const); @@ -80,8 +84,18 @@ class QueueReplicator : public broker::Exchange, uint64_t getSize(); void write(char* target); + // Set if the queue has ever been subscribed to, used for auto-delete cleanup. + void setSubscribed() { subscribed = true; } + bool isSubscribed() { return subscribed; } + + boost::shared_ptr<broker::Queue> getQueue() const { return queue; } + private: + class ErrorListener; + class QueueObserver; + void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); + void destroy(); // Called when the queue is destroyed. void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); HaBroker& haBroker; @@ -92,6 +106,8 @@ class QueueReplicator : public broker::Exchange, boost::shared_ptr<broker::Link> link; boost::shared_ptr<broker::Bridge> bridge; BrokerInfo brokerInfo; + bool subscribed; + const Settings& settings; }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index 3421380940..798ade3f73 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -21,6 +21,7 @@ #include "RemoteBackup.h" #include "QueueGuard.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/log/Statement.h" @@ -32,32 +33,45 @@ namespace ha { using sys::Mutex; using boost::bind; -RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary: Remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) -{} +RemoteBackup::RemoteBackup( + const BrokerInfo& info, broker::Connection* c +) : brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false) +{ + std::ostringstream oss; + oss << "Primary: Remote backup " << info << ": "; + logPrefix = oss.str(); +} -void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards) +void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards) { - QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : "")); - queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards)); + queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards)); + QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues" + << (createGuards ? " and guards" : "")); } RemoteBackup::~RemoteBackup() { cancel(); } void RemoteBackup::cancel() { + QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected") + << " backup: " << brokerInfo); for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i) i->second->cancel(); guards.clear(); + if (connection) { + connection->abort(); + connection = 0; + } } bool RemoteBackup::isReady() { - return connected && initialQueues.empty(); + return connection && catchupQueues.empty(); } -void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) { - if (replicationTest.isReplicated(ALL, *q)) { - initialQueues.insert(q); +void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { + if (replicationTest.getLevel(*q) == ALL) { + QPID_LOG(debug, logPrefix << "Catch-up queue" + << (createGuard ? " and guard" : "") << ": " << q->getName()); + catchupQueues.insert(q); if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo)); } } @@ -88,21 +102,24 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { } void RemoteBackup::ready(const QueuePtr& q) { - initialQueues.erase(q); - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << QueueSetPrinter(", waiting for: ", initialQueues)); - if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); + catchupQueues.erase(q); + if (catchupQueues.size()) { + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", " + << catchupQueues.size() << " remain to catch up"); + } + else + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() ); } -// Called via ConfigurationObserver::queueCreate and from initialQueue +// Called via ConfigurationObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { - if (replicationTest.isReplicated(ALL, *q)) + if (replicationTest.getLevel(*q) == ALL) guards[q].reset(new QueueGuard(*q, brokerInfo)); } // Called via ConfigurationObserver void RemoteBackup::queueDestroy(const QueuePtr& q) { - initialQueues.erase(q); + catchupQueues.erase(q); GuardMap::iterator i = guards.find(q); if (i != guards.end()) { i->second->cancel(); diff --git a/cpp/src/qpid/ha/RemoteBackup.h b/cpp/src/qpid/ha/RemoteBackup.h index 8ee776e90b..769c50457e 100644 --- a/cpp/src/qpid/ha/RemoteBackup.h +++ b/cpp/src/qpid/ha/RemoteBackup.h @@ -33,6 +33,7 @@ namespace qpid { namespace broker { class Queue; class QueueRegistry; +class Connection; } namespace ha { @@ -54,20 +55,20 @@ class RemoteBackup /** Note: isReady() can be true after construction *@param connected true if the backup is already connected. */ - RemoteBackup(const BrokerInfo& info, ReplicationTest, bool connected); + RemoteBackup(const BrokerInfo&, broker::Connection*); ~RemoteBackup(); - /** Set the initial queues for all queues in the registry. - *@createGuards if true create guards also, if false guards will be created on demand. + /** Set all queues in the registry as catch-up queues. + *@createGuards if true create guards also, if false guards are created on demand. */ - void setInitialQueues(broker::QueueRegistry&, bool createGuards); + void setCatchupQueues(broker::QueueRegistry&, bool createGuards); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); /** Is the remote backup connected? */ - void setConnected(bool b) { connected=b; } - bool isConnected() const { return connected; } + void setConnection(broker::Connection* c) { connection = c; } + bool isConnected() const { return connection; } /** ReplicatingSubscription associated with queue is ready. * Note: may set isReady() @@ -80,7 +81,7 @@ class RemoteBackup /** Called via ConfigurationObserver. Note: may set isReady() */ void queueDestroy(const QueuePtr&); - /**@return true when all initial queues for this backup are ready. */ + /**@return true when all catch-up queues for this backup are ready. */ bool isReady(); /**@return true if isReady() and this is the first call to reportReady */ @@ -94,15 +95,14 @@ class RemoteBackup typedef std::map<QueuePtr, GuardPtr> GuardMap; typedef std::set<QueuePtr> QueueSet; - /** Add queue to guard as an initial queue */ - void initialQueue(const QueuePtr&, bool createGuard); + void catchupQueue(const QueuePtr&, bool createGuard); std::string logPrefix; BrokerInfo brokerInfo; ReplicationTest replicationTest; GuardMap guards; - QueueSet initialQueues; - bool connected; + QueueSet catchupQueues; + broker::Connection* connection; bool reportedReady; }; diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 6f7519cd1f..933716e8fa 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -91,25 +91,6 @@ string mask(const string& in) return DOLLAR + in + INTERNAL; } -namespace { -bool getSequence(const Message& message, SequenceNumber& result) -{ - result = message.getSequence(); - return true; -} -} -bool ReplicatingSubscription::getNext( - broker::Queue& q, SequenceNumber from, SequenceNumber& result) -{ - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(result)), from); -} - -bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) { - QueueCursor cursor(REPLICATOR); - return q.seek(cursor, boost::bind(&getSequence, _1, boost::ref(front))); -} - /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -157,7 +138,7 @@ ReplicatingSubscription::ReplicatingSubscription( // Set a log prefix message that identifies the remote broker. ostringstream os; - os << "Primary " << queue->getName() << "@" << info.getLogId() << ": "; + os << "Primary " << queue->getName() << "@" << info << ": "; logPrefix = os.str(); // NOTE: Once the guard is attached we can have concurrent @@ -171,6 +152,7 @@ ReplicatingSubscription::ReplicatingSubscription( guard->attach(*this); QueueRange backup(arguments); // Remote backup range. + QueueRange backupOriginal(backup); QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; @@ -207,7 +189,7 @@ ReplicatingSubscription::ReplicatingSubscription( // queue and hasn't been tampered with then that will be the case. QPID_LOG(debug, logPrefix << "Subscribed:" - << " backup:" << backup + << " backup:" << backupOriginal << " adjusted backup:" << backup << " primary:" << primary << " catch-up: " << position << "-" << primary.back << "(" << primary.back-position << ")"); @@ -222,9 +204,7 @@ ReplicatingSubscription::ReplicatingSubscription( } } -ReplicatingSubscription::~ReplicatingSubscription() { - QPID_LOG(debug, logPrefix << "Detroyed replicating subscription"); -} +ReplicatingSubscription::~ReplicatingSubscription() {} // Called in subscription's connection thread when the subscription is created. // Called separate from ctor because sending events requires @@ -248,19 +228,20 @@ void ReplicatingSubscription::initialize() { } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) { - position = m.getSequence(); +bool ReplicatingSubscription::deliver( + const qpid::broker::QueueCursor& c, const qpid::broker::Message& m) +{ try { - QPID_LOG(trace, logPrefix << "Replicating " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence()); { Mutex::ScopedLock l(lock); - //FIXME GRS: position is no longer set//assert(position == m.getSequence()); + position = m.getSequence(); - // m.getSequence() is the position of the newly enqueued message on local queue. + // m.getSequence() is the position of the new message on local queue. // backupPosition is latest position on backup queue before enqueueing if (m.getSequence() <= backupPosition) throw Exception( - QPID_MSG("Expected position > " << backupPosition + QPID_MSG(logPrefix << "Expected position > " << backupPosition << " but got " << m.getSequence())); if (m.getSequence() - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. @@ -272,7 +253,7 @@ bool ReplicatingSubscription::deliver(const qpid::broker::QueueCursor& c, const } return ConsumerImpl::deliver(c, m); } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "Error replicating " << getQueue()->getName() << "[" << m.getSequence() << "]" + QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence() << ": " << e.what()); throw; } @@ -292,6 +273,7 @@ void ReplicatingSubscription::setReady() { // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { + QPID_LOG(debug, logPrefix << "Cancelled"); guard->cancel(); ConsumerImpl::cancel(); } @@ -299,7 +281,7 @@ void ReplicatingSubscription::cancel() // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) { // Finish completion of message, it has been acknowledged by the backup. - QPID_LOG(trace, logPrefix << "Acknowledged " << getQueue()->getName() << "[" << r.getMessageId() << "]"); + QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId()); guard->complete(r.getMessageId()); // If next message is protected by the guard then we are ready if (r.getMessageId() >= guard->getRange().back) setReady(); @@ -328,7 +310,7 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) // arbitrary connection threads. void ReplicatingSubscription::dequeued(const Message& m) { - QPID_LOG(trace, logPrefix << "Dequeued " << getQueue()->getName() << "[" << m.getSequence() << "]"); + QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence()); { Mutex::ScopedLock l(lock); dequeues.add(m.getSequence()); @@ -396,7 +378,14 @@ bool ReplicatingSubscription::doDispatch() Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } - return ConsumerImpl::doDispatch(); + try { + return ConsumerImpl::doDispatch(); + } + catch (const std::exception& e) { + // FIXME aconway 2012-10-05: detect queue deletion, no warning. + QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what()); + return false; + } } }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/ReplicatingSubscription.h b/cpp/src/qpid/ha/ReplicatingSubscription.h index 8a2984846e..7fcb4ccf13 100644 --- a/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -61,10 +61,14 @@ class QueueGuard; * * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. * + * Lock Hierarchy: ReplicatingSubscription MUS NOT call QueueGuard with it's lock held + * QueueGuard MAY call ReplicatingSubscription with it's lock held. */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl { public: + typedef broker::SemanticState::ConsumerImpl ConsumerImpl; + struct Factory : public broker::ConsumerFactory { boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( broker::SemanticState* parent, @@ -80,17 +84,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl static const std::string QPID_FRONT; static const std::string QPID_BROKER_INFO; - // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription - /** Get position of front message on queue. - *@return false if queue is empty. - */ - static bool getFront(broker::Queue&, framing::SequenceNumber& result); - /** Get next message after from in queue. - *@return false if none found. - */ - static bool getNext(broker::Queue&, framing::SequenceNumber from, - framing::SequenceNumber& result); - ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, @@ -114,6 +107,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl // Hide the "queue deleted" error for a ReplicatingSubscription when a // queue is deleted, this is normal and not an error. bool hideDeletedError() { return true; } + // Not counted for auto deletion and immediate message purposes. + bool isCounted() { return false; } /** Initialization that must be done separately from construction * because it requires a shared_ptr to this to exist. diff --git a/cpp/src/qpid/ha/ReplicationTest.cpp b/cpp/src/qpid/ha/ReplicationTest.cpp index 88a969dbfd..647523ef2c 100644 --- a/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/cpp/src/qpid/ha/ReplicationTest.cpp @@ -19,7 +19,9 @@ * */ #include "ReplicationTest.h" +#include "qpid/log/Statement.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Exchange.h" #include "qpid/framing/FieldTable.h" namespace qpid { @@ -27,48 +29,47 @@ namespace ha { using types::Variant; -ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) { +ReplicateLevel ReplicationTest::getLevel(const std::string& str) { Enum<ReplicateLevel> rl(replicateDefault); if (!str.empty()) rl.parse(str); return rl.get(); } -ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) { +ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) { if (f.isSet(QPID_REPLICATE)) - return replicateLevel(f.getAsString(QPID_REPLICATE)); + return getLevel(f.getAsString(QPID_REPLICATE)); else return replicateDefault; } -ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) { +ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) { Variant::Map::const_iterator i = m.find(QPID_REPLICATE); if (i != m.end()) - return replicateLevel(i->second.asString()); + return getLevel(i->second.asString()); else return replicateDefault; } -namespace { -const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); +ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) { + const Variant::Map& qmap(q.getSettings().original); + Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE); + if (i != qmap.end()) + return getLevel(i->second.asString()); + else + return getLevel(q.getSettings().storeSettings); } -bool ReplicationTest::isReplicated( - ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive) -{ - bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); - return !ignore && replicateLevel(args) >= level; +ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) { + return getLevel(ex.getArgs()); } -bool ReplicationTest::isReplicated( - ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive) +ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) { - bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT); - return !ignore && replicateLevel(args) >= level; + return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) : getLevel(q); } -bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) -{ - return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner()); +ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) { + return ReplicationTest::getLevel(ex); } diff --git a/cpp/src/qpid/ha/ReplicationTest.h b/cpp/src/qpid/ha/ReplicationTest.h index 9f6976a8e4..7d44d82a21 100644 --- a/cpp/src/qpid/ha/ReplicationTest.h +++ b/cpp/src/qpid/ha/ReplicationTest.h @@ -30,6 +30,7 @@ namespace qpid { namespace broker { class Queue; +class Exchange; } namespace framing { @@ -47,21 +48,24 @@ class ReplicationTest ReplicationTest(ReplicateLevel replicateDefault_) : replicateDefault(replicateDefault_) {} - // Return the simple replication level, accounting for defaults. - ReplicateLevel replicateLevel(const std::string& str); - ReplicateLevel replicateLevel(const framing::FieldTable& f); - ReplicateLevel replicateLevel(const types::Variant::Map& m); + // Get the replication level set on an object, or default if not set. + ReplicateLevel getLevel(const std::string& str); + ReplicateLevel getLevel(const framing::FieldTable& f); + ReplicateLevel getLevel(const types::Variant::Map& m); + ReplicateLevel getLevel(const broker::Queue&); + ReplicateLevel getLevel(const broker::Exchange&); + + // Calculate level for objects that may not have replication set, + // including auto-delete/exclusive settings. + ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive); + ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive); + ReplicateLevel useLevel(const broker::Queue&); + ReplicateLevel useLevel(const broker::Exchange&); - // Return true if replication for a queue is enabled at level or higher, - // taking account of default level and queue settings. - bool isReplicated(ReplicateLevel level, - const types::Variant::Map& args, bool autodelete, bool exclusive); - bool isReplicated(ReplicateLevel level, - const framing::FieldTable& args, bool autodelete, bool exclusive); - bool isReplicated(ReplicateLevel level, const broker::Queue&); private: ReplicateLevel replicateDefault; }; + }} // namespace qpid::ha #endif /*!QPID_HA_REPLICATIONTEST_H*/ diff --git a/cpp/src/qpid/ha/Role.h b/cpp/src/qpid/ha/Role.h new file mode 100644 index 0000000000..9d6f7cd123 --- /dev/null +++ b/cpp/src/qpid/ha/Role.h @@ -0,0 +1,55 @@ +#ifndef QPID_HA_ROLE_H +#define QPID_HA_ROLE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> + +namespace qpid { +struct Url; + +namespace ha { + +/** + * A HaBroker has a role, e.g. Primary, Backup, StandAlone. + * Role subclasses define the actions of the broker in each role. + * The Role interface allows the HaBroker to pass management actions + * to be implemented by the role. + */ +class Role +{ + public: + /** Log prefix appropriate to the role */ + virtual std::string getLogPrefix() const = 0; + + /** QMF promote method handler. + * @return The new role if promoted, 0 if not. Caller takes ownership. + */ + virtual Role* promote() = 0; + + virtual void setBrokerUrl(const Url& url) = 0; + + private: +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_ROLE_H*/ diff --git a/cpp/src/qpid/ha/Settings.h b/cpp/src/qpid/ha/Settings.h index 37235b5c79..53b61415cf 100644 --- a/cpp/src/qpid/ha/Settings.h +++ b/cpp/src/qpid/ha/Settings.h @@ -23,6 +23,7 @@ */ #include "types.h" +#include "qpid/sys/IntegerTypes.h" #include <string> namespace qpid { @@ -34,16 +35,25 @@ namespace ha { class Settings { public: - Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5) + Settings() : cluster(false), queueReplication(false), + replicateDefault(NONE), backupTimeout(5), + flowMessages(100), flowBytes(0) {} bool cluster; // True if we are a cluster member. - std::string clientUrl; + bool queueReplication; // True if enabled. + std::string publicUrl; std::string brokerUrl; Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; double backupTimeout; - private: + + uint32_t flowMessages, flowBytes; + + static const uint32_t NO_LIMIT=0xFFFFFFFF; + static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; } + uint32_t getFlowMessages() const { return flowValue(flowMessages); } + uint32_t getFlowBytes() const { return flowValue(flowBytes); } }; }} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/StandAlone.h b/cpp/src/qpid/ha/StandAlone.h new file mode 100644 index 0000000000..d052996d40 --- /dev/null +++ b/cpp/src/qpid/ha/StandAlone.h @@ -0,0 +1,45 @@ +#ifndef QPID_HA_STANDALONE_H +#define QPID_HA_STANDALONE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +namespace qpid { +struct Url; + +namespace ha { + +/** + * Stand-alone role: acts as a stand-alone broker, no clustering. + * HA module needed to setting up replication via QMF methods. + */ +class StandAlone : public Role +{ + public: + std::string getLogPrefix() const { return logPrefix; } + Role* promote() { return 0; } + void setBrokerUrl(const Url&) {} + + private: + std::string logPrefix; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_STANDALONE_H*/ diff --git a/cpp/src/qpid/ha/StatusCheck.cpp b/cpp/src/qpid/ha/StatusCheck.cpp new file mode 100644 index 0000000000..17613ce3dd --- /dev/null +++ b/cpp/src/qpid/ha/StatusCheck.cpp @@ -0,0 +1,132 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "StatusCheck.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Address.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace ha { + +using namespace qpid::messaging; +using namespace qpid::types; +using namespace std; +using namespace sys; + +const string HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"; + +class StatusCheckThread : public sys::Runnable { + public: + StatusCheckThread(StatusCheck& sc, const qpid::Address& addr, const BrokerInfo& self) + : url(addr), statusCheck(sc), brokerInfo(self) {} + void run(); + private: + Url url; + StatusCheck& statusCheck; + uint16_t linkHeartbeatInterval; + BrokerInfo brokerInfo; +}; + +void StatusCheckThread::run() { + QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url); + Variant::Map options, clientProperties; + clientProperties = brokerInfo.asMap(); // Detect self connections. + clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. + + options["client-properties"] = clientProperties; + options["heartbeat"] = statusCheck.linkHeartbeatInterval; + Connection c(url.str(), options); + + try { + c.open(); + Session session = c.createSession(); + messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}"); + Receiver r = session.createReceiver(responses); + Sender s = session.createSender("qmf.default.direct/broker"); + Message request; + request.setReplyTo(responses); + request.setContentType("amqp/map"); + request.setProperty("x-amqp-0-10.app-id", "qmf2"); + request.setProperty("qmf.opcode", "_query_request"); + Variant::Map oid; + oid["_object_name"] = HA_BROKER; + Variant::Map content; + content["_what"] = "OBJECT"; + content["_object_id"] = oid; + encode(content, request); + s.send(request); + Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND); + session.acknowledge(); + Variant::List contentIn; + decode(response, contentIn); + if (contentIn.size() == 1) { + Variant::Map details = contentIn.front().asMap()["_values"].asMap(); + string status = details["status"].getString(); + if (status != "joining") { + statusCheck.setPromote(false); + QPID_LOG(info, statusCheck.logPrefix << "Status of " << url << " is " + << status << ", this broker will refuse promotion."); + } + QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status); + } + } catch(const exception& error) { + QPID_LOG(info, "Checking status of " << url << ": " << error.what()); + } + delete this; +} + +StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self) + : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self) +{} + +StatusCheck::~StatusCheck() { + // Join any leftovers + for (size_t i = 0; i < threads.size(); ++i) threads[i].join(); +} + +void StatusCheck::setUrl(const Url& url) { + Mutex::ScopedLock l(lock); + for (size_t i = 0; i < url.size(); ++i) + threads.push_back(Thread(new StatusCheckThread(*this, url[i], brokerInfo))); +} + +bool StatusCheck::canPromote() { + Mutex::ScopedLock l(lock); + while (!threads.empty()) { + Thread t = threads.back(); + threads.pop_back(); + Mutex::ScopedUnlock u(lock); + t.join(); + } + return promote; +} + +void StatusCheck::setPromote(bool p) { + Mutex::ScopedLock l(lock); + promote = p; +} + +}} // namespace qpid::ha diff --git a/cpp/src/qpid/ha/StatusCheck.h b/cpp/src/qpid/ha/StatusCheck.h new file mode 100644 index 0000000000..997ced4159 --- /dev/null +++ b/cpp/src/qpid/ha/StatusCheck.h @@ -0,0 +1,71 @@ +#ifndef QPID_HA_STATUSCHECK_H +#define QPID_HA_STATUSCHECK_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "BrokerInfo.h" +#include "qpid/Url.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Runnable.h" +#include <vector> + +namespace qpid { +namespace ha { + +// FIXME aconway 2012-12-21: This solution is incomplete. It will only protect +// against bad promotion if there are READY brokers when this broker starts. +// It will not help the situation where brokers became READY after this one starts. +// + +/** + * Check whether a JOINING broker can be promoted . + * + * A JOINING broker can be promoted as long as all the other brokers are also + * JOINING. If there are READY brokers in the cluster the JOINING broker should + * refuse to promote so that one of the READY brokers can. This situation + * only comes about if the primary is dead and no new primary has been promoted. + * + * THREAD SAFE: setUrl and canPromote are called in arbitrary management threads. + */ +class StatusCheck +{ + public: + StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self); + ~StatusCheck(); + void setUrl(const Url&); + bool canPromote(); + + private: + void setPromote(bool p); + + std::string logPrefix; + sys::Mutex lock; + std::vector<sys::Thread> threads; + bool promote; + uint16_t linkHeartbeatInterval; + BrokerInfo brokerInfo; + friend class StatusCheckThread; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_STATUSCHECK_H*/ diff --git a/cpp/src/qpid/ha/types.cpp b/cpp/src/qpid/ha/types.cpp index 53e2056213..bb4bf83574 100644 --- a/cpp/src/qpid/ha/types.cpp +++ b/cpp/src/qpid/ha/types.cpp @@ -33,6 +33,7 @@ namespace ha { using namespace std; const string QPID_REPLICATE("qpid.replicate"); +const string QPID_HA_UUID("qpid.ha-uuid"); string EnumBase::str() const { assert(value < count); @@ -55,6 +56,11 @@ template <> const char* Enum<ReplicateLevel>::NAMES[] = { "none", "configuration template <> const size_t Enum<ReplicateLevel>::N = 3; template <> const char* Enum<BrokerStatus>::NAME = "HA broker status"; + +// NOTE: Changing status names will have an impact on qpid-ha and +// the qpidd-primary init script. +// Don't change them unless you are going to update all dependent code. +// template <> const char* Enum<BrokerStatus>::NAMES[] = { "joining", "catchup", "ready", "recovering", "active", "standalone" }; diff --git a/cpp/src/qpid/ha/types.h b/cpp/src/qpid/ha/types.h index 35faf9f624..f8c48afc5a 100644 --- a/cpp/src/qpid/ha/types.h +++ b/cpp/src/qpid/ha/types.h @@ -99,6 +99,7 @@ inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } // String constants. extern const std::string QPID_REPLICATE; +extern const std::string QPID_HA_UUID; /** Define IdSet type, not a typedef so we can overload operator << */ class IdSet : public std::set<types::Uuid> {}; |