diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:07 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:20:07 +0000 |
| commit | 2c26294c60daa02e19189cbbd935e2441f2c541c (patch) | |
| tree | b62b1f4dc8ce0627d9fc28782772ee9f2e503147 /qpid/cpp | |
| parent | bf69fd2f69325dd660454e6b6c8399c51cacea2c (diff) | |
| download | qpid-python-2c26294c60daa02e19189cbbd935e2441f2c541c.tar.gz | |
QPID-3603: Introduced RemoteBackup to track backup status.
The primary creates RemoteBackup object for each connected or expected
backup. On first being promoted, the new primary has a RemoteBackup
for each of the known backups at the time of the failure.
The RemoteBackup manages queue guards for its backup and
tracks it's readiness.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
32 files changed, 597 insertions, 551 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index b83e8d9c6d..8933d60104 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -656,8 +656,8 @@ if (BUILD_HA) qpid/ha/Settings.h qpid/ha/Types.cpp qpid/ha/Types.h - qpid/ha/UnreadyQueueSet.cpp - qpid/ha/UnreadyQueueSet.h + qpid/ha/RemoteBackup.cpp + qpid/ha/RemoteBackup.h ) add_library (ha MODULE ${ha_SOURCES}) diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index dae924b6ea..2027a75706 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -51,8 +51,8 @@ ha_la_SOURCES = \ qpid/ha/ReplicationTest.cpp \ qpid/ha/ReplicationTest.h \ qpid/ha/Settings.h \ - qpid/ha/UnreadyQueueSet.cpp \ - qpid/ha/UnreadyQueueSet.h \ + qpid/ha/RemoteBackup.cpp \ + qpid/ha/RemoteBackup.h \ qpid/ha/types.cpp \ qpid/ha/types.h diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index f2cc2a3454..1c48d1a4f1 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -59,7 +59,8 @@ bool Backup::isSelf(const Address& a) const { Url Backup::linkUrl(const Url& brokers) const { return brokers; /** FIXME aconway 2012-05-29: Problems with self-test, false positives. - // linkUrl contains only the addresses of *other* brokers, not this one. + // linkUrl contains only the addresses of * + other* brokers, not this one. Url url; for (Url::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (!isSelf(*i)) url.push_back(*i); diff --git a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h index 9776fac016..6fd3a3ae09 100644 --- a/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/BackupConnectionExcluder.h @@ -27,6 +27,7 @@ namespace qpid { namespace ha { +// FIXME aconway 2012-06-06: move to Backup.cpp /** * Exclude connections to a backup broker. diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 0c5de9542f..91d497ab43 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -26,7 +26,8 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" #include <iostream> - +#include <iterator> +#include <sstream> namespace qpid { namespace ha { @@ -43,7 +44,16 @@ using types::Variant; using framing::FieldTable; BrokerInfo::BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id) : - logId(id.str().substr(0,9)+"..."), hostName(host), port(port_), systemId(id) {} + hostName(host), port(port_), systemId(id) +{ + updateLogId(); +} + +void BrokerInfo::updateLogId() { + std::ostringstream o; + o << hostName << ":" << port; + logId = o.str(); +} FieldTable BrokerInfo::asFieldTable() const { Variant::Map m = asMap(); @@ -81,11 +91,29 @@ void BrokerInfo::assign(const Variant::Map& m) { hostName = get(m, HOST_NAME).asString(); port = get(m, PORT).asUint16(); status = BrokerStatus(get(m, STATUS).asUint8()); + updateLogId(); } std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { - return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId() - << "," << printable(b.getStatus()) << ")"; + return o << b.getHostName() << ":" << b.getPort() << "(" + << printable(b.getStatus()) << ")"; + // FIXME aconway 2012-06-06: include << b.getSystemId()? +} + +std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) { + std::ostream_iterator<BrokerInfo> out(o, " "); + copy(infos.begin(), infos.end(), out); + return o; +} + +std::ostream& operator<<(std::ostream& o, const BrokerInfo::Map::value_type& v) { + return o << v.second; +} + +std::ostream& operator<<(std::ostream& o, const BrokerInfo::Map& infos) { + std::ostream_iterator<BrokerInfo::Map::value_type> out(o, " "); + copy(infos.begin(), infos.end(), out); + return o; } }} diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.h b/qpid/cpp/src/qpid/ha/BrokerInfo.h index 55479df4b9..642f7c1361 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.h +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.h @@ -29,6 +29,7 @@ #include "qpid/types/Variant.h" #include <string> #include <iosfwd> +#include <vector> namespace qpid { namespace ha { @@ -39,6 +40,9 @@ namespace ha { class BrokerInfo { public: + typedef std::set<BrokerInfo> Set; + typedef std::map<types::Uuid, BrokerInfo> Map; + BrokerInfo() {} BrokerInfo(const std::string& host, uint16_t port_, const types::Uuid& id); BrokerInfo(const framing::FieldTable& ft) { assign(ft); } @@ -58,7 +62,11 @@ class BrokerInfo void assign(const framing::FieldTable&); void assign(const types::Variant::Map&); + // So it can be put in a set. + bool operator<(const BrokerInfo x) const { return systemId < x.systemId; } + private: + void updateLogId(); std::string logId; std::string hostName; uint16_t port; @@ -67,6 +75,9 @@ class BrokerInfo }; std::ostream& operator<<(std::ostream&, const BrokerInfo&); +std::ostream& operator<<(std::ostream&, const BrokerInfo::Set&); +std::ostream& operator<<(std::ostream&, const BrokerInfo::Map::value_type&); +std::ostream& operator<<(std::ostream&, const BrokerInfo::Map&); }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 81667e0437..cbfd5b1f32 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -13,7 +13,7 @@ * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * @@ -292,7 +292,10 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); if (!replicationTest.isReplicated( - values[ARGS].asMap(), values[AUTODEL].asBool(), values[EXCL].asBool())) + CONFIGURATION, + values[ARGS].asMap(), + values[AUTODEL].asBool(), + values[EXCL].asBool())) return; if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) { framing::FieldTable args; @@ -441,9 +444,11 @@ void BrokerReplicator::doEventMembersUpdate(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicationTest.isReplicated(values[ARGUMENTS].asMap(), - values[AUTODELETE].asBool(), - values[EXCLUSIVE].asBool())) + if (!replicationTest.isReplicated( + CONFIGURATION, + values[ARGUMENTS].asMap(), + values[AUTODELETE].asBool(), + values[EXCLUSIVE].asBool())) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 677de31370..9788e4b647 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -94,7 +94,6 @@ class BrokerReplicator : public broker::Exchange, QueueReplicatorPtr findQueueReplicator(const std::string& qname); void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); - bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive); void ready(); std::string logPrefix; diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp deleted file mode 100644 index 61835b15d1..0000000000 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ConnectionExcluder.h" -#include "BrokerInfo.h" -#include "HaBroker.h" -#include "qpid/framing/FieldTable.h" -#include "qpid/broker/Connection.h" -#include <boost/function.hpp> -#include <sstream> - -namespace qpid { -namespace ha { - -ConnectionExcluder::ConnectionExcluder(HaBroker& hb, const types::Uuid& uuid) - : haBroker(hb), logPrefix("HA: "), self(uuid) {} - -namespace { -bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { - framing::FieldTable ft; - if (connection.getClientProperties().getTable(ConnectionExcluder::BACKUP_TAG, ft)) { - info = BrokerInfo(ft); - return true; - } - return false; -} -} - -void ConnectionExcluder::opened(broker::Connection& connection) { - if (connection.isLink()) return; // Allow all outgoing links - if (connection.getClientProperties().isSet(ADMIN_TAG)) { - QPID_LOG(debug, logPrefix << "Allowing admin connection: " - << connection.getMgmtId()); - return; - } - BrokerStatus status = haBroker.getStatus(); - if (isBackup(status)) reject(connection); - BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info)) { - if (info.getSystemId() == self) { - QPID_LOG(debug, logPrefix << "Rejected self connection"); - reject(connection); - } - else { - QPID_LOG(debug, logPrefix << "Allowed backup connection " << info); - haBroker.getMembership().add(info); - return; - } - } - // else: Primary node accepts connections. -} - -void ConnectionExcluder::reject(broker::Connection& connection) { - throw Exception( - QPID_MSG(logPrefix << "Rejected connection " << connection.getMgmtId())); -} - -void ConnectionExcluder::closed(broker::Connection& connection) { - BrokerInfo info; - BrokerStatus status = haBroker.getStatus(); - if (isBackup(status)) return; // Don't mess with the map received from primary. - if (getBrokerInfo(connection, info)) - haBroker.getMembership().remove(info.getSystemId()); -} - -const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; -const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup"; - -}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h deleted file mode 100644 index c24a138e2c..0000000000 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ /dev/null @@ -1,68 +0,0 @@ -#ifndef QPID_HA_CONNECTIONEXCLUDER_H -#define QPID_HA_CONNECTIONEXCLUDER_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "types.h" -#include "qpid/broker/ConnectionObserver.h" -#include "qpid/types/Uuid.h" -#include "qpid/sys/Mutex.h" -#include <boost/function.hpp> - -namespace qpid { - -namespace broker { -class Connection; -} - -namespace ha { -class HaBroker; - -/** - * Exclude normal connections to a backup broker. - * Admin connections are identified by a special flag in client-properties - * during connection negotiation. - */ -class ConnectionExcluder : public broker::ConnectionObserver -{ - public: - static const std::string ADMIN_TAG; - static const std::string BACKUP_TAG; - - ConnectionExcluder(HaBroker&, const types::Uuid& self); - - void opened(broker::Connection& connection); - void closed(broker::Connection& connection); - - void setStatus(BrokerStatus); - - private: - void reject(broker::Connection&); - - HaBroker& haBroker; - std::string logPrefix; - types::Uuid self; -}; - -}} // namespace qpid::ha - -#endif /*!QPID_HA_CONNECTIONEXCLUDER_H*/ diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 694a253fc3..d121aa1191 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -21,6 +21,7 @@ #include "ConnectionObserver.h" #include "BrokerInfo.h" +#include "HaBroker.h" #include "qpid/framing/FieldTable.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" @@ -28,9 +29,10 @@ namespace qpid { namespace ha { -ConnectionObserver::ConnectionObserver(const types::Uuid& uuid) - : logPrefix("HA: "), self(uuid) {} +ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) + : haBroker(hb), logPrefix("HA: "), self(uuid) {} +// FIXME aconway 2012-06-06: move to BrokerInfo bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) { framing::FieldTable ft; if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { @@ -58,13 +60,16 @@ void ConnectionObserver::opened(broker::Connection& connection) { return; // No need to call observer, always allow admins. } BrokerInfo info; // Avoid self connections. - if (getBrokerInfo(connection, info) && info.getSystemId() == self) - throw Exception("HA rejected self connection"); + if (getBrokerInfo(connection, info)) { + if (info.getSystemId() == self) + throw Exception("HA rejected self connection"); + } ObserverPtr o(getObserver()); if (o) o->opened(connection); } void ConnectionObserver::closed(broker::Connection& connection) { + BrokerInfo info; ObserverPtr o(getObserver()); if (o) o->closed(connection); } diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.h b/qpid/cpp/src/qpid/ha/ConnectionObserver.h index a950f41739..5c1dabe8f8 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.h +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.h @@ -31,6 +31,7 @@ namespace qpid { namespace ha { class BrokerInfo; +class HaBroker; /** * Observes connections, delegates to another ConnectionObserver for @@ -52,7 +53,7 @@ class ConnectionObserver : public broker::ConnectionObserver static bool getBrokerInfo(broker::Connection& connection, BrokerInfo& info); - ConnectionObserver(const types::Uuid& self); + ConnectionObserver(HaBroker& haBroker, const types::Uuid& self); void setObserver(const ObserverPtr&); ObserverPtr getObserver(); @@ -62,6 +63,7 @@ class ConnectionObserver : public broker::ConnectionObserver private: sys::Mutex lock; + HaBroker& haBroker; std::string logPrefix; ObserverPtr observer; types::Uuid self; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 046800791d..cfe202c6f7 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -23,7 +23,6 @@ #include "ConnectionObserver.h" #include "HaBroker.h" #include "Primary.h" -#include "PrimaryConnectionMonitor.h" #include "ReplicatingSubscription.h" #include "Settings.h" #include "qpid/amqp_0_10/Codecs.h" @@ -53,6 +52,7 @@ using namespace management; using namespace std; using types::Variant; using types::Uuid; +using sys::Mutex; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : logPrefix("HA: "), @@ -61,11 +61,11 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) settings(s), mgmtObject(0), status(STANDALONE), - observer(new ConnectionObserver(systemId)), + observer(new ConnectionObserver(*this, systemId)), brokerInfo(broker.getSystem()->getNodeName(), // TODO aconway 2012-05-24: other transports? broker.getPort(broker::Broker::TCP_TRANSPORT), systemId), - membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1, _2)), + membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)), replicationTest(s.replicateDefault.get()) { // Set up the management object. @@ -95,7 +95,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) // NOTE: lock is not needed in a constructor, but create one // to pass to functions that have a ScopedLock parameter. - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); statusChanged(l); @@ -108,28 +108,26 @@ HaBroker::~HaBroker() { broker.getConnectionObservers().remove(observer); } -void HaBroker::recover(sys::Mutex::ScopedLock& l) { +void HaBroker::recover(Mutex::ScopedLock&) { setStatus(RECOVERING); backup.reset(); // No longer replicating, close link. - IdSet backups = membership.otherBackups(); + BrokerInfo::Set backups = membership.otherBackups(); membership.reset(brokerInfo); + // Drop the lock, new Primary may call back on activate. + Mutex::ScopedUnlock u(lock); primary.reset(new Primary(*this, backups)); // Starts primary-ready check. - observer->setObserver( // Allow connections - boost::shared_ptr<broker::ConnectionObserver>( - new PrimaryConnectionMonitor(*this))); - if (primary->isActive()) activate(l); } // Called back from Primary active check. void HaBroker::activate() { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); activate(l); } -void HaBroker::activate(sys::Mutex::ScopedLock&) { setStatus(ACTIVE); } +void HaBroker::activate(Mutex::ScopedLock&) { setStatus(ACTIVE); } Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { switch (status) { @@ -186,13 +184,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, return Manageable::STATUS_OK; } -void HaBroker::setClientUrl(const Url& url, sys::Mutex::ScopedLock& l) { +void HaBroker::setClientUrl(const Url& url, Mutex::ScopedLock& l) { if (url.empty()) throw Exception("Invalid empty URL for HA client failover"); clientUrl = url; updateClientUrl(l); } -void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) { +void HaBroker::updateClientUrl(Mutex::ScopedLock&) { Url url = clientUrl.empty() ? brokerUrl : clientUrl; if (url.empty()) throw Url::Invalid("HA client URL is empty"); mgmtObject->set_publicUrl(url.str()); @@ -201,7 +199,7 @@ void HaBroker::updateClientUrl(sys::Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); } -void HaBroker::setBrokerUrl(const Url& url, sys::Mutex::ScopedLock& l) { +void HaBroker::setBrokerUrl(const Url& url, Mutex::ScopedLock& l) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); brokerUrl = url; mgmtObject->set_brokersUrl(brokerUrl.str()); @@ -220,12 +218,12 @@ void HaBroker::shutdown() { } BrokerStatus HaBroker::getStatus() const { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); return status; } void HaBroker::setStatus(BrokerStatus newStatus) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); setStatus(newStatus, l); } @@ -249,7 +247,7 @@ bool checkTransition(BrokerStatus from, BrokerStatus to) { } } // namespace -void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) { +void HaBroker::setStatus(BrokerStatus newStatus, Mutex::ScopedLock& l) { QPID_LOG(notice, logPrefix << "Status change: " << printable(status) << " -> " << printable(newStatus)); bool legal = checkTransition(status, newStatus); @@ -263,22 +261,19 @@ void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) { statusChanged(l); } -void HaBroker::statusChanged(sys::Mutex::ScopedLock& l) { +void HaBroker::statusChanged(Mutex::ScopedLock& l) { mgmtObject->set_status(printable(status).str()); brokerInfo.setStatus(status); setLinkProperties(l); } -void HaBroker::membershipUpdate(const Variant::List& brokers, const IdSet& otherBackups) -{ - QPID_LOG(debug, logPrefix << "Membership update: " << brokers); +void HaBroker::membershipUpdate(const Variant::List& brokers) { + // No lock, only calls thread-safe objects. mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); - sys::Mutex::ScopedLock l(lock); - if (primary.get()) primary->getUnreadyQueueSet().setExpectedBackups(otherBackups); } -void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { +void HaBroker::setLinkProperties(Mutex::ScopedLock&) { framing::FieldTable linkProperties = broker.getLinkClientProperties(); if (isBackup(status)) { // If this is a backup then any links we make are backup links @@ -296,18 +291,18 @@ void HaBroker::setLinkProperties(sys::Mutex::ScopedLock&) { } void HaBroker::activatedBackup(const std::string& queue) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); activeBackups.insert(queue); } void HaBroker::deactivatedBackup(const std::string& queue) { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); activeBackups.erase(queue); } // FIXME aconway 2012-05-31: strip out. HaBroker::QueueNames HaBroker::getActiveBackups() const { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); return activeBackups; } diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 80e8a6cc3d..d36789e565 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -95,7 +95,7 @@ class HaBroker : public management::Manageable const BrokerInfo& getBrokerInfo() const { return brokerInfo; } Membership& getMembership() { return membership; } - void membershipUpdate(const types::Variant::List&, const IdSet&); + void membershipUpdate(const types::Variant::List&); private: void setClientUrl(const Url&, sys::Mutex::ScopedLock&); diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 6c6961f094..92436c9e56 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -19,6 +19,9 @@ * */ #include "Membership.h" +#include <boost/bind.hpp> +#include <iostream> +#include <iterator> namespace qpid { namespace ha { @@ -77,23 +80,26 @@ types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { void Membership::update(sys::Mutex::ScopedLock& l) { if (updateCallback) { types::Variant::List list = asList(l); - IdSet ids = otherBackups(l); sys::Mutex::ScopedUnlock u(lock); - updateCallback(list, ids); + // FIXME aconway 2012-06-06: messy: Make this a data object, + // move locking into HaBroker? + updateCallback(list); } + QPID_LOG(debug, " HA: Membership update: " << brokers); } -IdSet Membership::otherBackups() const { +BrokerInfo::Set Membership::otherBackups() const { sys::Mutex::ScopedLock l(lock); return otherBackups(l); } -IdSet Membership::otherBackups(sys::Mutex::ScopedLock&) const { - IdSet result; +BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const { + BrokerInfo::Set result; for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self) - result.insert(i->second.getSystemId()); + result.insert(i->second); return result; } + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index bdb55425dc..6b88b6e5d7 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -30,6 +30,7 @@ #include <boost/function.hpp> #include <set> #include <vector> +#include <iosfwd> namespace qpid { namespace ha { @@ -40,9 +41,7 @@ namespace ha { class Membership { public: - typedef boost::function<void (const types::Variant::List&, - const IdSet&) > UpdateCallback; - + typedef boost::function<void (const types::Variant::List&) > UpdateCallback; Membership(const types::Uuid& self_, UpdateCallback updateFn) : self(self_), updateCallback(updateFn) {} @@ -51,22 +50,24 @@ class Membership void remove(const types::Uuid& id); bool contains(const types::Uuid& id); /** Return IDs of all backups other than self */ - IdSet otherBackups() const; + BrokerInfo::Set otherBackups() const; void assign(const types::Variant::List&); types::Variant::List asList() const; private: typedef std::map<types::Uuid, BrokerInfo> BrokerMap; - IdSet otherBackups(sys::Mutex::ScopedLock&) const; + BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const; types::Variant::List asList(sys::Mutex::ScopedLock&) const; void update(sys::Mutex::ScopedLock&); + std::ostream& print(std::ostream& o, sys::Mutex::ScopedLock&) const; mutable sys::Mutex lock; types::Uuid self; BrokerMap brokers; UpdateCallback updateCallback; }; + }} // namespace qpid::ha #endif /*!QPID_HA_MEMBERSHIP_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 63cba14484..cd731fe732 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -19,11 +19,14 @@ * */ #include "Backup.h" -#include "ConnectionExcluder.h" #include "HaBroker.h" #include "Primary.h" #include "ReplicatingSubscription.h" +#include "RemoteBackup.h" +#include "ConnectionObserver.h" +#include "qpid/assert.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/ConfigurationObserver.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" @@ -32,31 +35,140 @@ namespace qpid { namespace ha { +using sys::Mutex; + +namespace { +// No-op connection observer, allows all connections. +class PrimaryConnectionObserver : public broker::ConnectionObserver +{ + public: + PrimaryConnectionObserver(Primary& p) : primary(p) {} + void opened(broker::Connection& c) { primary.opened(c); } + void closed(broker::Connection& c) { primary.closed(c); } + private: + Primary& primary; +}; + +class PrimaryConfigurationObserver : public broker::ConfigurationObserver +{ + public: + PrimaryConfigurationObserver(Primary& p) : primary(p) {} + void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } + void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } + private: + Primary& primary; +}; + +} // namespace + Primary* Primary::instance = 0; -Primary::Primary(HaBroker& hb, const IdSet& backups) : - haBroker(hb), logPrefix("HA primary: "), - unready(0), activated(false), - queues(hb.getBroker(), hb.getReplicationTest(), backups) +Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : + haBroker(hb), logPrefix("HA primary: "), active(false) { assert(instance == 0); instance = this; // Let queue replicators find us. - if (backups.empty()) { - QPID_LOG(debug, logPrefix << "Not waiting for backups"); - activated = true; + if (expect.empty()) { + QPID_LOG(debug, logPrefix << "No initial backups"); } else { - QPID_LOG(debug, logPrefix << "Waiting for backups: " << backups); + QPID_LOG(debug, logPrefix << "Waiting for initial backups: " << expect); + for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) { + boost::shared_ptr<RemoteBackup> backup( + new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest())); + backups[i->getSystemId()] = backup; + if (!backup->isReady()) initialBackups.insert(backup); + } + } + + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); + haBroker.getBroker().getConfigurationObservers().add(configurationObserver); + + Mutex::ScopedLock l(lock); // We are now active as a configurationObserver + checkReady(l); + // Allow client connections + connectionObserver.reset(new PrimaryConnectionObserver(*this)); + haBroker.getObserver()->setObserver(connectionObserver); +} + +Primary::~Primary() { + haBroker.getObserver()->setObserver(boost::shared_ptr<broker::ConnectionObserver>()); + haBroker.getBroker().getConfigurationObservers().remove(configurationObserver); +} + +void Primary::checkReady(Mutex::ScopedLock&) { + if (!active && initialBackups.empty()) { + active = true; + QPID_LOG(notice, logPrefix << "Active, all initial queues are safe."); + Mutex::ScopedUnlock u(lock); // Don't hold lock across callback + haBroker.activate(); + } +} + +void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { + if (i != backups.end() && i->second->isReady()) { + initialBackups.erase(i->second); + checkReady(l); } } void Primary::readyReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); - if (queues.ready(rs.getQueue(), rs.getBrokerInfo().getSystemId()) && !activated) { - activated = true; - haBroker.activate(); - QPID_LOG(notice, logPrefix << "Activated, all initial queues are safe."); + BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId()); + if (i != backups.end()) { + i->second->ready(rs.getQueue()); + checkReady(i, l); + } +} + +void Primary::queueCreate(const QueuePtr& q) { + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } +} + +void Primary::queueDestroy(const QueuePtr& q) { + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) + i->second->queueDestroy(q); + checkReady(l); +} + +void Primary::opened(broker::Connection& connection) { + Mutex::ScopedLock l(lock); + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + haBroker.getMembership().add(info); + BackupMap::iterator i = backups.find(info.getSystemId()); + if (i == backups.end()) { + QPID_LOG(debug, logPrefix << "New backup connected: " << info); + backups[info.getSystemId()].reset( + new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest())); + } + else { + QPID_LOG(debug, logPrefix << "Known backup connected: " << info); + } + } +} + +void Primary::closed(broker::Connection& connection) { + Mutex::ScopedLock l(lock); + BrokerInfo info; + if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { + haBroker.getMembership().remove(info.getSystemId()); + QPID_LOG(debug, "HA primary: Backup disconnected: " << info); + backups.erase(info.getSystemId()); + // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. } } + +boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerInfo& info) +{ + BackupMap::iterator i = backups.find(info.getSystemId()); + return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 3a1a9be9e8..d9a4eb365c 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -22,8 +22,8 @@ * */ -#include "UnreadyQueueSet.h" #include "types.h" +#include "BrokerInfo.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> #include <map> @@ -33,38 +33,63 @@ namespace qpid { namespace broker { class Queue; +class Connection; +class ConnectionObserver; +class ConfigurationObserver; } namespace ha { class HaBroker; class ReplicatingSubscription; +class RemoteBackup; +class QueueGuard; /** - * State associated with a primary broker. Tracks replicating - * subscriptions to determine when primary is active. + * State associated with a primary broker: + * - tracks readiness of initial backups to determine when primary is active. + * - sets updates queue guards on new queues with for each backup. * - * THREAD SAFE: readyReplica is called in arbitray threads. + * THREAD SAFE: readyReplica and ConfigurationObserver functions called concurrently. */ + class Primary { public: + typedef boost::shared_ptr<broker::Queue> QueuePtr; + static Primary* get() { return instance; } - Primary(HaBroker& hb, const IdSet& expectedBackups); + Primary(HaBroker& hb, const BrokerInfo::Set& expectedBackups); + ~Primary(); void readyReplica(const ReplicatingSubscription&); void removeReplica(const std::string& q); - UnreadyQueueSet& getUnreadyQueueSet() { return queues; } - bool isActive() { return activated; } + // Called via ConfigurationObserver + void queueCreate(const QueuePtr&); + void queueDestroy(const QueuePtr&); + + // Called via ConnectionObserver + void opened(broker::Connection& connection); + void closed(broker::Connection& connection); + + boost::shared_ptr<QueueGuard> getGuard(const QueuePtr& q, const BrokerInfo&); private: + typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap; + typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet; + + void checkReady(sys::Mutex::ScopedLock&); + void checkReady(BackupMap::iterator, sys::Mutex::ScopedLock&); + sys::Mutex lock; HaBroker& haBroker; std::string logPrefix; - size_t expected, unready; - bool activated; - UnreadyQueueSet queues; + bool active; + BackupSet initialBackups; + BackupMap backups; + boost::shared_ptr<broker::ConnectionObserver> connectionObserver; + boost::shared_ptr<broker::ConfigurationObserver> configurationObserver; static Primary* instance; }; diff --git a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h index 1aa61b2dea..b453d64ae6 100644 --- a/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h +++ b/qpid/cpp/src/qpid/ha/PrimaryConnectionMonitor.h @@ -44,32 +44,17 @@ class HaBroker; * * THREAD SAFE: has no state, just mediates between other thread-safe objects. */ +// FIXME aconway 2012-06-06: rename observer class PrimaryConnectionMonitor : public broker::ConnectionObserver { public: - PrimaryConnectionMonitor(HaBroker& hb) : haBroker(hb) {} + PrimaryConnectionMonitor(Primary& p) : primary(p) {} + void opened(broker::Connection& connection) { primary.opened(connection); } + void closed(broker::Connection& connection) { primary.closed(connection); } - void opened(broker::Connection& connection) { - BrokerInfo info; - if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - QPID_LOG(debug, "HA primary: Backup connected: " << info); - haBroker.getMembership().add(info); - // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. - } - } - - void closed(broker::Connection& connection) { - BrokerInfo info; - if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - QPID_LOG(debug, "HA primary: Backup disconnected: " << info); - haBroker.getMembership().remove(info.getSystemId()); - // FIXME aconway 2012-06-01: changes to expected backup set for unready queues. - } - } - private: - void reject(broker::Connection&); - HaBroker& haBroker; - }; + private: + Primary& primary; +}; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index 55dc6b0d50..5ea4c8e6f8 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -22,6 +22,7 @@ #include "ReplicatingSubscription.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueObserver.h" #include "qpid/log/Statement.h" #include <sstream> @@ -31,19 +32,33 @@ namespace ha { using namespace broker; using sys::Mutex; +class QueueGuard::QueueObserver : public broker::QueueObserver +{ + public: + QueueObserver(QueueGuard& g) : guard(g) {} + void enqueued(const broker::QueuedMessage& qm) { guard.enqueued(qm); } + void dequeued(const broker::QueuedMessage& qm) { guard.dequeued(qm); } + void acquired(const broker::QueuedMessage&) {} + void requeued(const broker::QueuedMessage&) {} + private: + QueueGuard& guard; +}; + + + QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) : queue(q), subscription(0) { // Set a log prefix message that identifies the remote broker. std::ostringstream os; - os << "HA subscription " << queue.getName() << "@" << info.getLogId() << ": "; + os << "HA guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); + observer.reset(new QueueObserver(*this)); + queue.addObserver(observer); + readyPosition = queue.getPosition(); // Must set after addObserver() } -void QueueGuard::initialize() { - Mutex::ScopedLock l(lock); - queue.addObserver(shared_from_this()); -} +QueueGuard::~QueueGuard() { cancel(); } void QueueGuard::enqueued(const QueuedMessage& qm) { // Delay completion @@ -69,7 +84,11 @@ void QueueGuard::dequeued(const QueuedMessage& qm) { } void QueueGuard::cancel() { - queue.removeObserver(shared_from_this()); + queue.removeObserver(observer); + { + sys::Mutex::ScopedLock l(lock); + if (delayed.empty()) return; // No need if no delayed messages. + } queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1)); } @@ -94,6 +113,11 @@ void QueueGuard::complete(const QueuedMessage& qm) { complete(qm, l); } +framing::SequenceNumber QueueGuard::getReadyPosition() { + // No lock, readyPosition is immutable. + return readyPosition; +} + // FIXME aconway 2012-06-04: TODO support for timeout. }} // namespaces qpid::ha diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 739c1e0e13..bb98d2052d 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -23,13 +23,11 @@ */ #include "types.h" -#include "qpid/broker/QueueObserver.h" #include "qpid/framing/SequenceNumber.h" #include "qpid/framing/SequenceSet.h" #include "qpid/types/Uuid.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> -#include <boost/enable_shared_from_this.hpp> #include <deque> #include <set> @@ -58,16 +56,10 @@ class ReplicatingSubscription; * arbitrary connection threads, and from ReplicatingSubscription * in the subscriptions thread. */ -class QueueGuard : public broker::QueueObserver, - public boost::enable_shared_from_this<QueueGuard> -{ +class QueueGuard { public: QueueGuard(broker::Queue& q, const BrokerInfo&); - - /** Must be called after ctor, requires a shared_ptr to this to exist. - * Must be called before ReplicatingSubscription::initialize(this) - */ - void initialize(); + ~QueueGuard(); /** QueueObserver override. Delay completion of the message. */ void enqueued(const broker::QueuedMessage&); @@ -83,16 +75,19 @@ class QueueGuard : public broker::QueueObserver, void attach(ReplicatingSubscription&); - // Unused QueueObserver functions. - void acquired(const broker::QueuedMessage&) {} - void requeued(const broker::QueuedMessage&) {} + /** The first sequence number that has been processed */ + framing::SequenceNumber getReadyPosition(); private: + class QueueObserver; + sys::Mutex lock; std::string logPrefix; broker::Queue& queue; framing::SequenceSet delayed; ReplicatingSubscription* subscription; + boost::shared_ptr<QueueObserver> observer; + framing::SequenceNumber readyPosition; void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&); }; diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index 4d12015008..8b12231453 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -115,13 +115,13 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa FieldTable settings; settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1); settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: optimize? - settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, + settings.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition()); settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable()); SequenceNumber front; if (ReplicatingSubscription::getFront(*queue, front)) - settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, front); + settings.setInt(ReplicatingSubscription::QPID_FRONT, front); peer.getMessage().subscribe( args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/, false/*exclusive*/, "", 0, settings); diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp new file mode 100644 index 0000000000..94e60d7ed8 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "RemoteBackup.h" +#include "QueueGuard.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include <boost/bind.hpp> + +namespace qpid { +namespace ha { + +using sys::Mutex; + +RemoteBackup::RemoteBackup( + const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt) : + logPrefix("HA backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt) +{ + QPID_LOG(debug, logPrefix << "Guarding queues for backup broker. "); + broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); +} + +bool RemoteBackup::isReady() { + return initialQueues.empty(); +} + +void RemoteBackup::initialQueue(const QueuePtr& q) { + initialQueues.insert(q); + queueCreate(q); +} + +RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) { + GuardMap::iterator i = guards.find(q); + if (i == guards.end()) { + assert(0); + throw Exception(logPrefix+": Guard cannot find queue guard: "+q->getName()); + } + GuardPtr guard = i->second; + guards.erase(i); + return guard; +} + +void RemoteBackup::ready(const QueuePtr& q) { + initialQueues.erase(q); +} + +void RemoteBackup::queueCreate(const QueuePtr& q) { + if (replicationTest.isReplicated(ALL, *q)) { + QPID_LOG(debug, logPrefix << "Setting guard on " << q->getName()); + guards[q].reset(new QueueGuard(*q, brokerInfo)); + } +} + +void RemoteBackup::queueDestroy(const QueuePtr& q) { + initialQueues.erase(q); + GuardMap::iterator i = guards.find(q); + if (i != guards.end()) { + i->second->cancel(); + guards.erase(i); + } +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h new file mode 100644 index 0000000000..39020c9b7d --- /dev/null +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -0,0 +1,84 @@ +#ifndef QPID_HA_REMOTEBACKUP_H +#define QPID_HA_REMOTEBACKUP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ReplicationTest.h" +#include "BrokerInfo.h" +#include "types.h" +#include <set> +#include <map> + +namespace qpid { + +namespace broker { +class Broker; +class Queue; +} + +namespace ha { +class QueueGuard; + +/** + * Track readiness for a remote broker. + * Creates queue guards on behalf of the remote broker to keep + * queues safe till the ReplicatingSubscription is ready. + * + * THREAD UNSAFE: Caller must serialize. + */ +class RemoteBackup +{ + public: + typedef boost::shared_ptr<QueueGuard> GuardPtr; + typedef boost::shared_ptr<broker::Queue> QueuePtr; + + RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt); + + /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ + GuardPtr guard(const QueuePtr&); + + /** ReplicatingSubscription associated with queue is ready. */ + void ready(const QueuePtr& queue); + + // Called ConfigurationObserver + void queueCreate(const QueuePtr&); + void queueDestroy(const QueuePtr&); + + /**@return true when all initial queues for this backup are ready */ + bool isReady(); + + private: + typedef std::map<QueuePtr, GuardPtr> GuardMap; + typedef std::set<QueuePtr> QueueSet; + + std::string logPrefix; + BrokerInfo brokerInfo; + ReplicationTest replicationTest; + GuardMap guards; + QueueSet initialQueues; + + void initialQueue(const QueuePtr&); +}; + +}} // namespace qpid::ha + +#endif /*!QPID_HA_REMOTEBACKUP_H*/ diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index f7bfe6fda0..7438c95bc2 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -37,11 +37,12 @@ namespace ha { using namespace framing; using namespace broker; using namespace std; +using sys::Mutex; -const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription"); -const string ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number"); -const string ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number"); -const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info"); +const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription"); +const string ReplicatingSubscription::QPID_BACK("qpid.ha-back"); +const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front"); +const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info"); namespace { const string DOLLAR("$"); @@ -61,7 +62,7 @@ class DequeueRemover } void operator()(const QueuedMessage& message) { - if (message.position >= start && message.position <= end) { + if (message.position >= start && message.position <= end) { //i.e. message is within the intial range and has not been dequeued, //so remove it from the dequeues dequeues.remove(message.position); @@ -112,6 +113,11 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front) return getNext(q, 0, front); } +bool ReplicatingSubscription::isEmpty(broker::Queue& q) { + SequenceNumber front; + return getFront(q, front); +} + /* Called by SemanticState::consume to create a consumer */ boost::shared_ptr<broker::SemanticState::ConsumerImpl> ReplicatingSubscription::Factory::create( @@ -131,9 +137,7 @@ ReplicatingSubscription::Factory::create( rs.reset(new ReplicatingSubscription( parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); - boost::shared_ptr<QueueGuard> guard(new QueueGuard(*queue, rs->getBrokerInfo())); - guard->initialize(); // Must call before ReplicatingSubscription::initialize - rs->initialize(guard); + rs->initialize(); } return rs; } @@ -153,23 +157,19 @@ struct QueueRange { } QueueRange(const framing::FieldTable args) { - back = args.getAsInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER); + back = args.getAsInt(ReplicatingSubscription::QPID_BACK); front = back+1; - empty = !args.isSet(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + empty = !args.isSet(ReplicatingSubscription::QPID_FRONT); if (!empty) { - front = args.getAsInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER); + front = args.getAsInt(ReplicatingSubscription::QPID_FRONT); if (back < front) throw InvalidArgumentException("Invalid bounds for backup queue"); } } - - /** Consumer position to start consuming from the front */ - SequenceNumber browserStart() { return front-1; } }; ostream& operator<<(ostream& o, const QueueRange& qr) { - - if (qr.front > qr.back) return o << "empty(" << qr.back << ")"; + if (qr.front > qr.back) return o << "[-" << qr.back << "]"; else return o << "[" << qr.front << "," << qr.back << "]"; } @@ -222,14 +222,31 @@ ReplicatingSubscription::ReplicatingSubscription( // Clear the backup queue and reset to start browsing at the // front of the primary queue. if (!backup.empty) dequeues.add(backup.front, backup.back); - position = primary.browserStart(); + position = primary.front - 1; // Start consuming from front. } - QPID_LOG(debug, logPrefix << "New backup subscription " << getName() - << " backup range " << backup - << " primary range " << primary - << " position " << position - << " dequeues " << dequeues); + QPID_LOG(debug, logPrefix << "Subscribed: " + << " backup" << backup + << " primary" << primary + << " position=" << position + << " dequeues=" << dequeues); + + // Set the guard + if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo()); + if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo())); + guard->attach(*this); + + // Guard is active, dequeued can be called concurrently. + Mutex::ScopedLock l(lock); + + // Set the ready position. All messages after this position have + // been seen by the guard. + readyPosition = guard->getReadyPosition(); + if (position >= readyPosition || isEmpty(*getQueue())) + setReady(l); + else + QPID_LOG(debug, logPrefix << "Catching up from " + << position << " to " << readyPosition); } catch (const std::exception& e) { throw Exception(QPID_MSG(logPrefix << "Error setting up replication: " @@ -242,35 +259,17 @@ ReplicatingSubscription::~ReplicatingSubscription() { } // Called in subscription's connection thread when the subscription is created. -void ReplicatingSubscription::initialize(const boost::shared_ptr<QueueGuard>& g) { - sys::Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. - // Attach to the guard. - guard = g; - guard->attach(*this); +// Called separate from ctor because sending events requires +// shared_from_this +// +void ReplicatingSubscription::initialize() { + Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently. // Send initial dequeues and position to the backup. // There must be a shared_ptr(this) when sending. sendDequeueEvent(l); sendPositionEvent(position, l); backupPosition = position; - - // Set the ready position. All messages after this position have - // been seen by the guard. - QueueRange range; - { - // Drop the lock, QueueRange will lock the queues message lock - // which is also locked around calls to enqueued() and dequeued() - sys::Mutex::ScopedUnlock u(lock); - range = QueueRange(*getQueue()); - } - readyPosition = range.back; - if (range.empty || position >= readyPosition) { - setReady(l); - } - else { - QPID_LOG(debug, logPrefix << "Backup subscription catching up from " - << position << " to " << readyPosition); - } } // Message is delivered in the subscription's connection thread. @@ -280,7 +279,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "Replicating " << qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); assert(position == qm.position); // qm.position is the position of the newly enqueued qm on local queue. // backupPosition is latest position on backup queue before enqueueing @@ -299,7 +298,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Deliver the message bool delivered = ConsumerImpl::deliver(qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); // If we have advanced to the initial position, the backup is ready. if (qm.position >= readyPosition) setReady(l); } @@ -314,12 +313,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -void ReplicatingSubscription::setReady(sys::Mutex::ScopedLock&) { +void ReplicatingSubscription::setReady(Mutex::ScopedLock&) { if (ready) return; ready = true; // Notify Primary that a subscription is ready. { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); if (Primary::get()) Primary::get()->readyReplica(*this); } @@ -343,7 +342,7 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&) { if (dequeues.empty()) return; QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues); @@ -353,7 +352,7 @@ void ReplicatingSubscription::sendDequeueEvent(sys::Mutex::ScopedLock&) dequeues.clear(); buffer.reset(); { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); } } @@ -367,7 +366,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) bool doComplete = false; QPID_LOG(trace, logPrefix << "Dequeued " << qm); { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); dequeues.add(qm.position); if (qm.position > position) doComplete = true; } @@ -378,7 +377,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&) { if (pos == backupPosition) return; // No need to send. QPID_LOG(trace, logPrefix << "Sending position " << pos @@ -388,7 +387,7 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, sys::Mutex:: pos.encode(buffer); buffer.reset(); { - sys::Mutex::ScopedUnlock u(lock); + Mutex::ScopedUnlock u(lock); sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); } } @@ -428,7 +427,7 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& bool ReplicatingSubscription::doDispatch() { { - sys::Mutex::ScopedLock l(lock); + Mutex::ScopedLock l(lock); if (!dequeues.empty()) sendDequeueEvent(l); } return ConsumerImpl::doDispatch(); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index e69a2159e6..9be8364117 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -73,8 +73,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; - static const std::string QPID_HIGH_SEQUENCE_NUMBER; - static const std::string QPID_LOW_SEQUENCE_NUMBER; + static const std::string QPID_BACK; + static const std::string QPID_FRONT; static const std::string QPID_BROKER_INFO; // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription @@ -111,7 +111,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl /** Initialization that must be done after construction because it * requires a shared_ptr to this to exist. Will attach to guard */ - void initialize(const boost::shared_ptr<QueueGuard>& guard); + void initialize(); BrokerInfo getBrokerInfo() const { return info; } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp index 1db101dc94..613ac0a8c6 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.cpp @@ -52,18 +52,23 @@ namespace { const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout"); } -bool ReplicationTest::isReplicated(const Variant::Map& args, bool autodelete, bool exclusive) { +bool ReplicationTest::isReplicated( + ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive) +{ bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end(); - return replicateLevel(args) && !ignore; + return !ignore && replicateLevel(args) >= level; } -bool ReplicationTest::isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive) { +bool ReplicationTest::isReplicated( + ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive) +{ bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT); - return replicateLevel(args) && !ignore; + return !ignore && replicateLevel(args) >= level; } -bool ReplicationTest::isReplicated(const broker::Queue& q) { - return isReplicated(q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner()); +bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q) +{ + return isReplicated(level, q.getSettings(), q.isAutoDelete(), q.hasExclusiveOwner()); } diff --git a/qpid/cpp/src/qpid/ha/ReplicationTest.h b/qpid/cpp/src/qpid/ha/ReplicationTest.h index 4851f34e84..50a41ccbc3 100644 --- a/qpid/cpp/src/qpid/ha/ReplicationTest.h +++ b/qpid/cpp/src/qpid/ha/ReplicationTest.h @@ -47,13 +47,18 @@ class ReplicationTest ReplicationTest(ReplicateLevel replicateDefault_) : replicateDefault(replicateDefault_) {} + // Return the simple replication level, accounting for defaults. ReplicateLevel replicateLevel(const std::string& str); ReplicateLevel replicateLevel(const framing::FieldTable& f); ReplicateLevel replicateLevel(const types::Variant::Map& m); - bool isReplicated(const types::Variant::Map& args, bool autodelete, bool exclusive); - bool isReplicated(const framing::FieldTable& args, bool autodelete, bool exclusive); - bool isReplicated(const broker::Queue&); + // Return true if replication for a queue is enabled at level or + // higher, taking account of all settings. + bool isReplicated(ReplicateLevel level, + const types::Variant::Map& args, bool autodelete, bool exclusive); + bool isReplicated(ReplicateLevel level, + const framing::FieldTable& args, bool autodelete, bool exclusive); + bool isReplicated(ReplicateLevel level, const broker::Queue&); private: ReplicateLevel replicateDefault; }; diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp deleted file mode 100644 index 279eb2c0e1..0000000000 --- a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "UnreadyQueueSet.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/QueueRegistry.h" -#include <boost/bind.hpp> -#include <iostream> -#include <algorithm> - -namespace qpid { -namespace ha { - -using sys::Mutex; - -UnreadyQueueSet::UnreadyQueueSet(broker::Broker& broker, ReplicationTest rt, const IdSet& ids) : - logPrefix("HA unsafe queues: "), replicationTest(rt), expected(ids), - initializing(true), initialQueues(0) -{ - if (!expected.empty()) { - QPID_LOG(debug, logPrefix << "Recovering, waiting for backups: " << expected); - broker.getQueues().eachQueue(boost::bind(&UnreadyQueueSet::queueCreate, this, _1)); - initialQueues = queues.size(); - } - initializing = false; -} - -void UnreadyQueueSet::setExpectedBackups(const IdSet& ids) { - Mutex::ScopedLock l(lock); - expected = ids; -} - -bool UnreadyQueueSet::ready(const boost::shared_ptr<broker::Queue>& q, const types::Uuid& id) { - Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "Replicating subscription ready: " << id << " on " - << q->getName()); - QueueMap::iterator i = queues.find(q); - if (i != queues.end()) { - // if (i->second.guard->ready(id)) { - // QPID_LOG(debug, logPrefix << "Releasing guard on " << q->getName()); - // remove(i, l); - if (i->second.initial) --initialQueues; - // } - } - return initialQueues == 0; -} - -void UnreadyQueueSet::queueCreate(const boost::shared_ptr<broker::Queue>& q) { - Mutex::ScopedLock l(lock); - if (replicationTest.isReplicated(*q) && !expected.empty()) { - QPID_LOG(debug, logPrefix << "Guarding " << q->getName() << " for " << expected); - // GuardPtr guard(new QueueGuard(*q, expected)); - // FIXME aconway 2012-06-05: q->addObserver(guard); - queues[q] = Entry(initializing);//, guard); - } -} - -void UnreadyQueueSet::queueDestroy(const boost::shared_ptr<broker::Queue>& q) { - Mutex::ScopedLock l(lock); - remove(queues.find(q), l); -} - -void UnreadyQueueSet::remove(QueueMap::iterator i, sys::Mutex::ScopedLock&) { - if (i != queues.end()) { - QPID_LOG(debug, logPrefix << "Queue is safe: " << i->first->getName()); - // FIXME aconway 2012-06-05: i->first->removeObserver(i->second.guard); - //i->second.guard->release(); - queues.erase(i); - } -} - -}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h b/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h deleted file mode 100644 index 0731282c2b..0000000000 --- a/qpid/cpp/src/qpid/ha/UnreadyQueueSet.h +++ /dev/null @@ -1,88 +0,0 @@ -#ifndef QPID_HA_BROKERGUARD_H -#define QPID_HA_BROKERGUARD_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ReplicationTest.h" -#include "types.h" -#include "qpid/broker/ConfigurationObserver.h" -#include "qpid/sys/Mutex.h" -#include <set> -#include <map> - -namespace qpid { - -namespace broker { -class Broker; -} - -namespace ha { -class QueueGuard; - -/** - * ConfigurationObserver that sets a QueueGuard on all existing/new queues - * so they are safe until their ReplicatingSubscriptions catch up. - * - * THREAD SAFE: Called concurrently as a ConfigurationObserver and via ready() - */ -class UnreadyQueueSet : public qpid::broker::ConfigurationObserver -{ - public: - /** Caller should call broker.getConfigurationObservers().add(shared_ptr(this)) */ - UnreadyQueueSet(broker::Broker&, ReplicationTest rt, const IdSet& expected); - - void setExpectedBackups(const IdSet&); - - /** Backup id is ready on queue. - *@return true if all initial queuse are now ready. - */ - bool ready(const boost::shared_ptr<broker::Queue>&, const types::Uuid& id); - - // ConfigurationObserver overrides - void queueCreate(const boost::shared_ptr<broker::Queue>&); - void queueDestroy(const boost::shared_ptr<broker::Queue>&); - - // FIXME aconway 2012-05-31: handle IdSet changes. - private: - typedef boost::shared_ptr<QueueGuard> GuardPtr; - struct Entry { - bool initial; - // FIXME aconway 2012-06-05: GuardPtr guard; - // Entry(bool i=false, GuardPtr g=GuardPtr()) : initial(i), guard(g) {} - Entry(bool i=false) : initial(i) {} - }; - typedef std::map<boost::shared_ptr<broker::Queue>, Entry> QueueMap; - - void remove(QueueMap::iterator i, sys::Mutex::ScopedLock&); - - sys::Mutex lock; - std::string logPrefix; - ReplicationTest replicationTest; - IdSet expected; - QueueMap queues; - bool initializing; - size_t initialQueues; // Count of initial queues still unready. -}; - -}} // namespace qpid::ha - -#endif /*!QPID_HA_BROKERGUARD_H*/ diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index c32b7f2c96..6855ed03bb 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -241,15 +241,13 @@ def find_in_file(str, filename): class Broker(Popen): "A broker process. Takes care of start, stop and logging." _broker_count = 0 + _log_count = 0 def __str__(self): return "Broker<%s %s>"%(self.name, self.pname) def find_log(self): - self.log = "%s.log" % self.name - i = 1 - while (os.path.exists(self.log)): - self.log = "%s.%d.log" % (self.name, i) - i += 1 + self.log = "%03d:%s.log" % (Broker._log_count, self.name) + Broker._log_count += 1 def get_log(self): return os.path.abspath(self.log) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 86679611c4..e43d8bcb91 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math import traceback -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition @@ -493,6 +493,8 @@ class ReplicationTests(BrokerTest): for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) + # Detach, don't close as there is a broken session + s.session.connection.detach() def test_priority(self): """Verify priority queues replicate correctly""" @@ -716,21 +718,24 @@ class LongTests(BrokerTest): brokers = HaCluster(self, 3) # Start sender and receiver threads - sender = NumberedSender(brokers[0], max_depth=1024, failover_updates=False) - receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) - receiver.start() - sender.start() + senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False, + queue="test%s"%(i)) for i in xrange(10)] + receivers = [NumberedReceiver(brokers[0], sender=senders[i], + failover_updates=False, + queue="test%s"%(i)) for i in xrange(10)] + for r in receivers: r.start() + for s in senders: s.start() # Wait for sender & receiver to get up and running - assert retry(lambda: receiver.received > 100) + assert retry(lambda: receivers[0].received > 100) # Kill and restart brokers in a cycle: endtime = time.time() + self.duration() i = 0 try: while time.time() < endtime or i < 3: # At least 3 iterations - sender.sender.assert_running() - receiver.receiver.assert_running() - n = receiver.received + for s in senders: s.sender.assert_running() + for r in receivers: r.receiver.assert_running() + n = receivers[0].received # FIXME aconway 2012-05-01: don't kill primary till it's active # otherwise we can lose messages. When we implement non-promotion # of catchup brokers we can make this stronger: wait only for @@ -739,23 +744,22 @@ class LongTests(BrokerTest): brokers.bounce(i%3) i += 1 def enough(): # Verify we're still running - receiver.check() # Verify no exceptions - return receiver.received > n + 100 + receivers[0].check() # Verify no exceptions + return receivers[0].received > n + 100 # FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec. - assert retry(enough, 3), "Stalled: %s < %s+100"%(receiver.received, n) + assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n) except: traceback.print_exc() raise finally: - sender.stop() - receiver.stop() + for s in senders: s.stop() + for r in receivers: r.stop() dead = [] for i in xrange(3): if not brokers[i].is_running(): dead.append(i) brokers.kill(i, False) if dead: raise Exception("Brokers not running: %s"%dead) - class RecoveryTests(BrokerTest): """Tests for recovery after a failure.""" @@ -766,31 +770,37 @@ class RecoveryTests(BrokerTest): cluster = HaCluster(self, 4); # Wait for the primary to be ready cluster[0].wait_status("active") - # Create a queue before the failure. s1 = cluster.connect(0).session().sender("q1;{create:always}") for b in cluster: b.wait_backup("q1") for i in xrange(100): s1.send(str(i)) - # Kill primary and 2 backups for i in [0,1,2]: cluster.kill(i, False) cluster[3].promote() # New primary, backups will be 1 and 2 cluster[3].wait_status("recovering") + def trySync(s): + try: + s.sync(timeout=.1) + self.fail("Expected Timeout exception") + except Timeout: pass + # Create a queue after the failure s2 = cluster.connect(3).session().sender("q2;{create:always}") - # Verify that messages sent are not completed for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + trySync(s1) self.assertEqual(s1.unsettled(), 100) + trySync(s2) self.assertEqual(s2.unsettled(), 100) # Verify we can receive even if sending is on hold: cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) - # Restart backups, verify queues are released only when both backups are up cluster.restart(1) + trySync(s1) self.assertEqual(s1.unsettled(), 100) + trySync(s2) self.assertEqual(s2.unsettled(), 100) self.assertEqual(cluster[3].ha_status(), "recovering") cluster.restart(2) @@ -801,6 +811,8 @@ class RecoveryTests(BrokerTest): cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) cluster[3].wait_status("active"), + s1.session.connection.close() + s2.session.connection.close() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
