diff options
| author | Alan Conway <aconway@apache.org> | 2012-06-12 21:21:09 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-06-12 21:21:09 +0000 |
| commit | 2bb43aedeb59eaa9eaf79407ba8fb80f934d632e (patch) | |
| tree | 731597e12dbe4ce54ba721781e70ae9a3ed42f75 /qpid/cpp/src | |
| parent | 314095b5a1f0eeaaafad301954e2137ff543e9a7 (diff) | |
| download | qpid-python-2bb43aedeb59eaa9eaf79407ba8fb80f934d632e.tar.gz | |
QPID-3603: HA bug fixes around transition to ready status
- Simplify QueueGuard::firstSafe calculation.
- Fix error in setting initial queues - was not checking if replicated.
- Send ready status to backups. Tests hang, deadlock in opened()->RemoteBackup on primary?
- Fix deadlock in QueueGuard.
- Don't start guards inside ConnectionObserver::opened.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349547 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerInfo.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.cpp | 15 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Membership.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/QueueGuard.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 2 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 12 |
12 files changed, 68 insertions, 43 deletions
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp index 91d497ab43..c8bd1a14be 100644 --- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp @@ -97,7 +97,6 @@ void BrokerInfo::assign(const Variant::Map& m) { std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) { return o << b.getHostName() << ":" << b.getPort() << "(" << printable(b.getStatus()) << ")"; - // FIXME aconway 2012-06-06: include << b.getSystemId()? } std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) { diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index cbfd5b1f32..a99a8a3b91 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -258,6 +258,7 @@ void BrokerReplicator::route(Deliverable& msg) { if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); + QPID_LOG(trace, "Broker replicator event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -271,6 +272,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); + QPID_LOG(trace, "Broker replicator response: " << map); string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index c06e1ffad5..9a359b75e7 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -268,6 +268,11 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { } void HaBroker::membershipUpdate(const Variant::List& brokers) { + // FIXME aconway 2012-06-12: nasty callback in callback, clean up. + BrokerInfo info; + if (getStatus() == CATCHUP && getMembership().get(systemId, info) && info.getStatus() == READY) + setStatus(READY); + // No lock, only calls thread-safe objects. mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp index 92436c9e56..34c1ccb657 100644 --- a/qpid/cpp/src/qpid/ha/Membership.cpp +++ b/qpid/cpp/src/qpid/ha/Membership.cpp @@ -43,7 +43,7 @@ void Membership::add(const BrokerInfo& b) { void Membership::remove(const types::Uuid& id) { sys::Mutex::ScopedLock l(lock); - BrokerMap::iterator i = brokers.find(id); + BrokerInfo::Map::iterator i = brokers.find(id); if (i != brokers.end()) { brokers.erase(i); update(l); @@ -72,7 +72,7 @@ types::Variant::List Membership::asList() const { types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const { types::Variant::List list; - for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) list.push_back(i->second.asMap()); return list; } @@ -81,8 +81,6 @@ void Membership::update(sys::Mutex::ScopedLock& l) { if (updateCallback) { types::Variant::List list = asList(l); sys::Mutex::ScopedUnlock u(lock); - // FIXME aconway 2012-06-06: messy: Make this a data object, - // move locking into HaBroker? updateCallback(list); } QPID_LOG(debug, " HA: Membership update: " << brokers); @@ -95,11 +93,18 @@ BrokerInfo::Set Membership::otherBackups() const { BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const { BrokerInfo::Set result; - for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i) + for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i) if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self) result.insert(i->second); return result; } +bool Membership::get(const types::Uuid& id, BrokerInfo& result) { + sys::Mutex::ScopedLock l(lock); + BrokerInfo::Map::iterator i = brokers.find(id); + if (i == brokers.end()) return false; + result = i->second; + return true; +} }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h index 6b88b6e5d7..623d77970c 100644 --- a/qpid/cpp/src/qpid/ha/Membership.h +++ b/qpid/cpp/src/qpid/ha/Membership.h @@ -55,16 +55,16 @@ class Membership void assign(const types::Variant::List&); types::Variant::List asList() const; + bool get(const types::Uuid& id, BrokerInfo& result); + private: - typedef std::map<types::Uuid, BrokerInfo> BrokerMap; BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const; types::Variant::List asList(sys::Mutex::ScopedLock&) const; void update(sys::Mutex::ScopedLock&); - std::ostream& print(std::ostream& o, sys::Mutex::ScopedLock&) const; mutable sys::Mutex lock; types::Uuid self; - BrokerMap brokers; + BrokerInfo::Map brokers; UpdateCallback updateCallback; }; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0aea112d8c..f4709511c8 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -74,8 +74,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : else { QPID_LOG(debug, logPrefix << "Expected backups: " << expect); for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) { + bool guard = true; // Create queue guards immediately for expected backups. boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest())); + new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest(), guard)); backups[i->getSystemId()] = backup; if (!backup->isReady()) initialBackups.insert(backup); } @@ -107,6 +108,9 @@ void Primary::checkReady(Mutex::ScopedLock&) { void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->isReady()) { + BrokerInfo info = i->second->getBrokerInfo(); + info.setStatus(READY); + haBroker.getMembership().add(info); initialBackups.erase(i->second); checkReady(l); } @@ -140,16 +144,17 @@ void Primary::opened(broker::Connection& connection) { Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - haBroker.getMembership().add(info); BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { QPID_LOG(debug, logPrefix << "New backup connected: " << info); + bool guard = false; // Lazy-create queue guards, pre-creating them here could cause deadlock. backups[info.getSystemId()].reset( - new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest())); + new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard)); } else { QPID_LOG(debug, logPrefix << "Known backup connected: " << info); } + haBroker.getMembership().add(info); } } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp index b577b3cfdb..40262a180c 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp +++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp @@ -49,21 +49,16 @@ class QueueGuard::QueueObserver : public broker::QueueObserver QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info) - : queue(q), subscription(0), isFirstSet(false) + : queue(q), subscription(0) { std::ostringstream os; os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": "; logPrefix = os.str(); observer.reset(new QueueObserver(*this)); - queue.addObserver(observer); // We can now receive concurrent calls to dequeued - sys::Mutex::ScopedLock l(lock); - // Race between this thread and enqueued thread to set first safe position. - if (!isFirstSet) { - // Must set after addObserver so we don't miss any dequeues. - firstSafe = queue.getPosition()+1; // Next message will be safe. - isFirstSet = true; - QPID_LOG(debug, logPrefix << "First position (initial): " << firstSafe); - } + // Once we call addObserver we can get calls to enqueued and dequeued + queue.addObserver(observer); + // Must set after addObserver so we don't miss any enqueues. + firstSafe = queue.getPosition()+1; // Next message will be safe. } QueueGuard::~QueueGuard() { cancel(); } @@ -78,12 +73,6 @@ void QueueGuard::enqueued(const QueuedMessage& qm) { Mutex::ScopedLock l(lock); assert(!delayed.contains(qm.position)); delayed += qm.position; - if (!isFirstSet) { - firstSafe = qm.position; - isFirstSet = true; - QPID_LOG(debug, logPrefix << "First position (enqueued): " << firstSafe); - } - assert(qm.position >= firstSafe); } } @@ -132,7 +121,7 @@ void QueueGuard::complete(const QueuedMessage& qm) { } framing::SequenceNumber QueueGuard::getFirstSafe() { - // No lock, first is immutable. + // No lock, firstSafe is immutable. return firstSafe; } diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h index 2064227f4b..cf6e39e96c 100644 --- a/qpid/cpp/src/qpid/ha/QueueGuard.h +++ b/qpid/cpp/src/qpid/ha/QueueGuard.h @@ -92,8 +92,6 @@ class QueueGuard { framing::SequenceSet delayed; ReplicatingSubscription* subscription; boost::shared_ptr<QueueObserver> observer; - - bool isFirstSet; framing::SequenceNumber firstSafe; // Immutable }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index bc51dba5b8..4f79537113 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -31,10 +31,12 @@ namespace ha { using sys::Mutex; RemoteBackup::RemoteBackup( - const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt) : - logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt) + const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) : + logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), + createGuards(cg) { QPID_LOG(debug, logPrefix << "Guarding queues for backup broker."); + // FIXME aconway 2012-06-12: potential deadlocks, this is called inside ConnectionObserver::opened. broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); } @@ -48,29 +50,43 @@ bool RemoteBackup::isReady() { } void RemoteBackup::initialQueue(const QueuePtr& q) { - initialQueues.insert(q); + if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q); queueCreate(q); } RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) { + if (!createGuards) return RemoteBackup::GuardPtr(); GuardMap::iterator i = guards.find(q); if (i == guards.end()) { assert(0); - throw Exception(logPrefix+": Guard cannot find queue guard: "+q->getName()); + throw Exception(logPrefix+": Cannot find queue guard: "+q->getName()); } GuardPtr guard = i->second; guards.erase(i); return guard; } +namespace { +typedef std::set<boost::shared_ptr<broker::Queue> > QS; +struct QueueSetPrinter { + const QS& qs; + QueueSetPrinter(const QS& q) : qs(q) {} +}; +std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { + for (QS::const_iterator i = qp.qs.begin(); i != qp.qs.end(); ++i) + o << (*i)->getName() << " "; + return o; +} +} + void RemoteBackup::ready(const QueuePtr& q) { - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName()); initialQueues.erase(q); + QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() << " remaining unready: " << QueueSetPrinter(initialQueues)); if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); } void RemoteBackup::queueCreate(const QueuePtr& q) { - if (replicationTest.isReplicated(ALL, *q)) + if (createGuards && replicationTest.isReplicated(ALL, *q)) guards[q].reset(new QueueGuard(*q, brokerInfo)); } diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index 72d844094d..37177ec562 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -51,7 +51,7 @@ class RemoteBackup typedef boost::shared_ptr<QueueGuard> GuardPtr; typedef boost::shared_ptr<broker::Queue> QueuePtr; - RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt); + RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards); ~RemoteBackup(); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ @@ -67,6 +67,7 @@ class RemoteBackup /**@return true when all initial queues for this backup are ready */ bool isReady(); + BrokerInfo getBrokerInfo() const { return brokerInfo; } private: typedef std::map<QueuePtr, GuardPtr> GuardMap; typedef std::set<QueuePtr> QueueSet; @@ -76,6 +77,7 @@ class RemoteBackup ReplicationTest replicationTest; GuardMap guards; QueueSet initialQueues; + bool createGuards; void initialQueue(const QueuePtr&); }; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index fcc5671c90..1dac3d579c 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -243,7 +243,7 @@ class Broker(Popen): _broker_count = 0 _log_count = 0 - def __str__(self): return "Broker<%s %s>"%(self.name, self.pname) + def __str__(self): return "Broker<%s %s :%d>"%(self.name, self.pname, self.port()) def find_log(self): self.log = "%03d:%s.log" % (Broker._log_count, self.name) diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2576a2fa6e..06b2aec3cd 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -65,6 +65,8 @@ class HaBroker(Broker): self.qpid_ha_script=import_script(self.qpid_ha_path) self._agent = None + def __str__(self): return Broker.__str__(self) + def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args) def promote(self): self.qpid_ha(["promote"]) @@ -78,7 +80,7 @@ class HaBroker(Broker): def ha_status(self): return self.agent().getHaBroker().status def wait_status(self, status): - assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status) + assert retry(lambda: self.ha_status() == status), "%s, %r != %r"%(self, self.ha_status(), status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -738,10 +740,12 @@ class LongTests(BrokerTest): for r in receivers: r.receiver.assert_running() n = receivers[0].received # FIXME aconway 2012-05-01: don't kill primary till it's active - # otherwise we can lose messages. When we implement non-promotion - # of catchup brokers we can make this stronger: wait only for - # there to be at least one ready backup. + # and backups are ready, otherwise we can lose messages. When we + # implement non-promotion of catchup brokers we can make this + # stronger: wait only for there to be at least one ready backup. brokers[i%3].wait_status("active") + brokers[(i+1)%3].wait_status("ready") + brokers[(i+2)%3].wait_status("ready") brokers.bounce(i%3) i += 1 def enough(): # Verify we're still running |
