From 317188a84b04b38c4c905ccd4c8b2fa9b8ae46b8 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 21 Jun 2012 16:05:09 +0000 Subject: QPID-4082: cluster de-sync after broker restart & queue replication Having queue state replication between 2 clusters, restarting a broker in both source+destination clusters sometimes leads to cluster de-sync. No QMF communication is involved, though symptoms are similar to the bug caused by missing propagation of QMF errors within a cluster. The bug is caused by "deliveryCount" in SemanticState::ConsumerImpl (qpid/broker/SemanticState.cpp) not being replicated to a joining cluster node during catch-up. When the elder broker in src.cluster sends session.sync() after sending 5 messages (per --ack 5 in qpid-route), the recently joiner node in src.cluster does not do so, what leads to the cluster de-sync. The patch: - adds to "consumer-state" method (see xml/cluster.xml file change) to update a new joi-ner a new property deliveryCount - updates cluster::Connection::consumerState to send deliveryCount to the method - updates cluster::Connection::consumerState to set the received deliveryCount - add two methods to broker::SemanticState::ConsumerImpl for getting and setting deliveryCount git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1352588 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/SemanticState.h | 2 ++ qpid/cpp/src/qpid/cluster/Connection.cpp | 3 ++- qpid/cpp/src/qpid/cluster/Connection.h | 2 +- qpid/cpp/src/qpid/cluster/UpdateClient.cpp | 3 ++- 4 files changed, 7 insertions(+), 3 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index e5e1d2da16..a3cced9c67 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -146,6 +146,8 @@ class SemanticState : private boost::noncopyable { std::string getResumeId() const { return resumeId; }; const std::string& getTag() const { return tag; } uint64_t getResumeTtl() const { return resumeTtl; } + uint32_t getDeliveryCount() const { return deliveryCount; } + void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = _deliveryCount; } const framing::FieldTable& getArguments() const { return arguments; } SemanticState& getParent() { return *parent; } diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 081d54ab49..ee1bc1feb0 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -406,11 +406,12 @@ void Connection::shadowSetUser(const std::string& userId) { } void Connection::consumerState(const string& name, bool blocked, bool notifyEnabled, const SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit) + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount) { broker::SemanticState::ConsumerImpl::shared_ptr c = semanticState().find(name); c->setPosition(position); c->setBlocked(blocked); + c->setDeliveryCount(deliveryCount); if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, usedByteCredit); if (notifyEnabled) c->enableNotify(); else c->disableNotify(); updateIn.consumerNumbering.add(c); diff --git a/qpid/cpp/src/qpid/cluster/Connection.h b/qpid/cpp/src/qpid/cluster/Connection.h index 26514c76e2..d03d6f610a 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.h +++ b/qpid/cpp/src/qpid/cluster/Connection.h @@ -110,7 +110,7 @@ class Connection : void deliveredFrame(const EventFrame&); void consumerState(const std::string& name, bool blocked, bool notifyEnabled, const qpid::framing::SequenceNumber& position, - uint32_t usedMsgCredit, uint32_t usedByteCredit); + uint32_t usedMsgCredit, uint32_t usedByteCredit, const uint32_t deliveryCount); // ==== Used in catch-up mode to build initial state. // diff --git a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp index d1b9a65e14..53818aed85 100644 --- a/qpid/cpp/src/qpid/cluster/UpdateClient.cpp +++ b/qpid/cpp/src/qpid/cluster/UpdateClient.cpp @@ -547,7 +547,8 @@ void UpdateClient::updateConsumer( ci->isNotifyEnabled(), ci->getPosition(), ci->getCredit().used().messages, - ci->getCredit().used().bytes + ci->getCredit().used().bytes, + ci->getDeliveryCount() ); consumerNumbering.add(ci.get()); -- cgit v1.2.1