diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/Primary.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/Primary.cpp')
-rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 122 |
1 files changed, 88 insertions, 34 deletions
diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index e4bf9671b8..93dbbbea85 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" #include "qpid/sys/Timer.h" #include <boost/bind.hpp> @@ -39,6 +41,8 @@ namespace qpid { namespace ha { using sys::Mutex; +using namespace std; +using namespace framing; namespace { @@ -58,6 +62,8 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver PrimaryConfigurationObserver(Primary& p) : primary(p) {} void queueCreate(const Primary::QueuePtr& q) { primary.queueCreate(q); } void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); } + void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); } + void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); } private: Primary& primary; }; @@ -76,8 +82,11 @@ class ExpectedBackupTimerTask : public sys::TimerTask { Primary* Primary::instance = 0; Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : - haBroker(hb), logPrefix("Primary: "), active(false) + haBroker(hb), membership(hb.getMembership()), + logPrefix("Primary: "), active(false), + replicationTest(hb.getSettings().replicateDefault.get()) { + hb.getMembership().setStatus(RECOVERING); assert(instance == 0); instance = this; // Let queue replicators find us. if (expect.empty()) { @@ -89,11 +98,10 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : // the QueueGuards are created. QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getReplicationTest(), false)); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0)); backups[i->getSystemId()] = backup; if (!backup->isReady()) expectedBackups.insert(backup); - backup->setInitialQueues(hb.getBroker().getQueues(), true); // Create guards + backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards } // Set timeout for expected brokers to connect and become ready. sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); @@ -102,14 +110,21 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : hb.getBroker().getTimer().add(timerTask); } + + // Remove backup tag property from outgoing link properties. + framing::FieldTable linkProperties = hb.getBroker().getLinkClientProperties(); + linkProperties.erase(ConnectionObserver::BACKUP_TAG); + hb.getBroker().setLinkClientProperties(linkProperties); + configurationObserver.reset(new PrimaryConfigurationObserver(*this)); haBroker.getBroker().getConfigurationObservers().add(configurationObserver); Mutex::ScopedLock l(lock); // We are now active as a configurationObserver checkReady(l); + // Allow client connections connectionObserver.reset(new PrimaryConnectionObserver(*this)); - haBroker.getObserver()->setObserver(connectionObserver); + haBroker.getObserver()->setObserver(connectionObserver, logPrefix); } Primary::~Primary() { @@ -122,7 +137,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { active = true; Mutex::ScopedUnlock u(lock); // Don't hold lock across callback QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); - haBroker.activate(); + membership.setStatus(ACTIVE); } } @@ -130,7 +145,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); - haBroker.addBroker(info); + membership.add(info); if (expectedBackups.erase(i->second)) { QPID_LOG(info, logPrefix << "Expected backup is ready: " << info); checkReady(l); @@ -155,9 +170,10 @@ void Primary::timeoutExpectedBackups() { expectedBackups.erase(i++); backups.erase(info.getSystemId()); rb->cancel(); - // Downgrade the broker to CATCHUP + // Downgrade the broker's status to CATCHUP + // The broker will get this status change when it eventually connects. info.setStatus(CATCHUP); - haBroker.addBroker(info); + membership.add(info); } else ++i; } @@ -178,46 +194,78 @@ void Primary::readyReplica(const ReplicatingSubscription& rs) { } } +// NOTE: Called with queue registry lock held. void Primary::queueCreate(const QueuePtr& q) { - // Throw if there is an invalid replication level in the queue settings. - haBroker.getReplicationTest().replicateLevel(q->getSettings().storeSettings); - Mutex::ScopedLock l(lock); - for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { - i->second->queueCreate(q); - checkReady(i, l); + // Set replication argument. + ReplicateLevel level = replicationTest.useLevel(*q); + QPID_LOG(debug, logPrefix << "Created queue " << q->getName() + << " replication: " << printable(level)); + q->addArgument(QPID_REPLICATE, printable(level).str()); + if (level) { + // Give each queue a unique id to avoid confusion of same-named queues. + q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true))); + Mutex::ScopedLock l(lock); + for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) { + i->second->queueCreate(q); + checkReady(i, l); + } } } +// NOTE: Called with queue registry lock held. void Primary::queueDestroy(const QueuePtr& q) { + QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName()); Mutex::ScopedLock l(lock); for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) i->second->queueDestroy(q); checkReady(l); } +// NOTE: Called with exchange registry lock held. +void Primary::exchangeCreate(const ExchangePtr& ex) { + ReplicateLevel level = replicationTest.useLevel(*ex); + QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName() + << " replication: " << printable(level)); + FieldTable args = ex->getArgs(); + args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg. + if (level) { + // Give each exchange a unique id to avoid confusion of same-named exchanges. + args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0]))); + } + ex->setArgs(args); +} + +// NOTE: Called with exchange registry lock held. +void Primary::exchangeDestroy(const ExchangePtr& ex) { + QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName()); + // Do nothing + } + void Primary::opened(broker::Connection& connection) { BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { - QPID_LOG(debug, logPrefix << "New backup connected: " << info); - boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(info, haBroker.getReplicationTest(), true)); + QPID_LOG(info, logPrefix << "New backup connected: " << info); + boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection)); { // Avoid deadlock with queue registry lock. Mutex::ScopedUnlock u(lock); - backup->setInitialQueues(haBroker.getBroker().getQueues(), false); + backup->setCatchupQueues(haBroker.getBroker().getQueues(), false); } backups[info.getSystemId()] = backup; + i = backups.find(info.getSystemId()); } else { - QPID_LOG(debug, logPrefix << "Known backup connected: " << info); - i->second->setConnected(true); - checkReady(i, l); + QPID_LOG(info, logPrefix << "Known backup connected: " << info); + i->second->setConnection(&connection); } - if (info.getStatus() == JOINING) info.setStatus(CATCHUP); - haBroker.addBroker(info); + if (info.getStatus() == JOINING) { + info.setStatus(CATCHUP); + membership.add(info); + } + if (i != backups.end()) checkReady(i, l); } else QPID_LOG(debug, logPrefix << "Accepted client connection " @@ -225,19 +273,20 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { - // NOTE: It is possible for a backup connection to be rejected while we are - // a backup, but closed() is called after we have become primary. - // - // For this reason we do not remove from the backups map here, the backups - // map holds all the backups we know about whether connected or not. - // - Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); - if (i != backups.end()) i->second->setConnected(false); + // NOTE: It is possible for a backup connection to be rejected while we + // are a backup, but closed() is called after we have become primary. + // Checking isConnected() lets us ignore such spurious closes. + if (i != backups.end() && i->second->isConnected()) { + QPID_LOG(info, logPrefix << "Backup disconnected: " << info); + membership.remove(info.getSystemId()); + expectedBackups.erase(i->second); + backups.erase(i); + checkReady(l); + } } } @@ -249,4 +298,9 @@ boost::shared_ptr<QueueGuard> Primary::getGuard(const QueuePtr& q, const BrokerI return i == backups.end() ? boost::shared_ptr<QueueGuard>() : i->second->guard(q); } +Role* Primary::promote() { + QPID_LOG(info, "Ignoring promotion, already primary: " << haBroker.getBrokerInfo()); + return 0; +} + }} // namespace qpid::ha |