diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-08 09:23:54 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-08 09:23:54 +0000 |
| commit | d06ee666b50104bcd7cc42f656a68cce8636f79c (patch) | |
| tree | dc4050b0b94a49bb10d01db78dfb3f56499736c1 | |
| parent | 9f797837921732d538d2331e8018125d3a6eaf2a (diff) | |
| download | qpid-python-d06ee666b50104bcd7cc42f656a68cce8636f79c.tar.gz | |
QPID-5973: HA cluster state may get stuck in recovering
A backup queue is considered "ready" when all messages up to the first guarded
position have either been replicated and acknowledged or dequeued.
Previously this was implemented by waiting for the replicationg subscription to
advance to the first guarded position and wating for all expected acks. However
if messages are dequeued out-of-order (which happens with transactions) there
can be a gap at the tail of the queue. The replicating subscription will not
advance past this gap because it only advances when there are messages to
consume. This resulted in backups stuck in catch-up. The recovering primary has
a time-out for backups that never re-connect, but if they connect sucessfully
and don't disconnect, the primary assumes they will become ready and waits -
causing the primary to be stuck in "recovering".
The fixes is to notify a replicating subscription if it becomes "stopped"
because there are no more messages available on the queue. This implies that
either it is at the tail OR there are no more messags until the tail. Either way
we should consider this "ready" from the point of view of HA catch-up.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616702 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 3 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 5 |
5 files changed, 26 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index e01eff98be..4a0621243c 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -91,6 +91,9 @@ class Consumer : public QueueCursor { const std::string& getTag() const { return tag; } + /** Called when there are no more messages immediately available for this consumer on the queue */ + virtual void stopped() {} + protected: //framing::SequenceNumber position; const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 2f189da012..8a20bcc69b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -472,6 +472,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) } } else { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + c->stopped(); listeners.addListener(c); break; } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 635d5047bd..a0cfa393aa 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - position(0), ready(false), cancelled(false), + + position(0), wasStopped(false), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) {} @@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize() { ReplicatingSubscription::~ReplicatingSubscription() {} +void ReplicatingSubscription::stopped() { + Mutex::ScopedLock l(lock); + // We have reached the last available message on the queue. + // + // Note that if messages have been removed out-of-order this may not be the + // head of the queue. We may not even have reached the guard + // position. However there are no more messages to protect and we will not + // be advanced any further, so we should consider ourselves guarded for + // purposes of readiness. + wasStopped = true; + checkReady(l); +} // True if the next position for the ReplicatingSubscription is a guarded position. bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { - return position+1 >= guard->getFirst(); + // See comment in stopped() + return wasStopped || (position+1 >= guard->getFirst()); } // Message is delivered in the subscription's connection thread. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 28ab98f73b..868442da7e 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -113,6 +113,8 @@ class ReplicatingSubscription : void cancel(); void acknowledged(const broker::DeliveryRecord&); bool browseAcquired() const { return true; } + void stopped(); + // Hide the "queue deleted" error for a ReplicatingSubscription when a // queue is deleted, this is normal and not an error. bool hideDeletedError() { return true; } @@ -147,6 +149,7 @@ class ReplicatingSubscription : ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. + bool wasStopped; bool ready; bool cancelled; BrokerInfo info; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index dfb65318a9..f71560dffb 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,6 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.datatypes import uuid4, UUID -from qpid.harness import Skipped from brokertest import * from ha_test import * from threading import Thread, Lock, Condition @@ -363,7 +362,9 @@ class ReplicationTests(HaBrokerTest): cluster[0].wait_status("ready") cluster.bounce(1) # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig - if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug") + if qm == qpid_messaging: + print "WARNING: Skipping SWIG client failover bug" + return self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() backup.assert_browse_backup("q", ["b"]) |
