author     Alan Conway <aconway@apache.org>  2012-06-12 21:21:09 +0000
committer  Alan Conway <aconway@apache.org>  2012-06-12 21:21:09 +0000
commit     2bb43aedeb59eaa9eaf79407ba8fb80f934d632e (patch)
tree       731597e12dbe4ce54ba721781e70ae9a3ed42f75 /qpid/cpp
parent     314095b5a1f0eeaaafad301954e2137ff543e9a7 (diff)
download   qpid-python-2bb43aedeb59eaa9eaf79407ba8fb80f934d632e.tar.gz
QPID-3603: HA bug fixes around transition to ready status
- Simplify QueueGuard::firstSafe calculation.
- Fix error in setting initial queues - was not checking if replicated.
- Send ready status to backups. Tests hang, deadlock in opened()->RemoteBackup on primary?
- Fix deadlock in QueueGuard.
- Don't start guards inside ConnectionObserver::opened.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349547 13f79535-47bb-0310-9956-ffa450edef68
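A minimal standalone sketch of the eager-vs-lazy guard creation this commit introduces: RemoteBackup gains a createGuards flag, so queue guards are built immediately for backups expected at promotion but deferred for backups discovered inside ConnectionObserver::opened, where creating them could deadlock. The Queue, QueueGuard, and RemoteBackup types below are simplified stand-ins, not the actual qpid::ha classes.

// Illustrative sketch only: simplified stand-ins for the qpid::ha classes,
// showing the eager-vs-lazy guard creation that the new createGuards flag enables.
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct Queue { std::string name; };          // stand-in for broker::Queue

// Stand-in for qpid::ha::QueueGuard.
struct QueueGuard {
    explicit QueueGuard(const Queue& q) : queueName(q.name) {
        std::cout << "guarding " << queueName << "\n";
    }
    std::string queueName;
};

// Stand-in for qpid::ha::RemoteBackup with the new createGuards flag.
class RemoteBackup {
  public:
    RemoteBackup(const std::vector<Queue>& queues, bool createGuards)
        : createGuards_(createGuards) {
        // Eager path: guards are built up front, e.g. for backups expected at promotion.
        for (const Queue& q : queues) queueCreate(q);
    }
    // Lazy path: when createGuards is false, no guard is created here; callers
    // must tolerate a missing guard (the real code returns an empty GuardPtr).
    void queueCreate(const Queue& q) {
        if (createGuards_) guards_[q.name] = std::make_shared<QueueGuard>(q);
    }
  private:
    bool createGuards_;
    std::map<std::string, std::shared_ptr<QueueGuard> > guards_;
};

int main() {
    std::vector<Queue> queues;
    queues.push_back(Queue{"q1"});
    queues.push_back(Queue{"q2"});
    RemoteBackup expected(queues, true);    // promotion time: create guards immediately
    RemoteBackup discovered(queues, false); // inside opened(): defer, avoids deadlock
    return 0;
}

In the actual diff, Primary passes guard=true for backups in the expected set at promotion time and guard=false from Primary::opened(), and RemoteBackup::guard() returns an empty GuardPtr when guards were not created.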
Diffstat (limited to 'qpid/cpp')
-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerInfo.cpp        |  1
-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerReplicator.cpp  |  2
-rw-r--r--  qpid/cpp/src/qpid/ha/HaBroker.cpp          |  5
-rw-r--r--  qpid/cpp/src/qpid/ha/Membership.cpp        | 15
-rw-r--r--  qpid/cpp/src/qpid/ha/Membership.h          |  6
-rw-r--r--  qpid/cpp/src/qpid/ha/Primary.cpp           | 11
-rw-r--r--  qpid/cpp/src/qpid/ha/QueueGuard.cpp        | 23
-rw-r--r--  qpid/cpp/src/qpid/ha/QueueGuard.h          |  2
-rw-r--r--  qpid/cpp/src/qpid/ha/RemoteBackup.cpp      | 28
-rw-r--r--  qpid/cpp/src/qpid/ha/RemoteBackup.h        |  4
-rw-r--r--  qpid/cpp/src/tests/brokertest.py           |  2
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py             | 12
12 files changed, 68 insertions(+), 43 deletions(-)
diff --git a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
index 91d497ab43..c8bd1a14be 100644
--- a/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
@@ -97,7 +97,6 @@ void BrokerInfo::assign(const Variant::Map& m) {
std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
return o << b.getHostName() << ":" << b.getPort() << "("
<< printable(b.getStatus()) << ")";
- // FIXME aconway 2012-06-06: include << b.getSystemId()?
}
std::ostream& operator<<(std::ostream& o, const BrokerInfo::Set& infos) {
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index cbfd5b1f32..a99a8a3b91 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -258,6 +258,7 @@ void BrokerReplicator::route(Deliverable& msg) {
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "Broker replicator event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -271,6 +272,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
+ QPID_LOG(trace, "Broker replicator response: " << map);
string type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index c06e1ffad5..9a359b75e7 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -268,6 +268,11 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
}
void HaBroker::membershipUpdate(const Variant::List& brokers) {
+ // FIXME aconway 2012-06-12: nasty callback in callback, clean up.
+ BrokerInfo info;
+ if (getStatus() == CATCHUP && getMembership().get(systemId, info) && info.getStatus() == READY)
+ setStatus(READY);
+
// No lock, only calls thread-safe objects.
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 92436c9e56..34c1ccb657 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -43,7 +43,7 @@ void Membership::add(const BrokerInfo& b) {
void Membership::remove(const types::Uuid& id) {
sys::Mutex::ScopedLock l(lock);
- BrokerMap::iterator i = brokers.find(id);
+ BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
update(l);
@@ -72,7 +72,7 @@ types::Variant::List Membership::asList() const {
types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
- for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
return list;
}
@@ -81,8 +81,6 @@ void Membership::update(sys::Mutex::ScopedLock& l) {
if (updateCallback) {
types::Variant::List list = asList(l);
sys::Mutex::ScopedUnlock u(lock);
- // FIXME aconway 2012-06-06: messy: Make this a data object,
- // move locking into HaBroker?
updateCallback(list);
}
QPID_LOG(debug, " HA: Membership update: " << brokers);
@@ -95,11 +93,18 @@ BrokerInfo::Set Membership::otherBackups() const {
BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
BrokerInfo::Set result;
- for (BrokerMap::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
+ for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
result.insert(i->second);
return result;
}
+bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
+ sys::Mutex::ScopedLock l(lock);
+ BrokerInfo::Map::iterator i = brokers.find(id);
+ if (i == brokers.end()) return false;
+ result = i->second;
+ return true;
+}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 6b88b6e5d7..623d77970c 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -55,16 +55,16 @@ class Membership
void assign(const types::Variant::List&);
types::Variant::List asList() const;
+ bool get(const types::Uuid& id, BrokerInfo& result);
+
private:
- typedef std::map<types::Uuid, BrokerInfo> BrokerMap;
BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const;
types::Variant::List asList(sys::Mutex::ScopedLock&) const;
void update(sys::Mutex::ScopedLock&);
- std::ostream& print(std::ostream& o, sys::Mutex::ScopedLock&) const;
mutable sys::Mutex lock;
types::Uuid self;
- BrokerMap brokers;
+ BrokerInfo::Map brokers;
UpdateCallback updateCallback;
};
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0aea112d8c..f4709511c8 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -74,8 +74,9 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
else {
QPID_LOG(debug, logPrefix << "Expected backups: " << expect);
for (BrokerInfo::Set::iterator i = expect.begin(); i != expect.end(); ++i) {
+ bool guard = true; // Create queue guards immediately for expected backups.
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest()));
+ new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
backups[i->getSystemId()] = backup;
if (!backup->isReady()) initialBackups.insert(backup);
}
@@ -107,6 +108,9 @@ void Primary::checkReady(Mutex::ScopedLock&) {
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->isReady()) {
+ BrokerInfo info = i->second->getBrokerInfo();
+ info.setStatus(READY);
+ haBroker.getMembership().add(info);
initialBackups.erase(i->second);
checkReady(l);
}
@@ -140,16 +144,17 @@ void Primary::opened(broker::Connection& connection) {
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- haBroker.getMembership().add(info);
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
+ bool guard = false; // Lazy-create queue guards, pre-creating them here could cause deadlock.
backups[info.getSystemId()].reset(
- new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest()));
+ new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
}
+ haBroker.getMembership().add(info);
}
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index b577b3cfdb..40262a180c 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -49,21 +49,16 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0), isFirstSet(false)
+ : queue(q), subscription(0)
{
std::ostringstream os;
os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- queue.addObserver(observer); // We can now receive concurrent calls to dequeued
- sys::Mutex::ScopedLock l(lock);
- // Race between this thread and enqueued thread to set first safe position.
- if (!isFirstSet) {
- // Must set after addObserver so we don't miss any dequeues.
- firstSafe = queue.getPosition()+1; // Next message will be safe.
- isFirstSet = true;
- QPID_LOG(debug, logPrefix << "First position (initial): " << firstSafe);
- }
+ // Once we call addObserver we can get calls to enqueued and dequeued
+ queue.addObserver(observer);
+ // Must set after addObserver so we don't miss any enqueues.
+ firstSafe = queue.getPosition()+1; // Next message will be safe.
}
QueueGuard::~QueueGuard() { cancel(); }
@@ -78,12 +73,6 @@ void QueueGuard::enqueued(const QueuedMessage& qm) {
Mutex::ScopedLock l(lock);
assert(!delayed.contains(qm.position));
delayed += qm.position;
- if (!isFirstSet) {
- firstSafe = qm.position;
- isFirstSet = true;
- QPID_LOG(debug, logPrefix << "First position (enqueued): " << firstSafe);
- }
- assert(qm.position >= firstSafe);
}
}
@@ -132,7 +121,7 @@ void QueueGuard::complete(const QueuedMessage& qm) {
}
framing::SequenceNumber QueueGuard::getFirstSafe() {
- // No lock, first is immutable.
+ // No lock, firstSafe is immutable.
return firstSafe;
}
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index 2064227f4b..cf6e39e96c 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -92,8 +92,6 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
-
- bool isFirstSet;
framing::SequenceNumber firstSafe; // Immutable
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index bc51dba5b8..4f79537113 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -31,10 +31,12 @@ namespace ha {
using sys::Mutex;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt) :
- logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt)
+ const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) :
+ logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt),
+ createGuards(cg)
{
QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
+ // FIXME aconway 2012-06-12: potential deadlocks, this is called inside ConnectionObserver::opened.
broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
}
@@ -48,29 +50,43 @@ bool RemoteBackup::isReady() {
}
void RemoteBackup::initialQueue(const QueuePtr& q) {
- initialQueues.insert(q);
+ if (replicationTest.isReplicated(ALL, *q)) initialQueues.insert(q);
queueCreate(q);
}
RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
+ if (!createGuards) return RemoteBackup::GuardPtr();
GuardMap::iterator i = guards.find(q);
if (i == guards.end()) {
assert(0);
- throw Exception(logPrefix+": Guard cannot find queue guard: "+q->getName());
+ throw Exception(logPrefix+": Cannot find queue guard: "+q->getName());
}
GuardPtr guard = i->second;
guards.erase(i);
return guard;
}
+namespace {
+typedef std::set<boost::shared_ptr<broker::Queue> > QS;
+struct QueueSetPrinter {
+ const QS& qs;
+ QueueSetPrinter(const QS& q) : qs(q) {}
+};
+std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) {
+ for (QS::const_iterator i = qp.qs.begin(); i != qp.qs.end(); ++i)
+ o << (*i)->getName() << " ";
+ return o;
+}
+}
+
void RemoteBackup::ready(const QueuePtr& q) {
- QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName());
initialQueues.erase(q);
+ QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() << " remaining unready: " << QueueSetPrinter(initialQueues));
if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q))
+ if (createGuards && replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
}
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 72d844094d..37177ec562 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -51,7 +51,7 @@ class RemoteBackup
typedef boost::shared_ptr<QueueGuard> GuardPtr;
typedef boost::shared_ptr<broker::Queue> QueuePtr;
- RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt);
+ RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
@@ -67,6 +67,7 @@ class RemoteBackup
/**@return true when all initial queues for this backup are ready */
bool isReady();
+ BrokerInfo getBrokerInfo() const { return brokerInfo; }
private:
typedef std::map<QueuePtr, GuardPtr> GuardMap;
typedef std::set<QueuePtr> QueueSet;
@@ -76,6 +77,7 @@ class RemoteBackup
ReplicationTest replicationTest;
GuardMap guards;
QueueSet initialQueues;
+ bool createGuards;
void initialQueue(const QueuePtr&);
};
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index fcc5671c90..1dac3d579c 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -243,7 +243,7 @@ class Broker(Popen):
_broker_count = 0
_log_count = 0
- def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
+ def __str__(self): return "Broker<%s %s :%d>"%(self.name, self.pname, self.port())
def find_log(self):
self.log = "%03d:%s.log" % (Broker._log_count, self.name)
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2576a2fa6e..06b2aec3cd 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -65,6 +65,8 @@ class HaBroker(Broker):
self.qpid_ha_script=import_script(self.qpid_ha_path)
self._agent = None
+ def __str__(self): return Broker.__str__(self)
+
def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
def promote(self): self.qpid_ha(["promote"])
@@ -78,7 +80,7 @@ class HaBroker(Broker):
def ha_status(self): return self.agent().getHaBroker().status
def wait_status(self, status):
- assert retry(lambda: self.ha_status() == status), "%r != %r"%(self.ha_status(), status)
+ assert retry(lambda: self.ha_status() == status), "%s, %r != %r"%(self, self.ha_status(), status)
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
@@ -738,10 +740,12 @@ class LongTests(BrokerTest):
for r in receivers: r.receiver.assert_running()
n = receivers[0].received
# FIXME aconway 2012-05-01: don't kill primary till it's active
- # otherwise we can lose messages. When we implement non-promotion
- # of catchup brokers we can make this stronger: wait only for
- # there to be at least one ready backup.
+ # and backups are ready, otherwise we can lose messages. When we
+ # implement non-promotion of catchup brokers we can make this
+ # stronger: wait only for there to be at least one ready backup.
brokers[i%3].wait_status("active")
+ brokers[(i+1)%3].wait_status("ready")
+ brokers[(i+2)%3].wait_status("ready")
brokers.bounce(i%3)
i += 1
def enough(): # Verify we're still running