diff options
Diffstat (limited to 'qpid/cpp/src')
37 files changed, 1100 insertions, 458 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index aee5fb6213..b83e8d9c6d 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -626,31 +626,38 @@ set (ha_default ON) option(BUILD_HA "Build Active-Passive HA plugin" ${ha_default}) if (BUILD_HA) set (ha_SOURCES + qpid/ha/BackupConnectionExcluder.h + qpid/ha/BrokerInfo.cpp + qpid/ha/BrokerInfo.h + qpid/ha/HeldQueue.h + qpid/ha/QueueGuard.cpp + qpid/ha/QueueGuard.h + qpid/ha/ReplicationTest.cpp + qpid/ha/ReplicationTest.h qpid/ha/Backup.cpp qpid/ha/Backup.h - qpid/ha/BrokerInfo.h - qpid/ha/BrokerInfo.cpp qpid/ha/BrokerReplicator.cpp qpid/ha/BrokerReplicator.h - qpid/ha/ConnectionExcluder.cpp - qpid/ha/ConnectionExcluder.h + qpid/ha/ConnectionObserver.cpp + qpid/ha/ConnectionObserver.h qpid/ha/Counter.h - qpid/ha/Enum.cpp - qpid/ha/Enum.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/HaPlugin.cpp - qpid/ha/LogPrefix.cpp - qpid/ha/LogPrefix.h qpid/ha/Membership.cpp qpid/ha/Membership.h qpid/ha/Primary.cpp qpid/ha/Primary.h + qpid/ha/PrimaryConnectionMonitor.h qpid/ha/QueueReplicator.cpp qpid/ha/QueueReplicator.h qpid/ha/ReplicatingSubscription.cpp qpid/ha/ReplicatingSubscription.h qpid/ha/Settings.h + qpid/ha/types.cpp + qpid/ha/types.h + qpid/ha/UnreadyQueueSet.cpp + qpid/ha/UnreadyQueueSet.h ) add_library (ha MODULE ${ha_SOURCES}) diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index cab7d8c42b..dae924b6ea 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -25,29 +25,36 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ - qpid/ha/BrokerInfo.h \ + qpid/ha/BackupConnectionExcluder.h \ qpid/ha/BrokerInfo.cpp \ + qpid/ha/BrokerInfo.h \ qpid/ha/BrokerReplicator.cpp \ qpid/ha/BrokerReplicator.h \ - qpid/ha/ConnectionExcluder.cpp \ - qpid/ha/ConnectionExcluder.h \ + qpid/ha/ConnectionObserver.cpp \ + qpid/ha/ConnectionObserver.h \ qpid/ha/Counter.h \ - qpid/ha/Enum.cpp \ - qpid/ha/Enum.h \ qpid/ha/HaBroker.cpp \ 
qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ - qpid/ha/LogPrefix.cpp \ - qpid/ha/LogPrefix.h \ + qpid/ha/HeldQueue.h \ qpid/ha/Membership.cpp \ qpid/ha/Membership.h \ qpid/ha/Primary.cpp \ qpid/ha/Primary.h \ + qpid/ha/PrimaryConnectionMonitor.h \ + qpid/ha/QueueGuard.cpp \ + qpid/ha/QueueGuard.h \ qpid/ha/QueueReplicator.cpp \ qpid/ha/QueueReplicator.h \ qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/ReplicatingSubscription.h \ - qpid/ha/Settings.h + qpid/ha/ReplicationTest.cpp \ + qpid/ha/ReplicationTest.h \ + qpid/ha/Settings.h \ + qpid/ha/UnreadyQueueSet.cpp \ + qpid/ha/UnreadyQueueSet.h \ + qpid/ha/types.cpp \ + qpid/ha/types.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 44fb098e79..f2cc2a3454 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -45,7 +45,7 @@ using types::Variant; using std::string; Backup::Backup(HaBroker& hb, const Settings& s) : - logPrefix(hb), haBroker(hb), broker(hb.getBroker()), settings(s) + logPrefix("HA backup: "), haBroker(hb), broker(hb.getBroker()), settings(s) { // Empty brokerUrl means delay initialization until seBrokertUrl() is called. 
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); @@ -64,14 +64,14 @@ Url Backup::linkUrl(const Url& brokers) const { for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (!isSelf(*i)) url.push_back(*i); if (url.empty()) throw Url::Invalid("HA Backup failover URL is empty"); - QPID_LOG(debug, logPrefix << "Backup failover URL (excluding self): " << url); + QPID_LOG(debug, logPrefix << " failover URL (excluding self): " << url); return url; */ } void Backup::initialize(const Url& brokers) { if (brokers.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(info, logPrefix << "Backup broker URL: " << brokers); + QPID_LOG(info, logPrefix << "Initialized, broker URL: " << brokers); sys::Mutex::ScopedLock l(lock); Url url = linkUrl(brokers); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 92387ece60..ca3c2a02d0 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -22,7 +22,6 @@ * */ -#include "LogPrefix.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -57,7 +56,8 @@ class Backup Url linkUrl(const Url&) const; void initialize(const Url&); - LogPrefix logPrefix; + std::string logPrefix; + sys::Mutex lock; HaBroker& haBroker; broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index d80fe23458..9776fac016 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -1,3 +1,6 @@ +#ifndef QPID_HA_BACKUPCONNECTIONEXCLUDER_H +#define QPID_HA_BACKUPCONNECTIONEXCLUDER_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,33 +21,26 @@ * under the License. 
* */ -#include "LogPrefix.h" -#include "HaBroker.h" -#include <iostream> + +#include "qpid/broker/ConnectionObserver.h" +#include "qpid/broker/Connection.h" namespace qpid { namespace ha { -LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), status(0) { - if (msg.size()) setMessage(msg); -} - -LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg) - : haBroker(lp.haBroker), status(0) +/** + * Exclude connections to a backup broker. + */ +class BackupConnectionExcluder : public broker::ConnectionObserver { - if (msg.size()) setMessage(msg); -} + public: + void opened(broker::Connection& connection) { + throw Exception("HA backup rejected connection "+connection.getMgmtId()); + } -LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {} - -void LogPrefix::setMessage(const std::string& msg) { - tail = " "+msg+":"; -} - -std::ostream& operator<<(std::ostream& o, const LogPrefix& l) { - return o << "HA(" - << printable(l.status ? *l.status : l.haBroker->getStatus()) - << ")" << l.tail << " "; -} + void closed(broker::Connection&) {} +}; }} // namespace qpid::ha + +#endif /*!QPID_HA_BACKUPCONNECTIONEXCLUDER_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 2673646646..0c5de9542f 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -42,6 +42,9 @@ using types::Uuid; using types::Variant; using framing::FieldTable; +BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : + logId(id.str().substr(0,9)+"..."), hostName(host), port(port_), systemId(id) {} + FieldTable BrokerInfo::asFieldTable() const { Variant::Map m = asMap(); FieldTable ft; diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index b0864e0402..55479df4b9 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -22,7 +22,7 @@ * */ -#include "Enum.h" +#include "types.h" #include "qpid/Url.h" 
#include "qpid/framing/FieldTable.h" #include "qpid/types/Uuid.h" @@ -40,15 +40,15 @@ class BrokerInfo { public: BrokerInfo() {} - BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : - hostName(host), port(port_), systemId(id) {} + BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id); BrokerInfo(const framing::FieldTable& ft) { assign(ft); } BrokerInfo(const types::Variant::Map& m) { assign(m); } types::Uuid getSystemId() const { return systemId; } std::string getHostName() const { return hostName; } BrokerStatus getStatus() const { return status; } - uint16_t getPort() const { return port; } + uint16_t getPort() const { return port; } + std::string getLogId() const { return logId; } void setStatus(BrokerStatus s) { status = s; } @@ -59,6 +59,7 @@ class BrokerInfo void assign(const types::Variant::Map&); private: + std::string logId; std::string hostName; uint16_t port; types::Uuid systemId; diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 0173495a00..81667e0437 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -170,7 +170,7 @@ Variant::Map asMapVoid(const Variant& value) { BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - logPrefix(hb), + logPrefix("HA backup: "), replicationTest(hb.getReplicationTest()), haBroker(hb), broker(hb.getBroker()), link(l) {} @@ -291,10 +291,10 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (!isReplicated( + if (!replicationTest.isReplicated( values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool())) return; - if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) { + if (values[DISP] == CREATED && 
replicationTest.replicateLevel(argsMap)) { framing::FieldTable args; amqp_0_10::translate(argsMap, args); // If we already have a queue with this name, replace it. @@ -335,7 +335,7 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (!queue) { QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " << name); - } else if (!haBroker.replicateLevel(queue->getSettings())) { + } else if (!replicationTest.replicateLevel(queue->getSettings())) { QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " << name); } else { boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); @@ -353,8 +353,8 @@ void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated exchange. - if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) { + if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange. 
+ if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); @@ -383,7 +383,7 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: " << name); - } else if (!haBroker.replicateLevel(exchange->getArgs())) { + } else if (!replicationTest.replicateLevel(exchange->getArgs())) { QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name); } else { QPID_LOG(debug, logPrefix << "Exchange delete event:" << name); @@ -401,8 +401,8 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && haBroker.replicateLevel(exchange->getArgs()) && - queue && haBroker.replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); @@ -421,8 +421,8 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. 
- if (exchange && haBroker.replicateLevel(exchange->getArgs()) && - queue && haBroker.replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); @@ -441,7 +441,7 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!isReplicated(values[ARGUMENTS].asMap(), + if (!replicationTest.isReplicated(values[ARGUMENTS].asMap(), values[AUTODELETE].asBool(), values[EXCLUSIVE].asBool())) return; @@ -465,7 +465,7 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!haBroker.replicateLevel(argsMap)) return; + if (!replicationTest.replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); if (broker.createExchange( @@ -512,8 +512,8 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && haBroker.replicateLevel(exchange->getArgs()) && - queue && haBroker.replicateLevel(queue->getSettings())) + if (exchange && replicationTest.replicateLevel(exchange->getArgs()) && + queue && replicationTest.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); @@ -533,7 +533,7 @@ const string REPLICATE_DEFAULT="replicateDefault"; void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); - ReplicateLevel primary = haBroker.replicateLevel( + ReplicateLevel primary = 
replicationTest.replicateLevel( values[REPLICATE_DEFAULT].asString()); if (mine != primary) throw Exception(QPID_MSG("Replicate default on backup (" << mine @@ -545,22 +545,11 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { } } -namespace { -const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); -} - -bool BrokerReplicator::isReplicated( - const Variant::Map& args, bool autodelete, bool exclusive) -{ - bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); - return haBroker.replicateLevel(args) && !ignore; -} - void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - if (haBroker.replicateLevel(queue->getSettings()) == ALL) { + if (replicationTest.replicateLevel(queue->getSettings()) == ALL) { boost::shared_ptr<QueueReplicator> qr( - new QueueReplicator(haBroker, queue, link)); + new QueueReplicator(haBroker.getBrokerInfo(), queue, link)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index f7466d6406..677de31370 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -23,8 +23,8 @@ */ #include "Counter.h" -#include "Enum.h" -#include "LogPrefix.h" +#include "types.h" +#include "ReplicationTest.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" #include <boost/shared_ptr.hpp> @@ -97,7 +97,8 @@ class BrokerReplicator : public broker::Exchange, bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive); void ready(); - LogPrefix logPrefix; + std::string logPrefix; + ReplicationTest replicationTest; HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index 
9294f38ef3..61835b15d1 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -31,7 +31,7 @@ namespace qpid { namespace ha { ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid) - : haBroker(hb), logPrefix(hb), self(uuid) {} + : haBroker(hb), logPrefix("HA: "), self(uuid) {} namespace { bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { @@ -53,7 +53,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) { } BrokerStatus status = haBroker.getStatus(); if (isBackup(status)) reject(connection); - BrokerInfo info; // Get info about a connecting backup. + BrokerInfo info; // Avoid self connections. if (getBrokerInfo(connection, info)) { if (info.getSystemId() == self) { QPID_LOG(debug, logPrefix << "Rejected self connection"); @@ -65,8 +65,7 @@ void ConnectionExcluder::opened(broker::Connection& connection) { return; } } - else // This is a client connection. - if (status == RECOVERING) reject(connection); // FIXME aconway 2012-05-29: allow clients in recovery + // else: Primary node accepts connections. } void ConnectionExcluder::reject(broker::Connection& connection) { diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h index 629fda7519..c24a138e2c 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -22,9 +22,9 @@ * */ -#include "LogPrefix.h" +#include "types.h" #include "qpid/broker/ConnectionObserver.h" -#include "qpid/framing/Uuid.h" +#include "qpid/types/Uuid.h" #include "qpid/sys/Mutex.h" #include <boost/function.hpp> @@ -35,6 +35,7 @@ class Connection; } namespace ha { +class HaBroker; /** * Exclude normal connections to a backup broker. 
@@ -58,7 +59,7 @@ class ConnectionExcluder : public broker::ConnectionObserver void reject(broker::Connection&); HaBroker& haBroker; - LogPrefix logPrefix; + std::string logPrefix; types::Uuid self; }; diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp new file mode 100644 index 0000000000..694a253fc3 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "ConnectionObserver.h" +#include "BrokerInfo.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/broker/Connection.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace ha { + +ConnectionObserver::ConnectionObserver(const types::Uuid& uuid) + : logPrefix("HA: "), self(uuid) {} + +bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { + framing::FieldTable ft; + if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { + info = BrokerInfo(ft); + return true; + } + return false; +} + +void ConnectionObserver::setObserver(const ObserverPtr& o){ + sys::Mutex::ScopedLock l(lock); + observer = o; +} + +ConnectionObserver::ObserverPtr ConnectionObserver::getObserver() { + sys::Mutex::ScopedLock l(lock); + return observer; +} + +void ConnectionObserver::opened(broker::Connection& connection) { + if (connection.isLink()) return; // Allow outgoing links. + if (connection.getClientProperties().isSet(ADMIN_TAG)) { + QPID_LOG(debug, logPrefix << "Allowing admin connection: " + << connection.getMgmtId()); + return; // No need to call observer, always allow admins. + } + BrokerInfo info; // Avoid self connections. 
+ if (getBrokerInfo(connection, info) && info.getSystemId() == self) + throw Exception("HA rejected self connection"); + ObserverPtr o(getObserver()); + if (o) o->opened(connection); +} + +void ConnectionObserver::closed(broker::Connection& connection) { + ObserverPtr o(getObserver()); + if (o) o->closed(connection); +} + +const std::string ConnectionObserver::ADMIN_TAG="qpid.ha-admin"; +const std::string ConnectionObserver::BACKUP_TAG="qpid.ha-backup"; + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h new file mode 100644 index 0000000000..a950f41739 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -0,0 +1,72 @@ +#ifndef QPID_HA_CONNECTIONOBSERVER_H +#define QPID_HA_CONNECTIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/ConnectionObserver.h" +#include "qpid/types/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace ha { +class BrokerInfo; + +/** + * Observes connections, delegates to another ConnectionObserver for + * actions specific to primary or backup. 
+ * + * THREAD SAFE: called in arbitrary connection threads. + * + * Main role of this class is to provide a continuous observer object + * on the connection so we can't lose observations between removing + * one observer and adding another. + */ +class ConnectionObserver : public broker::ConnectionObserver +{ + public: + typedef boost::shared_ptr<broker::ConnectionObserver> ObserverPtr; + + static const std::string ADMIN_TAG; + static const std::string BACKUP_TAG; + + static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info); + + ConnectionObserver(const types::Uuid& self); + + void setObserver(const ObserverPtr&); + ObserverPtr getObserver(); + + void opened(broker::Connection& connection); + void closed(broker::Connection& connection); + + private: + sys::Mutex lock; + std::string logPrefix; + ObserverPtr observer; + types::Uuid self; +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_CONNECTIONOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 6095837cd6..046800791d 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -19,11 +19,13 @@ * */ #include "Backup.h" -#include "ConnectionExcluder.h" +#include "BackupConnectionExcluder.h" +#include "ConnectionObserver.h" #include "HaBroker.h" #include "Primary.h" -#include "Settings.h" +#include "PrimaryConnectionMonitor.h" #include "ReplicatingSubscription.h" +#include "Settings.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" @@ -39,9 +41,9 @@ #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokersUrl.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicUrl.h" -#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" #include "qmf/org/apache/qpid/ha/EventMembersUpdate.h" #include "qpid/log/Statement.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace ha { @@ -49,20 +51,22 @@ 
namespace ha { namespace _qmf = ::qmf::org::apache::qpid::ha; using namespace management; using namespace std; +using types::Variant; +using types::Uuid; HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : logPrefix(status), + : logPrefix("HA: "), broker(b), systemId(broker.getSystem()->getSystemId().data()), settings(s), mgmtObject(0), status(STANDALONE), - excluder(new ConnectionExcluder(*this, systemId)), + observer(new ConnectionObserver(systemId)), brokerInfo(broker.getSystem()->getNodeName(), // TODO aconway 2012-05-24: other transports? broker.getPort(broker::Broker::TCP_TRANSPORT), systemId), - membership(logPrefix, boost::bind(&HaBroker::membershipUpdate, this, _1)) - + membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1, _2)), + replicationTest(s.replicateDefault.get()) { // Set up the management object. ManagementAgent* ma = broker.getManagementAgent(); @@ -77,13 +81,15 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // Register a factory for replicating subscriptions. broker.getConsumerFactories().add( boost::shared_ptr<ReplicatingSubscription::Factory>( - new ReplicatingSubscription::Factory(*this))); + new ReplicatingSubscription::Factory())); // If we are in a cluster, start as backup in joining state. 
if (settings.cluster) { status = JOINING; + observer->setObserver(boost::shared_ptr<broker::ConnectionObserver>( + new BackupConnectionExcluder)); + broker.getConnectionObservers().add(observer); backup.reset(new Backup(*this, s)); - broker.getConnectionObservers().add(excluder); broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); } @@ -99,57 +105,35 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) HaBroker::~HaBroker() { QPID_LOG(debug, logPrefix << "Broker shut down: " << brokerInfo); - broker.getConnectionObservers().remove(excluder); + broker.getConnectionObservers().remove(observer); } -void HaBroker::recover(sys::Mutex::ScopedLock&) { +void HaBroker::recover(sys::Mutex::ScopedLock& l) { setStatus(RECOVERING); backup.reset(); // No longer replicating, close link. + IdSet backups = membership.otherBackups(); membership.reset(brokerInfo); - primary.reset(new Primary(*this)); // Starts primary-ready check. + primary.reset(new Primary(*this, backups)); // Starts primary-ready check. + observer->setObserver( // Allow connections + boost::shared_ptr<broker::ConnectionObserver>( + new PrimaryConnectionMonitor(*this))); + if (primary->isActive()) activate(l); } -// Called back from Primary ready check. +// Called back from Primary active check. void HaBroker::activate() { sys::Mutex::ScopedLock l(lock); activate(l); } -void HaBroker::activate(sys::Mutex::ScopedLock&) { - BrokerStatus oldStatus = status; - setStatus(ACTIVE); - if (oldStatus != RECOVERING) // Already set membership - membership.reset(brokerInfo); - backup.reset(); // No longer replicating, close link. 
-} - -ReplicateLevel HaBroker::replicateLevel(const std::string& str) { - Enum<ReplicateLevel> rl; - if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get()); - else return getSettings().replicateDefault.get(); -} - -ReplicateLevel HaBroker::replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) - return replicateLevel(f.getAsString(QPID_REPLICATE)); - else - return getSettings().replicateDefault.get(); -} - -ReplicateLevel HaBroker::replicateLevel(const types::Variant::Map& m) { - types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) - return replicateLevel(i->second.asString()); - else - return getSettings().replicateDefault.get(); -} +void HaBroker::activate(sys::Mutex::ScopedLock&) { setStatus(ACTIVE); } Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { sys::Mutex::ScopedLock l(lock); switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { switch (status) { - case JOINING: activate(l); break; + case JOINING: recover(l); break; case CATCHUP: // FIXME aconway 2012-04-27: don't allow promotion in catch-up // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); @@ -171,20 +155,16 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, setClientUrl(Url(dynamic_cast<_qmf::ArgsHaBrokerSetPublicUrl&>(args).i_url), l); break; } - case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { - setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); - break; - } case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, logPrefix << "replicate individual queue " + QPID_LOG(debug, logPrefix << "Replicate individual queue " << bq_args.i_queue << " from " << bq_args.i_broker); boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); string protocol = 
url[0].protocol.empty() ? "tcp" : url[0].protocol; - types::Uuid uuid(true); + Uuid uuid(true); std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, @@ -193,7 +173,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, queue, link)); + boost::shared_ptr<QueueReplicator> qr( + new QueueReplicator(brokerInfo, queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; @@ -205,13 +186,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -void HaBroker::setClientUrl(const Url& url, const sys::Mutex::ScopedLock& l) { +void HaBroker::setClientUrl(const Url& url, sys::Mutex::ScopedLock& l) { if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); clientUrl = url; updateClientUrl(l); } -void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { +void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) { Url url = clientUrl.empty() ? 
brokerUrl : clientUrl; if (url.empty()) throw Url::Invalid("HA client URL is empty"); mgmtObject->set_publicUrl(url.str()); @@ -220,7 +201,7 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); } -void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { +void HaBroker::setBrokerUrl(const Url& url, sys::Mutex::ScopedLock& l) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); @@ -229,11 +210,6 @@ void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { if (clientUrl.empty()) updateClientUrl(l); } -void HaBroker::setExpectedBackups(size_t n, const sys::Mutex::ScopedLock&) { - expectedBackups = n; - mgmtObject->set_expectedBackups(n); -} - std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } @@ -259,7 +235,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { static const BrokerStatus TRANSITIONS[][2] = { { CATCHUP, RECOVERING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior { JOINING, CATCHUP }, // Connected to primary - { JOINING, ACTIVE }, // Chosen as initial primary. + { JOINING, RECOVERING }, // Chosen as initial primary. { CATCHUP, READY }, // Caught up all queues, ready to take over. 
{ READY, RECOVERING }, // Chosen as new primary { RECOVERING, ACTIVE } @@ -293,10 +269,13 @@ void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) { setLinkProperties(l); } -void HaBroker::membershipUpdate(const types::Variant::List& brokers) { +void HaBroker::membershipUpdate(const Variant::List& brokers, const IdSet& otherBackups) +{ QPID_LOG(debug, logPrefix << "Membership update: " << brokers); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); + sys::Mutex::ScopedLock l(lock); + if (primary.get()) primary->getUnreadyQueueSet().setExpectedBackups(otherBackups); } void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { @@ -305,13 +284,13 @@ void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { // If this is a backup then any links we make are backup links // and need to be tagged. QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo); - linkProperties.setTable(ConnectionExcluder::BACKUP_TAG, brokerInfo.asFieldTable()); + linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable()); } else { // If this is a primary then any links are federation links // and should not be tagged. QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links"); - linkProperties.erase(ConnectionExcluder::BACKUP_TAG); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); } broker.setLinkClientProperties(linkProperties); } @@ -326,7 +305,8 @@ void HaBroker::deactivatedBackup(const std::string& queue) { activeBackups.erase(queue); } -std::set<std::string> HaBroker::getActiveBackups() const { +// FIXME aconway 2012-05-31: strip out. 
+HaBroker::QueueNames HaBroker::getActiveBackups() const { sys::Mutex::ScopedLock l(lock); return activeBackups; } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 224a0923c5..80e8a6cc3d 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -24,8 +24,8 @@ #include "BrokerInfo.h" #include "Membership.h" -#include "Enum.h" -#include "LogPrefix.h" +#include "types.h" +#include "ReplicationTest.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -37,8 +37,14 @@ #include <boost/shared_ptr.hpp> namespace qpid { + +namespace types { +class Variant; +} + namespace broker { class Broker; +class Queue; } namespace framing { class FieldTable; @@ -46,7 +52,7 @@ class FieldTable; namespace ha { class Backup; -class ConnectionExcluder; +class ConnectionObserver; class Primary; /** @@ -76,11 +82,7 @@ class HaBroker : public management::Manageable void activate(); Backup* getBackup() { return backup.get(); } - - // Translate replicate levels. - ReplicateLevel replicateLevel(const std::string& str); - ReplicateLevel replicateLevel(const framing::FieldTable& f); - ReplicateLevel replicateLevel(const types::Variant::Map& m); + ReplicationTest getReplicationTest() const { return replicationTest; } // Keep track of the set of actively replicated queues on a backup // so that it can be transferred to the Primary on promotion. 
@@ -89,19 +91,18 @@ class HaBroker : public management::Manageable void deactivatedBackup(const std::string& queue); QueueNames getActiveBackups() const; - boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; } + boost::shared_ptr<ConnectionObserver> getObserver() { return observer; } const BrokerInfo& getBrokerInfo() const { return brokerInfo; } Membership& getMembership() { return membership; } - void membershipUpdate(const types::Variant::List&); + void membershipUpdate(const types::Variant::List&, const IdSet&); private: - void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); - void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); - void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&); - void updateClientUrl(const sys::Mutex::ScopedLock&); + void setClientUrl(const Url&, sys::Mutex::ScopedLock&); + void setBrokerUrl(const Url&, sys::Mutex::ScopedLock&); + void updateClientUrl(sys::Mutex::ScopedLock&); - bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); } + bool isPrimary(sys::Mutex::ScopedLock&) { return !backup.get(); } void setStatus(BrokerStatus, sys::Mutex::ScopedLock&); void recover(sys::Mutex::ScopedLock&); @@ -111,7 +112,7 @@ class HaBroker : public management::Manageable std::vector<Url> getKnownBrokers() const; - LogPrefix logPrefix; + std::string logPrefix; broker::Broker& broker; types::Uuid systemId; const Settings settings; @@ -122,12 +123,12 @@ class HaBroker : public management::Manageable qmf::org::apache::qpid::ha::HaBroker* mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; - size_t expectedBackups; BrokerStatus status; QueueNames activeBackups; - boost::shared_ptr<ConnectionExcluder> excluder; + boost::shared_ptr<ConnectionObserver> observer; BrokerInfo brokerInfo; Membership membership; + ReplicationTest replicationTest; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index b6aa0d4a91..42758c4689 
100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -40,9 +40,6 @@ struct Options : public qpid::Options { ("ha-replicate", optValue(settings.replicateDefault, "LEVEL"), "Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'") - // FIXME aconway 2012-04-30: required-backups? Also need timeout. - ("ha-expected-backups", optValue(settings.expectedBackups, "N"), - "Number of backups expected to be active in the HA cluster.") ("ha-username", optValue(settings.username, "USER"), "Username for connections between HA brokers") ("ha-password", optValue(settings.password, "PASS"), diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 7d22a019d5..6c6961f094 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -25,31 +25,26 @@ namespace ha { void Membership::reset(const BrokerInfo& b) { - { - sys::Mutex::ScopedLock l(lock); - brokers.clear(); - brokers[b.getSystemId()] = b; - } - update(); + sys::Mutex::ScopedLock l(lock); + brokers.clear(); + brokers[b.getSystemId()] = b; + update(l); } void Membership::add(const BrokerInfo& b) { - { - sys::Mutex::ScopedLock l(lock); - brokers[b.getSystemId()] = b; - } - update(); + sys::Mutex::ScopedLock l(lock); + brokers[b.getSystemId()] = b; + update(l); } void Membership::remove(const types::Uuid& id) { - { - sys::Mutex::ScopedLock l(lock); - BrokerMap::iterator i = brokers.find(id); - if (i != brokers.end()) - brokers.erase(i); + sys::Mutex::ScopedLock l(lock); + BrokerMap::iterator i = brokers.find(id); + if (i != brokers.end()) { + brokers.erase(i); + update(l); } - update(); } bool Membership::contains(const types::Uuid& id) { @@ -58,32 +53,47 @@ bool Membership::contains(const types::Uuid& id) { } void Membership::assign(const types::Variant::List& list) { - { - sys::Mutex::ScopedLock l(lock); - brokers.clear(); - for 
(types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { - BrokerInfo b(i->asMap()); - brokers[b.getSystemId()] = b; - } + sys::Mutex::ScopedLock l(lock); + brokers.clear(); + for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) { + BrokerInfo b(i->asMap()); + brokers[b.getSystemId()] = b; } - update(); + update(l); } types::Variant::List Membership::asList() const { sys::Mutex::ScopedLock l(lock); + return asList(l); +} + +types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { types::Variant::List list; for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); return list; } -void Membership::update() { +void Membership::update(sys::Mutex::ScopedLock& l) { if (updateCallback) { - types::Variant::List list; - for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) - list.push_back(i->second.asMap()); - updateCallback(list); + types::Variant::List list = asList(l); + IdSet ids = otherBackups(l); + sys::Mutex::ScopedUnlock u(lock); + updateCallback(list, ids); } } +IdSet Membership::otherBackups() const { + sys::Mutex::ScopedLock l(lock); + return otherBackups(l); +} + +IdSet Membership::otherBackups(sys::Mutex::ScopedLock&) const { + IdSet result; + for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self) + result.insert(i->second.getSystemId()); + return result; +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 8af03e0f40..bdb55425dc 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -23,7 +23,7 @@ */ #include "BrokerInfo.h" -#include "LogPrefix.h" +#include "types.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/types/Variant.h" @@ -40,25 +40,32 @@ namespace ha { class Membership { public: - 
Membership(LogPrefix lp, boost::function<void(const types::Variant::List&)> updateFn) - : logPrefix(lp), updateCallback(updateFn) {} + typedef boost::function<void (const types::Variant::List&, + const IdSet&) > UpdateCallback; + + Membership(const types::Uuid& self_, UpdateCallback updateFn) + : self(self_), updateCallback(updateFn) {} void reset(const BrokerInfo& b); ///< Reset to contain just one member. void add(const BrokerInfo& b); void remove(const types::Uuid& id); bool contains(const types::Uuid& id); + /** Return IDs of all backups other than self */ + IdSet otherBackups() const; void assign(const types::Variant::List&); types::Variant::List asList() const; private: typedef std::map<types::Uuid, BrokerInfo> BrokerMap; - void update(); + IdSet otherBackups(sys::Mutex::ScopedLock&) const; + types::Variant::List asList(sys::Mutex::ScopedLock&) const; + void update(sys::Mutex::ScopedLock&); mutable sys::Mutex lock; - LogPrefix logPrefix; + types::Uuid self; BrokerMap brokers; - boost::function<void(const types::Variant::List&)> updateCallback; + UpdateCallback updateCallback; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index ae48e48c6f..63cba14484 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -22,9 +22,11 @@ #include "ConnectionExcluder.h" #include "HaBroker.h" #include "Primary.h" +#include "ReplicatingSubscription.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" #include <boost/bind.hpp> namespace qpid { @@ -32,52 +34,29 @@ namespace ha { Primary* Primary::instance = 0; -Primary::Primary(HaBroker& b) : - haBroker(b), logPrefix(b), - expected(b.getSettings().expectedBackups), - unready(0), activated(false) +Primary::Primary(HaBroker& hb, const IdSet& backups) : + haBroker(hb), logPrefix("HA primary: "), + unready(0), activated(false), + queues(hb.getBroker(), 
hb.getReplicationTest(), backups) { + assert(instance == 0); instance = this; // Let queue replicators find us. - if (expected == 0) // No need for ready check - activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor. + if (backups.empty()) { + QPID_LOG(debug, logPrefix << "Not waiting for backups"); + activated = true; + } else { - // Set up the primary-ready check: ready when all queues have - // expected number of backups. Note clients are excluded at this point - // so dont't have to worry about changes to the set of queues. - HaBroker::QueueNames names = haBroker.getActiveBackups(); - for (HaBroker::QueueNames::const_iterator i = names.begin(); i != names.end(); ++i) - { - queues[*i] = 0; - ++unready; - QPID_LOG(debug, logPrefix << "Need backup of " << *i - << ", " << unready << " unready queues"); - } - if (queues.empty()) - activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor. - else { - QPID_LOG(debug, logPrefix << "Waiting for " << expected - << " backups on " << queues.size() << " queues"); - } + QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups); } } -void Primary::readyReplica(const std::string& q) { +void Primary::readyReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); - if (!activated) { - QueueCounts::iterator i = queues.find(q); - if (i != queues.end()) { - ++i->second; - if (i->second == expected) --unready; - QPID_LOG(debug, logPrefix << i->second << " backups caught up on " << q - << ", " << unready << " unready queues"); - if (unready == 0) activate(l); - } + if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) { + activated = true; + haBroker.activate(); + QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe."); } } -void Primary::activate(sys::Mutex::ScopedLock&) { - activated = true; - haBroker.activate(); -} - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 7e347fdbe2..3a1a9be9e8 
100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -22,7 +22,8 @@ * */ -#include "LogPrefix.h" +#include "UnreadyQueueSet.h" +#include "types.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> #include <map> @@ -36,10 +37,11 @@ class Queue; namespace ha { class HaBroker; +class ReplicatingSubscription; /** - * State associated with a primary broker. Tracks replicating - * subscriptions to determine when primary is ready. + * State associated with a primary broker. Tracks replicating + * subscriptions to determine when primary is active. * * THREAD SAFE: readyReplica is called in arbitray threads. */ @@ -47,22 +49,22 @@ class Primary { public: static Primary* get() { return instance; } - Primary(HaBroker& b); - void readyReplica(const std::string& q); - void removeReplica(const std::string& q); + Primary(HaBroker& hb, const IdSet& expectedBackups); - private: - typedef std::map<std::string, size_t> QueueCounts; + void readyReplica(const ReplicatingSubscription&); + void removeReplica(const std::string& q); - void activate(sys::Mutex::ScopedLock&); + UnreadyQueueSet& getUnreadyQueueSet() { return queues; } + bool isActive() { return activated; } + private: sys::Mutex lock; HaBroker& haBroker; - LogPrefix logPrefix; - QueueCounts queues; + std::string logPrefix; size_t expected, unready; bool activated; + UnreadyQueueSet queues; static Primary* instance; }; diff --git a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h new file mode 100644 index 0000000000..1aa61b2dea --- /dev/null +++ b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h @@ -0,0 +1,76 @@ +#ifndef QPID_HA_PRIMARYCONNECTIONOBSERVER_H +#define QPID_HA_PRIMARYCONNECTIONOBSERVER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "ConnectionObserver.h" +#include "qpid/broker/ConnectionObserver.h" +#include "qpid/types/Uuid.h" +#include "qpid/sys/Mutex.h" +#include <boost/function.hpp> + +namespace qpid { + +namespace broker { +class Connection; +} + +namespace ha { +class HaBroker; + +/** + * Monitor connections on a primary broker. Update membership and + * primary readiness. + * + * THREAD SAFE: has no state, just mediates between other thread-safe objects. + */ +class PrimaryConnectionMonitor : public broker::ConnectionObserver +{ + public: + PrimaryConnectionMonitor(HaBroker& hb) : haBroker(hb) {} + + void opened(broker::Connection& connection) { + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + QPID_LOG(debug, "HA primary: Backup connected: " << info); + haBroker.getMembership().add(info); + // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. + } + } + + void closed(broker::Connection& connection) { + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + QPID_LOG(debug, "HA primary: Backup disconnected: " << info); + haBroker.getMembership().remove(info.getSystemId()); + // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. 
+ } + } + private: + void reject(broker::Connection&); + HaBroker& haBroker; + }; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_PRIMARYCONNECTIONOBSERVER_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp new file mode 100644 index 0000000000..55dc6b0d50 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "QueueGuard.h" +#include "ReplicatingSubscription.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" +#include <sstream> + +namespace qpid { +namespace ha { + +using namespace broker; +using sys::Mutex; + +QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) + : queue(q), subscription(0) +{ + // Set a log prefix message that identifies the remote broker. 
+ std::ostringstream os; + os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": "; + logPrefix = os.str(); +} + +void QueueGuard::initialize() { + Mutex::ScopedLock l(lock); + queue.addObserver(shared_from_this()); +} + +void QueueGuard::enqueued(const QueuedMessage& qm) { + // Delay completion + QPID_LOG(trace, logPrefix << "Delaying completion of " << qm); + qm.payload->getIngressCompletion().startCompleter(); + { + sys::Mutex::ScopedLock l(lock); + assert(!delayed.contains(qm.position)); + delayed += qm.position; + } +} + +// FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription + +void QueueGuard::dequeued(const QueuedMessage& qm) { + QPID_LOG(trace, logPrefix << "Dequeued " << qm); + ReplicatingSubscription* rs = 0; + { + sys::Mutex::ScopedLock l(lock); + rs = subscription; + } + if (rs) rs->dequeued(qm); +} + +void QueueGuard::cancel() { + queue.removeObserver(shared_from_this()); + queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); +} + +void QueueGuard::attach(ReplicatingSubscription& rs) { + sys::Mutex::ScopedLock l(lock); + subscription = &rs; +} + +void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) { + QPID_LOG(trace, logPrefix << "Completed " << qm); + // The same message can be completed twice, by acknowledged and + // dequeued, remove it from the set so we only call + // finishCompleter() once + if (delayed.contains(qm.position)) { + qm.payload->getIngressCompletion().finishCompleter(); + delayed -= qm.position; + } +} + +void QueueGuard::complete(const QueuedMessage& qm) { + Mutex::ScopedLock l(lock); + complete(qm, l); +} + +// FIXME aconway 2012-06-04: TODO support for timeout. 
+ +}} // namespaces qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h new file mode 100644 index 0000000000..739c1e0e13 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -0,0 +1,101 @@ +#ifndef QPID_HA_QUEUEGUARD_H +#define QPID_HA_QUEUEGUARD_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include "qpid/broker/QueueObserver.h" +#include "qpid/framing/SequenceNumber.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/types/Uuid.h" +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <deque> +#include <set> + +namespace qpid { +namespace broker { +class Queue; +class QueuedMessage; +class Message; +} + +namespace ha { +class BrokerInfo; +class ReplicatingSubscription; + +/** + * A queue guard is a QueueObserver that delays completion of new + * messages arriving on a queue. It works as part of a + * ReplicatingSubscription to ensure messages are not acknowledged + * till they have been replicated. + * + * The guard is created before the ReplicatingSubscription to protect + * messages arriving before the creation of the subscription has not + * yet seen. 
+ * + * THREAD SAFE: Called concurrently via QueueObserver::enqueued in + * arbitrary connection threads, and from ReplicatingSubscription + * in the subscriptions thread. + */ +class QueueGuard : public broker::QueueObserver, + public boost::enable_shared_from_this<QueueGuard> +{ + public: + QueueGuard(broker::Queue& q, const BrokerInfo&); + + /** Must be called after ctor, requires a shared_ptr to this to exist. + * Must be called before ReplicatingSubscription::initialize(this) + */ + void initialize(); + + /** QueueObserver override. Delay completion of the message. */ + void enqueued(const broker::QueuedMessage&); + + /** QueueObserver override: Complete a delayed message */ + void dequeued(const broker::QueuedMessage&); + + /** Complete a delayed message. */ + void complete(const broker::QueuedMessage&); + + /** Complete all delayed messages. */ + void cancel(); + + void attach(ReplicatingSubscription&); + + // Unused QueueObserver functions. + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + + private: + sys::Mutex lock; + std::string logPrefix; + broker::Queue& queue; + framing::SequenceSet delayed; + ReplicatingSubscription* subscription; + + void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&); +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_QUEUEGUARD_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 58c5e452d7..4d12015008 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -21,7 +21,6 @@ #include "Counter.h" #include "QueueReplicator.h" -#include "HaBroker.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" @@ -60,15 +59,15 @@ bool QueueReplicator::isEventKey(const std::string key) { return ret; } -QueueReplicator::QueueReplicator(HaBroker& hb, +QueueReplicator::QueueReplicator(const BrokerInfo& info, boost::shared_ptr<Queue> q, 
boost::shared_ptr<Link> l) : Exchange(replicatorName(q->getName()), 0, q->getBroker()), - haBroker(hb), logPrefix(hb), queue(q), link(l) + logPrefix("HA backup of "+q->getName()+": "), + queue(q), link(l), brokerInfo(info) { Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - logPrefix.setMessage(q->getName()); QPID_LOG(info, logPrefix << "Created"); } @@ -119,7 +118,7 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, queue->getPosition()); settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, - haBroker.getBrokerInfo().asFieldTable()); + brokerInfo.asFieldTable()); SequenceNumber front; if (ReplicatingSubscription::getFront(*queue, front)) settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); @@ -143,7 +142,7 @@ template <class T> T decodeContent(Message& m) { } } -void QueueReplicator::dequeue(SequenceNumber n, const sys::Mutex::ScopedLock&) { +void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) { // Thread safe: only calls thread safe Queue functions. if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet QueuedMessage message; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 1b221a8d28..2f55e6cc85 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -22,7 +22,7 @@ * */ -#include "LogPrefix.h" +#include "BrokerInfo.h" #include "qpid/broker/Exchange.h" #include "qpid/framing/SequenceSet.h" #include <boost/enable_shared_from_this.hpp> @@ -42,7 +42,6 @@ class Deliverable; namespace ha { class Counter; -class HaBroker; /** * Exchange created on a backup broker to replicate a queue on the primary. 
@@ -63,7 +62,7 @@ class QueueReplicator : public broker::Exchange, /** Test if a string is an event key */ static bool isEventKey(const std::string key); - QueueReplicator(HaBroker&, + QueueReplicator(const BrokerInfo&, boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); @@ -81,15 +80,15 @@ class QueueReplicator : public broker::Exchange, private: void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); - void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&); - HaBroker& haBroker; - LogPrefix logPrefix; + std::string logPrefix; std::string bridgeName; sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; boost::shared_ptr<broker::Bridge> bridge; + BrokerInfo brokerInfo; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 4490e309aa..f7bfe6fda0 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -19,8 +19,8 @@ * */ +#include "QueueGuard.h" #include "ReplicatingSubscription.h" -#include "HaBroker.h" #include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" @@ -129,14 +129,11 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( - haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - queue->addObserver(rs); - // NOTE: initialize must be called _after_ addObserver, so - // messages can't be enqueued after setting readyPosition - // but before registering the observer. 
- rs->initialize(); + boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo())); + guard->initialize(); // Must call before ReplicatingSubscription::initialize + rs->initialize(guard); } return rs; } @@ -177,7 +174,6 @@ ostream& operator<<(ostream& o, const QueueRange& qr) { } ReplicatingSubscription::ReplicatingSubscription( - HaBroker& hb, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -190,24 +186,19 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), - haBroker(hb), - logPrefix(hb), dummy(new Queue(mask(name))), ready(false) { try { + FieldTable ft; + if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) + throw Exception("Replicating subscription does not have broker info"); + info.assign(ft); + // Set a log prefix message that identifies the remote broker. - // FIXME aconway 2012-05-24: use URL instead of host:port, include transport? ostringstream os; - os << queue->getName() << "@"; - FieldTable ft; - if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) { - BrokerInfo info(ft); - os << info.getHostName() << ":" << info.getPort(); - } - else - os << parent->getSession().getConnection().getUrl(); - logPrefix.setMessage(os.str()); + os << "HA subscription " << queue->getName() << "@" << info.getLogId() << ": "; + logPrefix = os.str(); QueueRange primary(*queue); QueueRange backup(arguments); @@ -251,17 +242,20 @@ ReplicatingSubscription::~ReplicatingSubscription() { } // Called in subscription's connection thread when the subscription is created. -void ReplicatingSubscription::initialize() { - sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently +void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) { + sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. 
+ // Attach to the guard. + guard = g; + guard->attach(*this); // Send initial dequeues and position to the backup. - // There most be a shared_ptr(this) when sending. + // There must be a shared_ptr(this) when sending. sendDequeueEvent(l); sendPositionEvent(position, l); backupPosition = position; // Set the ready position. All messages after this position have - // been seen by us as QueueObserver. + // been seen by the guard. QueueRange range; { // Drop the lock, QueueRange will lock the queues message lock @@ -320,82 +314,28 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) { +void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) { if (ready) return; ready = true; // Notify Primary that a subscription is ready. { sys::Mutex::ScopedUnlock u(lock); QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); - if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName()); - } -} - -// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg - -// Mark a message completed. May be called by acknowledge or dequeued, -// in arbitrary connection threads. -void ReplicatingSubscription::complete( - const QueuedMessage& qm, const sys::Mutex::ScopedLock&) -{ - // Handle completions for the subscribed queue, not the internal event queue. - if (qm.queue == getQueue().get()) { - QPID_LOG(trace, logPrefix << "Completed " << qm); - Delayed::iterator i= delayed.find(qm.position); - // The same message can be completed twice, by acknowledged and - // dequeued, remove it from the set so it only gets completed - // once. - if (i != delayed.end()) { - assert(i->second.payload == qm.payload); - qm.payload->getIngressCompletion().finishCompleter(); - delayed.erase(i); - } + if (Primary::get()) Primary::get()->readyReplica(*this); } } -// Called before we get notified of the message being available and -// under the message lock in the queue. 
-// Called in arbitrary connection thread *with the queue lock held* -void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { - // Delay completion - QPID_LOG(trace, logPrefix << "Delaying completion of " << qm); - qm.payload->getIngressCompletion().startCompleter(); - { - sys::Mutex::ScopedLock l(lock); - assert(delayed.find(qm.position) == delayed.end()); - delayed[qm.position] = qm; - } -} - -// Function to complete a delayed message, called by cancel() -void ReplicatingSubscription::cancelComplete( - const Delayed::value_type& v, const sys::Mutex::ScopedLock&) -{ - QPID_LOG(trace, logPrefix << "Cancel completed " << v.second); - v.second.payload->getIngressCompletion().finishCompleter(); -} - // Called in the subscription's connection thread. void ReplicatingSubscription::cancel() { - getQueue()->removeObserver( - boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); - { - sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "Cancel backup subscription to " - << getQueue()->getName()); - for_each(delayed.begin(), delayed.end(), - boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); - delayed.clear(); - } + guard->cancel(); ConsumerImpl::cancel(); } // Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { - sys::Mutex::ScopedLock l(lock); // Finish completion of message, it has been acknowledged by the backup. - complete(msg, l); + guard->complete(msg); } // Hide the "queue deleted" error for a ReplicatingSubscription when a @@ -403,7 +343,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. 
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); @@ -418,24 +358,27 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) } } -// QueueObserver override. Called after the message has been removed +// Called via QueueObserver::dequeued override on guard. +// Called after the message has been removed // from the deque and under the messageLock in the queue. Called in // arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { + bool doComplete = false; QPID_LOG(trace, logPrefix << "Dequeued " << qm); { sys::Mutex::ScopedLock l(lock); dequeues.add(qm.position); - // If we have not yet sent this message to the backup, then - // complete it now as it will never be accepted. - if (qm.position > position) complete(qm, l); + if (qm.position > position) doComplete = true; } + // If we have not yet sent this message to the backup, then + // complete it now as it will never be accepted. + if (doComplete) guard->complete(qm); notify(); // Ensure a call to doDispatch } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. 
QPID_LOG(trace, logPrefix << "Sending position " << pos diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index ab02949952..e69a2159e6 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -23,6 +23,7 @@ */ #include "QueueReplicator.h" // For DEQUEUE_EVENT_KEY +#include "BrokerInfo.h" #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" #include "qpid/broker/ConsumerFactory.h" @@ -43,27 +44,25 @@ class Buffer; } namespace ha { -class LogPrefix; +class QueueGuard; /** - * A susbcription that represents a backup replicating a queue. + * A susbcription that replicates to a remote backup. * - * Runs on the primary. Delays completion of messages till the backup - * has acknowledged, informs backup of locally dequeued messages. + * Runs on the primary. In conjunction with a QueueGuard, delays + * completion of messages till the backup has acknowledged, informs + * backup of locally dequeued messages. * - * THREAD SAFE: Used as a consumer in subscription's connection - * thread, and as a QueueObserver in arbitrary connection threads. + * THREAD SAFE: Called in subscription's connection thread but also + * in arbitrary connection threads via dequeued. * * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. 
* */ -class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, - public broker::QueueObserver +class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl { public: struct Factory : public broker::ConsumerFactory { - HaBroker& haBroker; - Factory(HaBroker& hb) : haBroker(hb) {} boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , @@ -88,9 +87,9 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, */ static bool getNext(broker::Queue&, framing::SequenceNumber from, framing::SequenceNumber& result); + static bool isEmpty(broker::Queue&); - ReplicatingSubscription(HaBroker&, - broker::SemanticState* parent, + ReplicatingSubscription(broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, @@ -98,11 +97,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, ~ReplicatingSubscription(); - // QueueObserver overrides. NB called with queue lock held. - void enqueued(const broker::QueuedMessage&); - void dequeued(const broker::QueuedMessage&); - void acquired(const broker::QueuedMessage&) {} - void requeued(const broker::QueuedMessage&) {} + // Called via QueueGuard::dequeued + void dequeued(const broker::QueuedMessage& qm); // Consumer overrides. bool deliver(broker::QueuedMessage& msg); @@ -111,30 +107,30 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool browseAcquired() const { return true; } bool hideDeletedError(); + /** Initialization that must be done after construction because it - * requires a shared_ptr to this to exist. + * requires a shared_ptr to this to exist. 
Will attach to guard */ - void initialize(); + void initialize(const boost::shared_ptr<QueueGuard>& guard); + + BrokerInfo getBrokerInfo() const { return info; } protected: bool doDispatch(); - private: - typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; - HaBroker& haBroker; - LogPrefix logPrefix; + private: + std::string logPrefix; boost::shared_ptr<broker::Queue> dummy; // Used to send event messages - Delayed delayed; framing::SequenceSet dequeues; - framing::SequenceNumber backupPosition; framing::SequenceNumber readyPosition; + framing::SequenceNumber backupPosition; bool ready; + BrokerInfo info; + boost::shared_ptr<QueueGuard> guard; - void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); - void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); - void sendDequeueEvent(const sys::Mutex::ScopedLock&); - void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); - void setReady(const sys::Mutex::ScopedLock&); + void sendDequeueEvent(sys::Mutex::ScopedLock&); + void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&); + void setReady(sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&); friend struct Factory; }; diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp new file mode 100644 index 0000000000..1db101dc94 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "ReplicationTest.h" +#include "qpid/broker/Queue.h" +#include "qpid/framing/FieldTable.h" + +namespace qpid { +namespace ha { + +using types::Variant; + +ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) { + Enum<ReplicateLevel> rl; + if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get()); + else return replicateDefault; +} + +ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return replicateDefault; +} + +ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) { + Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return replicateDefault; +} + +namespace { +const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); +} + +bool ReplicationTest::isReplicated(const Variant::Map& args, bool autodelete, bool exclusive) { + bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); + return replicateLevel(args) && !ignore; +} + +bool ReplicationTest::isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive) { + bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT); + return replicateLevel(args) && !ignore; +} + +bool ReplicationTest::isReplicated(const broker::Queue& q) { + return isReplicated(q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner()); +} + + +}} // namespace qpid::ha diff --git 
a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index da01154a80..4851f34e84 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -1,5 +1,5 @@ -#ifndef QPID_HA_LOGPREFIX_H -#define QPID_HA_LOGPREFIX_H +#ifndef QPID_HA_REPLICATIONTEST_H +#define QPID_HA_REPLICATIONTEST_H /* * @@ -22,38 +22,41 @@ * */ -#include "Enum.h" -#include <iosfwd> +#include "types.h" +#include "qpid/types/Variant.h" #include <string> namespace qpid { -namespace ha { -class HaBroker; +namespace broker { +class Queue; +} + +namespace framing { +class FieldTable; +} +namespace ha { /** - * Standard information to prefix log messages. + * Test whether something is replicated, taking into account the + * default replication level. */ -class LogPrefix +class ReplicationTest { public: - /** For use by all classes other than HaBroker */ - LogPrefix(HaBroker& hb, const std::string& queue=std::string()); - LogPrefix(LogPrefix& lp, const std::string& queue); - /** For use by the HaBroker itself. 
*/ - LogPrefix(BrokerStatus&); + ReplicationTest(ReplicateLevel replicateDefault_) : + replicateDefault(replicateDefault_) {} - void setMessage(const std::string&); + ReplicateLevel replicateLevel(const std::string& str); + ReplicateLevel replicateLevel(const framing::FieldTable& f); + ReplicateLevel replicateLevel(const types::Variant::Map& m); + bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive); + bool isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive); + bool isReplicated(const broker::Queue&); private: - HaBroker* haBroker; - BrokerStatus* status; - std::string tail; - friend std::ostream& operator<<(std::ostream& o, const LogPrefix& l); + ReplicateLevel replicateDefault; }; - -std::ostream& operator<<(std::ostream& o, const LogPrefix& l); - }} // namespace qpid::ha -#endif /*!QPID_HA_LOGPREFIX_H*/ +#endif /*!QPID_HA_REPLICATIONTEST_H*/ diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 08d42471b8..213a5f64d5 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -22,7 +22,7 @@ * */ -#include "Enum.h" +#include "types.h" #include <string> namespace qpid { @@ -34,13 +34,12 @@ namespace ha { class Settings { public: - Settings() : cluster(false), expectedBackups(0), replicateDefault(NONE) + Settings() : cluster(false), replicateDefault(NONE) {} bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; - size_t expectedBackups; Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; private: diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp new file mode 100644 index 0000000000..279eb2c0e1 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "UnreadyQueueSet.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include <boost/bind.hpp> +#include <iostream> +#include <algorithm> + +namespace qpid { +namespace ha { + +using sys::Mutex; + +UnreadyQueueSet::UnreadyQueueSet(broker::Broker& broker, ReplicationTest rt, const IdSet& ids) : + logPrefix("HA unsafe queues: "), replicationTest(rt), expected(ids), + initializing(true), initialQueues(0) +{ + if (!expected.empty()) { + QPID_LOG(debug, logPrefix << "Recovering, waiting for backups: " << expected); + broker.getQueues().eachQueue(boost::bind(&UnreadyQueueSet::queueCreate, this, _1)); + initialQueues = queues.size(); + } + initializing = false; +} + +void UnreadyQueueSet::setExpectedBackups(const IdSet& ids) { + Mutex::ScopedLock l(lock); + expected = ids; +} + +bool UnreadyQueueSet::ready(const boost::shared_ptr<broker::Queue>& q, const types::Uuid& id) { + Mutex::ScopedLock l(lock); + QPID_LOG(debug, logPrefix << "Replicating subscription ready: " << id << " on " + << q->getName()); + QueueMap::iterator i = queues.find(q); + if (i != queues.end()) { + // if (i->second.guard->ready(id)) { + // QPID_LOG(debug, logPrefix << "Releasing guard on " << q->getName()); + // remove(i, l); + if 
(i->second.initial) --initialQueues; + // } + } + return initialQueues == 0; +} + +void UnreadyQueueSet::queueCreate(const boost::shared_ptr<broker::Queue>& q) { + Mutex::ScopedLock l(lock); + if (replicationTest.isReplicated(*q) && !expected.empty()) { + QPID_LOG(debug, logPrefix << "Guarding " << q->getName() << " for " << expected); + // GuardPtr guard(new QueueGuard(*q, expected)); + // FIXME aconway 2012-06-05: q->addObserver(guard); + queues[q] = Entry(initializing);//, guard); + } +} + +void UnreadyQueueSet::queueDestroy(const boost::shared_ptr<broker::Queue>& q) { + Mutex::ScopedLock l(lock); + remove(queues.find(q), l); +} + +void UnreadyQueueSet::remove(QueueMap::iterator i, sys::Mutex::ScopedLock&) { + if (i != queues.end()) { + QPID_LOG(debug, logPrefix << "Queue is safe: " << i->first->getName()); + // FIXME aconway 2012-06-05: i->first->removeObserver(i->second.guard); + //i->second.guard->release(); + queues.erase(i); + } +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h new file mode 100644 index 0000000000..0731282c2b --- /dev/null +++ b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h @@ -0,0 +1,88 @@ +#ifndef QPID_HA_BROKERGUARD_H +#define QPID_HA_BROKERGUARD_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicationTest.h" +#include "types.h" +#include "qpid/broker/ConfigurationObserver.h" +#include "qpid/sys/Mutex.h" +#include <set> +#include <map> + +namespace qpid { + +namespace broker { +class Broker; +} + +namespace ha { +class QueueGuard; + +/** + * ConfigurationObserver that sets a QueueGuard on all existing/new queues + * so they are safe until their ReplicatingSubscriptions catch up. + * + * THREAD SAFE: Called concurrently as a ConfigurationObserver and via ready() + */ +class UnreadyQueueSet : public qpid::broker::ConfigurationObserver +{ + public: + /** Caller should call broker.getConfigurationObservers().add(shared_ptr(this)) */ + UnreadyQueueSet(broker::Broker&, ReplicationTest rt, const IdSet& expected); + + void setExpectedBackups(const IdSet&); + + /** Backup id is ready on queue. + *@return true if all initial queuse are now ready. + */ + bool ready(const boost::shared_ptr<broker::Queue>&, const types::Uuid& id); + + // ConfigurationObserver overrides + void queueCreate(const boost::shared_ptr<broker::Queue>&); + void queueDestroy(const boost::shared_ptr<broker::Queue>&); + + // FIXME aconway 2012-05-31: handle IdSet changes. + private: + typedef boost::shared_ptr<QueueGuard> GuardPtr; + struct Entry { + bool initial; + // FIXME aconway 2012-06-05: GuardPtr guard; + // Entry(bool i=false, GuardPtr g=GuardPtr()) : initial(i), guard(g) {} + Entry(bool i=false) : initial(i) {} + }; + typedef std::map<boost::shared_ptr<broker::Queue>, Entry> QueueMap; + + void remove(QueueMap::iterator i, sys::Mutex::ScopedLock&); + + sys::Mutex lock; + std::string logPrefix; + ReplicationTest replicationTest; + IdSet expected; + QueueMap queues; + bool initializing; + size_t initialQueues; // Count of initial queues still unready. 
+}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_BROKERGUARD_H*/ diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index ce14032694..3da482e732 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -31,9 +31,6 @@ <property name="publicUrl" type="sstr" desc="URL advertized to clients to connect to the cluster."/> - <property name="expectedBackups" type="uint16" - desc="Number of HA backup brokers expected."/> - <property name="replicateDefault" type="sstr" desc="Replication for queues/exchanges with no qpid.replicate argument"/> @@ -51,10 +48,6 @@ <arg name="url" type="sstr" dir="I"/> </method> - <method name="setExpectedBackups" desc="Set number of backups expected"> - <arg name="expectedBackups" type="uint16" dir="I"/> - </method> - <method name="replicate" desc="Replicate individual queue from remote broker."> <arg name="broker" type="sstr" dir="I"/> <arg name="queue" type="sstr" dir="I"/> diff --git a/qpid/cpp/src/qpid/ha/Enum.cpp b/qpid/cpp/src/qpid/ha/types.cpp index a5ee7ea51f..92acd76fca 100644 --- a/qpid/cpp/src/qpid/ha/Enum.cpp +++ b/qpid/cpp/src/qpid/ha/types.cpp @@ -19,30 +19,33 @@ * */ -#include "Enum.h" +#include "types.h" #include "qpid/Msg.h" #include "qpid/Exception.h" #include <algorithm> #include <iostream> +#include <iterator> #include <assert.h> namespace qpid { namespace ha { -const std::string QPID_REPLICATE("qpid.replicate"); +using namespace std; -std::string EnumBase::str() const { +const string QPID_REPLICATE("qpid.replicate"); + +string EnumBase::str() const { assert(value < count); return names[value]; } -void EnumBase::parse(const std::string& s) { +void EnumBase::parse(const string& s) { if (!parseNoThrow(s)) throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s)); } -bool EnumBase::parseNoThrow(const std::string& s) { - const char** i = std::find(names, names+count, s); +bool EnumBase::parseNoThrow(const 
string& s) { + const char** i = find(names, names+count, s); value = i - names; return value < count; } @@ -58,15 +61,21 @@ template <> const char* Enum<BrokerStatus>::NAMES[] = { }; template <> const size_t Enum<BrokerStatus>::N = 7; -std::ostream& operator<<(std::ostream& o, EnumBase e) { +ostream& operator<<(ostream& o, EnumBase e) { return o << e.str(); } -std::istream& operator>>(std::istream& i, EnumBase& e) { - std::string s; +istream& operator>>(istream& i, EnumBase& e) { + string s; i >> s; e.parse(s); return i; } +ostream& operator<<(ostream& o, const IdSet& ids) { + ostream_iterator<qpid::types::Uuid> out(o, " "); + copy(ids.begin(), ids.end(), out); + return o; +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Enum.h b/qpid/cpp/src/qpid/ha/types.h index adad21d2d4..e12c039c79 100644 --- a/qpid/cpp/src/qpid/ha/Enum.h +++ b/qpid/cpp/src/qpid/ha/types.h @@ -23,7 +23,9 @@ */ #include "qpid/types/Variant.h" +#include "qpid/types/Uuid.h" #include <string> +#include <set> #include <iosfwd> namespace qpid { @@ -93,5 +95,15 @@ inline bool isPrimary(BrokerStatus s) { inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } extern const std::string QPID_REPLICATE; + +// FIXME aconway 2012-06-04: rename types.h->types.h + +/** + * Define IdSet type, not a typedef so we can overload operator << + */ +class IdSet : public std::set<types::Uuid> {}; + +std::ostream& operator<<(std::ostream& o, const IdSet& ids); + }} // qpid::ha #endif /*!QPID_HA_ENUM_H*/ diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 1b93504b64..c32b7f2c96 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -248,7 +248,7 @@ class Broker(Popen): self.log = "%s.log" % self.name i = 1 while (os.path.exists(self.log)): - self.log = "%s-%d.log" % (self.name, i) + self.log = "%s.%d.log" % (self.name, i) i += 1 def get_log(self): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 
6e270851f0..86679611c4 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -24,7 +24,7 @@ from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Co from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger, WARN, ERROR, DEBUG +from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent from uuid import UUID @@ -75,7 +75,10 @@ class HaBroker(Broker): if not self._agent: self._agent = QmfAgent(self.host_port()) return self._agent - def ha_status(self): self.agent().getHaBroker().status + def ha_status(self): return self.agent().getHaBroker().status + + def wait_status(self, status): + assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -97,6 +100,14 @@ class HaBroker(Broker): try: wait_address(bs, address) finally: bs.connection.close() + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + def assert_browse_backup(self, queue, expected, **kwargs): """Combines wait_backup and assert_browse_retry.""" bs = self.connect_admin().session() @@ -157,9 +168,11 @@ class HaCluster(object): if promote_next: self[(i+1) % len(self)].promote() def restart(self, i): + """Start a broker with the same name and data directory. 
It will get + a separate log file: foo.n.log""" b = self._brokers[i] self._brokers[i] = HaBroker( - self.test, name=self.next_name(), port=b.port(), brokers_url=self.url, + self.test, name=b.name, port=b.port(), brokers_url=self.url, **self.kwargs) def bounce(self, i, promote_next=True): @@ -347,7 +360,8 @@ class ReplicationTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) backup.promote() - self.assert_browse_retry(s, "q", ["foo"]) + sender.send("bar") + self.assert_browse_retry(s, "q", ["foo", "bar"]) c.close() def test_failover_cpp(self): @@ -630,27 +644,6 @@ class ReplicationTests(BrokerTest): assert valid_address(s, "ad") assert valid_address(s, "time") - def test_recovering(self): - """Verify that the primary broker does not go active until expected - backups have connected""" - cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"]) - c = cluster[0].connect() - for i in xrange(10): - s = c.session().sender("q%s;{create:always}"%i) - for j in xrange(100): s.send(str(j)) - cluster.kill(0) # Fail over to 1 - cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients. - cluster.restart(0) - c = retry(cluster[1].try_connect) - self.assertTrue(c) - cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]); - - # Verify in logs that all queue catch-up happened before the transition to active. 
- log = open(cluster[1].log).read() - i = log.find("Status change: recovering -> active") - self.failIf(i < 0) - self.assertEqual(log.find("caught up", i), -1) - def test_broker_info(self): """Check that broker information is correctly published via management""" cluster = HaCluster(self, 3) @@ -667,7 +660,7 @@ class ReplicationTests(BrokerTest): # Check that all brokers have the same membership as the cluster for broker in cluster: qmf = broker.agent().getHaBroker() - assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) # Add a new broker, check it is updated everywhere b = cluster.start() cluster_ports.append(b.port()) @@ -720,12 +713,10 @@ class LongTests(BrokerTest): def test_failover_send_receive(self): """Test failover with continuous send-receive""" - # Start a cluster, all members will be killed during the test. - # FIXME aconway 2012-05-01: try expected-backups=1, requires catchup-ready fixed. - brokers = HaCluster(self, 3, args=["--ha-expected-backups=2"]) + brokers = HaCluster(self, 3) # Start sender and receiver threads - sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) + sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False) receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) receiver.start() sender.start() @@ -744,14 +735,14 @@ class LongTests(BrokerTest): # otherwise we can lose messages. When we implement non-promotion # of catchup brokers we can make this stronger: wait only for # there to be at least one ready backup. - assert retry(brokers[i%3].try_connect, 1) + brokers[i%3].wait_status("active") brokers.bounce(i%3) i += 1 def enough(): # Verify we're still running receiver.check() # Verify no exceptions return receiver.received > n + 100 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec. 
- assert retry(enough, 10) + assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n) except: traceback.print_exc() raise @@ -764,6 +755,53 @@ class LongTests(BrokerTest): brokers.kill(i, False) if dead: raise Exception("Brokers not running: %s"%dead) + +class RecoveryTests(BrokerTest): + """Tests for recovery after a failure.""" + + def test_queue_hold(self): + """Verify that the broker holds queues without sufficient backup, + i.e. does not complete messages sent to those queues.""" + + cluster = HaCluster(self, 4); + # Wait for the primary to be ready + cluster[0].wait_status("active") + + # Create a queue before the failure. + s1 = cluster.connect(0).session().sender("q1;{create:always}") + for b in cluster: b.wait_backup("q1") + for i in xrange(100): s1.send(str(i)) + + # Kill primary and 2 backups + for i in [0,1,2]: cluster.kill(i, False) + cluster[3].promote() # New primary, backups will be 1 and 2 + cluster[3].wait_status("recovering") + + # Create a queue after the failure + s2 = cluster.connect(3).session().sender("q2;{create:always}") + + # Verify that messages sent are not completed + for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + self.assertEqual(s1.unsettled(), 100) + self.assertEqual(s2.unsettled(), 100) + + # Verify we can receive even if sending is on hold: + cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + + # Restart backups, verify queues are released only when both backups are up + cluster.restart(1) + self.assertEqual(s1.unsettled(), 100) + self.assertEqual(s2.unsettled(), 100) + self.assertEqual(cluster[3].ha_status(), "recovering") + cluster.restart(2) + + def settled(sender): sender.sync(); return sender.unsettled() == 0; + assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) + assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) + cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) + 
cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) + cluster[3].wait_status("active"), + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) qpid_ha = os.getenv("QPID_HA_EXEC") |
