author     Alan Conway <aconway@apache.org>    2014-08-08 09:23:54 +0000
committer  Alan Conway <aconway@apache.org>    2014-08-08 09:23:54 +0000
commit     d06ee666b50104bcd7cc42f656a68cce8636f79c (patch)
tree       dc4050b0b94a49bb10d01db78dfb3f56499736c1
parent     9f797837921732d538d2331e8018125d3a6eaf2a (diff)
download   qpid-python-d06ee666b50104bcd7cc42f656a68cce8636f79c.tar.gz
QPID-5973: HA cluster state may get stuck in recovering
A backup queue is considered "ready" when all messages up to the first guarded position have either been replicated and acknowledged, or dequeued. Previously this was implemented by waiting for the replicating subscription to advance to the first guarded position and waiting for all expected acks.

However, if messages are dequeued out of order (which happens with transactions) there can be a gap at the tail of the queue. The replicating subscription will not advance past this gap, because it only advances when there are messages to consume. This resulted in backups stuck in catch-up. The recovering primary has a time-out for backups that never re-connect, but if they connect successfully and don't disconnect, the primary assumes they will become ready and waits, causing the primary to be stuck in "recovering".

The fix is to notify a replicating subscription when it becomes "stopped" because there are no more messages available on the queue. This implies that either it is at the tail, OR there are no more messages between its current position and the tail. Either way we should consider it "ready" from the point of view of HA catch-up.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616702 13f79535-47bb-0310-9956-ffa450edef68
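The heart of the fix is a one-line change to the guard check. As a self-contained illustration (the names below are invented for this sketch and do not match the Qpid sources; the real logic is in ReplicatingSubscription::isGuarded in the diff that follows), the old and fixed readiness rules compare like this:

// Standalone sketch of the readiness rule; field names are invented
// for illustration and are not the actual Qpid members.
#include <stdint.h>

struct BackupCursor {
    uint64_t position;      // last queue position the backup has seen
    uint64_t firstGuarded;  // first position the primary guards
    bool     wasStopped;    // queue had no more messages for us

    // Old rule: stalls forever if out-of-order dequeues (e.g. from
    // transactions) leave a gap at the tail that the cursor never crosses.
    bool isGuardedOld() const { return position + 1 >= firstGuarded; }

    // Fixed rule: running out of messages also counts, since the cursor
    // cannot be advanced past a trailing gap.
    bool isGuardedFixed() const {
        return wasStopped || position + 1 >= firstGuarded;
    }
};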
-rw-r--r--  qpid/cpp/src/qpid/broker/Consumer.h                3
-rw-r--r--  qpid/cpp/src/qpid/broker/Queue.cpp                 1
-rw-r--r--  qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp  18
-rw-r--r--  qpid/cpp/src/qpid/ha/ReplicatingSubscription.h     3
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py                     5
5 files changed, 26 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index e01eff98be..4a0621243c 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -91,6 +91,9 @@ class Consumer : public QueueCursor {
const std::string& getTag() const { return tag; }
+ /** Called when there are no more messages immediately available for this consumer on the queue */
+ virtual void stopped() {}
+
protected:
//framing::SequenceNumber position;
const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command
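The hook gets an empty default body, so existing Consumer implementations are unaffected; only consumers that care about queue exhaustion override it. A standalone sketch of the pattern (the Consumer below is a simplified stand-in invented for illustration, not the real qpid::broker::Consumer):

// Sketch only: simplified stand-in classes, not the Qpid broker API.
#include <iostream>

class Consumer {
  public:
    virtual ~Consumer() {}
    // Default is a no-op so existing consumers need no changes.
    virtual void stopped() {}
};

class CatchUpConsumer : public Consumer {
  public:
    bool wasStopped;
    CatchUpConsumer() : wasStopped(false) {}
    // Called by the queue when it has nothing left to dispatch to us.
    void stopped() {
        wasStopped = true;
        std::cout << "queue drained; consumer can be considered caught up\n";
    }
};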
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 2f189da012..8a20bcc69b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -472,6 +472,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c)
}
} else {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ c->stopped();
listeners.addListener(c);
break;
}
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 635d5047bd..a0cfa393aa 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag,
resumeId, resumeTtl, arguments),
- position(0), ready(false), cancelled(false),
+
+ position(0), wasStopped(false), ready(false), cancelled(false),
haBroker(hb),
primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole()))
{}
@@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize() {
ReplicatingSubscription::~ReplicatingSubscription() {}
+void ReplicatingSubscription::stopped() {
+ Mutex::ScopedLock l(lock);
+ // We have reached the last available message on the queue.
+ //
+ // Note that if messages have been removed out-of-order this may not be the
+ // head of the queue. We may not even have reached the guard
+ // position. However there are no more messages to protect and we will not
+ // be advanced any further, so we should consider ourselves guarded for
+ // purposes of readiness.
+ wasStopped = true;
+ checkReady(l);
+}
// True if the next position for the ReplicatingSubscription is a guarded position.
bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
- return position+1 >= guard->getFirst();
+ // See comment in stopped()
+ return wasStopped || (position+1 >= guard->getFirst());
}
// Message is delivered in the subscription's connection thread.
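checkReady() itself is not part of this diff. Assuming it follows the pattern suggested by the members visible here (ready, unready, primary), it plausibly looks something like the sketch below; readyReplica() and the exact conditions are assumptions inferred from context, not quoted source:

// Plausible sketch only: checkReady() is not shown in this diff, and
// primary->readyReplica() is an assumption about the Primary interface.
void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
    if (!ready && isGuarded(l) && unready.empty()) {
        ready = true;
        // Drop the lock before calling out, to avoid lock-order issues.
        sys::Mutex::ScopedUnlock u(lock);
        if (primary) primary->readyReplica(*this);
    }
}

With the wasStopped flag folded into isGuarded(), a subscription that drains the queue becomes ready even when a trailing gap means it will never reach the original guard position.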
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 28ab98f73b..868442da7e 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -113,6 +113,8 @@ class ReplicatingSubscription :
void cancel();
void acknowledged(const broker::DeliveryRecord&);
bool browseAcquired() const { return true; }
+ void stopped();
+
// Hide the "queue deleted" error for a ReplicatingSubscription when a
// queue is deleted, this is normal and not an error.
bool hideDeletedError() { return true; }
@@ -147,6 +149,7 @@ class ReplicatingSubscription :
ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event.
ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues.
ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged.
+ bool wasStopped;
bool ready;
bool cancelled;
BrokerInfo info;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index dfb65318a9..f71560dffb 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -20,7 +20,6 @@
import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
import traceback
from qpid.datatypes import uuid4, UUID
-from qpid.harness import Skipped
from brokertest import *
from ha_test import *
from threading import Thread, Lock, Condition
@@ -363,7 +362,9 @@ class ReplicationTests(HaBrokerTest):
cluster[0].wait_status("ready")
cluster.bounce(1)
# FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig
- if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug")
+ if qm == qpid_messaging:
+ print "WARNING: Skipping SWIG client failover bug"
+ return
self.assertEqual("a", pr.fetch().content)
pr.session.acknowledge()
backup.assert_browse_backup("q", ["b"])