summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-12 21:20:49 +0000
committerAlan Conway <aconway@apache.org>2012-06-12 21:20:49 +0000
commit8c8e141d3b471e47e67302edb2dad6e75dcd84a5 (patch)
treeeef0200247b7fb30a369ae56970ccb35969efcf2 /qpid/cpp
parentf3eea787423e30e831c4f804f6ee2f7c7c6e6584 (diff)
downloadqpid-python-8c8e141d3b471e47e67302edb2dad6e75dcd84a5.tar.gz
QPID-3603: HA bug fixes around ha::QueueGuard
- Remove nested calls between QueueGuard::dequeued and ReplicatingSubscription - ReplicatingSubscription can't start ahead of QueueGuard::getReadyPosition() - Fix QueueGuard firstSafe calcultatoin - Replace DequeueRemover with DequeueScanner in ReplicatingSubscription - Removed bad assertions in ReplicatingSubscription and QueueGuard git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1349544 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h7
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.cpp70
-rw-r--r--qpid/cpp/src/qpid/ha/QueueGuard.h27
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp8
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp13
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp177
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h19
-rw-r--r--qpid/cpp/src/tests/brokertest.py2
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py11
13 files changed, 188 insertions, 159 deletions
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 9c35daee22..234a55ee2c 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -30,7 +30,7 @@ namespace qpid {
namespace ha {
ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
- : haBroker(hb), logPrefix("HA: "), self(uuid) {}
+ : haBroker(hb), logPrefix("HA connections: "), self(uuid) {}
// FIXME aconway 2012-06-06: move to BrokerInfo
bool ConnectionObserver::getBrokerInfo(broker::Connection& connection, BrokerInfo& info) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index cfe202c6f7..c06e1ffad5 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -276,15 +276,13 @@ void HaBroker::membershipUpdate(const Variant::List& brokers) {
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
if (isBackup(status)) {
- // If this is a backup then any links we make are backup links
- // and need to be tagged.
- QPID_LOG(debug, logPrefix << "Backup setting info for outgoing links: " << brokerInfo);
+ // If this is a backup then any outgoing links are backup
+ // links and need to be tagged.
linkProperties.setTable(ConnectionObserver::BACKUP_TAG, brokerInfo.asFieldTable());
}
else {
- // If this is a primary then any links are federation links
+ // If this is a primary then any outgoing links are federation links
// and should not be tagged.
- QPID_LOG(debug, logPrefix << "Primary removing backup info for outgoing links");
linkProperties.erase(ConnectionObserver::BACKUP_TAG);
}
broker.setLinkClientProperties(linkProperties);
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 9f655ff6eb..0aea112d8c 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -99,7 +99,7 @@ Primary::~Primary() {
void Primary::checkReady(Mutex::ScopedLock&) {
if (!active && initialBackups.empty()) {
active = true;
- QPID_LOG(notice, logPrefix << "Active, all initial queues are safe.");
+ QPID_LOG(notice, logPrefix << "All initial backups are ready.");
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
haBroker.activate();
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index d9a4eb365c..12b5073747 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -46,12 +46,11 @@ class QueueGuard;
/**
* State associated with a primary broker:
- * - tracks readiness of initial backups to determine when primary is active.
- * - sets updates queue guards on new queues with for each backup.
+ * - sets queue guards and tracks readiness of initial backups till active.
+ * - sets queue guards on new queues for each backup.
*
- * THREAD SAFE: readyReplica and ConfigurationObserver functions called concurrently.
+ * THREAD SAFE: called concurrently in arbitrary connection threads.
*/
-
class Primary
{
public:
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.cpp b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
index 88244f2c87..b577b3cfdb 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.cpp
@@ -31,6 +31,8 @@ namespace ha {
using namespace broker;
using sys::Mutex;
+using framing::SequenceNumber;
+using framing::SequenceSet;
class QueueGuard::QueueObserver : public broker::QueueObserver
{
@@ -47,79 +49,91 @@ class QueueGuard::QueueObserver : public broker::QueueObserver
QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
- : queue(q), subscription(0)
+ : queue(q), subscription(0), isFirstSet(false)
{
- // Set a log prefix message that identifies the remote broker.
std::ostringstream os;
os << "HA primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
observer.reset(new QueueObserver(*this));
- queue.addObserver(observer);
- readyPosition = queue.getPosition(); // Must set after addObserver()
+ queue.addObserver(observer); // We can now receive concurrent calls to dequeued
+ sys::Mutex::ScopedLock l(lock);
+ // Race between this thread and enqueued thread to set first safe position.
+ if (!isFirstSet) {
+ // Must set after addObserver so we don't miss any dequeues.
+ firstSafe = queue.getPosition()+1; // Next message will be safe.
+ isFirstSet = true;
+ QPID_LOG(debug, logPrefix << "First position (initial): " << firstSafe);
+ }
}
QueueGuard::~QueueGuard() { cancel(); }
+// NOTE: Called with message lock held.
void QueueGuard::enqueued(const QueuedMessage& qm) {
assert(qm.queue == &queue);
// Delay completion
QPID_LOG(trace, logPrefix << "Delayed completion of " << qm);
qm.payload->getIngressCompletion().startCompleter();
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
assert(!delayed.contains(qm.position));
delayed += qm.position;
+ if (!isFirstSet) {
+ firstSafe = qm.position;
+ isFirstSet = true;
+ QPID_LOG(debug, logPrefix << "First position (enqueued): " << firstSafe);
+ }
+ assert(qm.position >= firstSafe);
}
}
-// FIXME aconway 2012-06-05: ERROR, must call on ReplicatingSubscription
-
+// NOTE: Called with message lock held.
void QueueGuard::dequeued(const QueuedMessage& qm) {
assert(qm.queue == &queue);
QPID_LOG(trace, logPrefix << "Dequeued " << qm);
- ReplicatingSubscription* rs = 0;
+ ReplicatingSubscription* rs=0;
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
rs = subscription;
}
if (rs) rs->dequeued(qm);
+ complete(qm);
}
void QueueGuard::cancel() {
queue.removeObserver(observer);
{
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
if (delayed.empty()) return; // No need if no delayed messages.
}
queue.eachMessage(boost::bind(&QueueGuard::complete, this, _1));
}
void QueueGuard::attach(ReplicatingSubscription& rs) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ assert(firstSafe >= rs.getPosition());
subscription = &rs;
}
-void QueueGuard::complete(const QueuedMessage& qm, sys::Mutex::ScopedLock&) {
- assert(qm.queue == &queue);
- // The same message can be completed twice, by acknowledged and
- // dequeued, remove it from the set so we only call
- // finishCompleter() once
- if (delayed.contains(qm.position)) {
- QPID_LOG(trace, logPrefix << "Completed " << qm);
- qm.payload->getIngressCompletion().finishCompleter();
- delayed -= qm.position;
- }
-}
-
void QueueGuard::complete(const QueuedMessage& qm) {
assert(qm.queue == &queue);
- Mutex::ScopedLock l(lock);
- complete(qm, l);
+ {
+ Mutex::ScopedLock l(lock);
+ // The same message can be completed twice, by
+ // ReplicatingSubscription::acknowledged and dequeued. Remove it
+ // from the set so we only call finishCompleter() once
+ if (delayed.contains(qm.position))
+ delayed -= qm.position;
+ else
+ return;
+ }
+ QPID_LOG(trace, logPrefix << "Completed " << qm);
+ qm.payload->getIngressCompletion().finishCompleter();
}
-framing::SequenceNumber QueueGuard::getReadyPosition() {
- // No lock, readyPosition is immutable.
- return readyPosition;
+framing::SequenceNumber QueueGuard::getFirstSafe() {
+ // No lock, first is immutable.
+ return firstSafe;
}
// FIXME aconway 2012-06-04: TODO support for timeout.
diff --git a/qpid/cpp/src/qpid/ha/QueueGuard.h b/qpid/cpp/src/qpid/ha/QueueGuard.h
index bb98d2052d..2064227f4b 100644
--- a/qpid/cpp/src/qpid/ha/QueueGuard.h
+++ b/qpid/cpp/src/qpid/ha/QueueGuard.h
@@ -49,22 +49,25 @@ class ReplicatingSubscription;
* till they have been replicated.
*
* The guard is created before the ReplicatingSubscription to protect
- * messages arriving before the creation of the subscription has not
- * yet seen.
+ * messages arriving before the creation of the subscription.
*
- * THREAD SAFE: Called concurrently via QueueObserver::enqueued in
- * arbitrary connection threads, and from ReplicatingSubscription
- * in the subscriptions thread.
+ * THREAD SAFE: Concurrent calls:
+ * - enqueued() via QueueObserver in arbitrary connection threads.
+ * - attach(), cancel(), complete() from ReplicatingSubscription in subscription thread.
*/
class QueueGuard {
public:
QueueGuard(broker::Queue& q, const BrokerInfo&);
~QueueGuard();
- /** QueueObserver override. Delay completion of the message. */
+ /** QueueObserver override. Delay completion of the message.
+ * NOTE: Called under the queues message lock.
+ */
void enqueued(const broker::QueuedMessage&);
- /** QueueObserver override: Complete a delayed message */
+ /** QueueObserver override: Complete a delayed message.
+ * NOTE: Called under the queues message lock.
+ */
void dequeued(const broker::QueuedMessage&);
/** Complete a delayed message. */
@@ -75,8 +78,10 @@ class QueueGuard {
void attach(ReplicatingSubscription&);
- /** The first sequence number that has been processed */
- framing::SequenceNumber getReadyPosition();
+ /** The first sequence number protected by this guard.
+ * All messages at or after this position are protected.
+ */
+ framing::SequenceNumber getFirstSafe();
private:
class QueueObserver;
@@ -87,9 +92,9 @@ class QueueGuard {
framing::SequenceSet delayed;
ReplicatingSubscription* subscription;
boost::shared_ptr<QueueObserver> observer;
- framing::SequenceNumber readyPosition;
- void complete(const broker::QueuedMessage&, sys::Mutex::ScopedLock&);
+ bool isFirstSet;
+ framing::SequenceNumber firstSafe; // Immutable
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index 7c65ea3522..af987a1e5e 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -144,11 +144,9 @@ template <class T> T decodeContent(Message& m) {
void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
// Thread safe: only calls thread safe Queue functions.
- if (queue->getPosition() >= n) { // Ignore messages we haven't reached yet
- QueuedMessage message;
- if (queue->acquireMessageAt(n, message))
- queue->dequeue(0, message);
- }
+ QueuedMessage message;
+ if (queue->acquireMessageAt(n, message))
+ queue->dequeue(0, message);
}
// Called in connection thread of the queues bridge to primary.
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 42861b1e78..bc51dba5b8 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -34,10 +34,15 @@ RemoteBackup::RemoteBackup(
const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt) :
logPrefix("HA primary, backup to "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt)
{
- QPID_LOG(debug, logPrefix << "Guarding queues for backup broker. ");
+ QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
}
+RemoteBackup::~RemoteBackup() {
+ for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
+ i->second->cancel();
+}
+
bool RemoteBackup::isReady() {
return initialQueues.empty();
}
@@ -59,14 +64,14 @@ RemoteBackup::GuardPtr RemoteBackup::guard(const QueuePtr& q) {
}
void RemoteBackup::ready(const QueuePtr& q) {
+ QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName());
initialQueues.erase(q);
+ if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready");
}
void RemoteBackup::queueCreate(const QueuePtr& q) {
- if (replicationTest.isReplicated(ALL, *q)) {
- QPID_LOG(debug, logPrefix << "Setting guard on " << q->getName());
+ if (replicationTest.isReplicated(ALL, *q))
guards[q].reset(new QueueGuard(*q, brokerInfo));
- }
}
void RemoteBackup::queueDestroy(const QueuePtr& q) {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 39020c9b7d..72d844094d 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -52,6 +52,7 @@ class RemoteBackup
typedef boost::shared_ptr<broker::Queue> QueuePtr;
RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt);
+ ~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index c74abb6cdd..08f6fb7dcc 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -49,30 +49,38 @@ const string DOLLAR("$");
const string INTERNAL("-internal");
} // namespace
-class DequeueRemover
+// Scan the queue for gaps and add them to the subscriptions dequed set.
+class DequeueScanner
{
public:
- DequeueRemover(
- SequenceSet& r,
- const SequenceNumber& s,
- const SequenceNumber& e
- ) : dequeues(r), start(s), end(e)
+ DequeueScanner(
+ ReplicatingSubscription& rs,
+ const SequenceNumber& first_,
+ const SequenceNumber& last_ // Inclusive
+ ) : subscription(rs), first(first_), last(last_)
{
- dequeues.add(start, end);
+ assert(first <= last);
+ // INVARIANT no deques are needed for positions <= at.
+ at = first - 1;
}
- void operator()(const QueuedMessage& message) {
- if (message.position >= start && message.position <= end) {
- //i.e. message is within the intial range and has not been dequeued,
- //so remove it from the dequeues
- dequeues.remove(message.position);
+ void operator()(const QueuedMessage& qm) {
+ if (qm.position >= first && qm.position <= last) {
+ if (qm.position > at+1)
+ subscription.dequeued(at+1, qm.position-1);
+ at = qm.position;
}
}
+ // Must call after scanning the queue.
+ void finish() {
+ if (at < last) subscription.dequeued(at+1, last);
+ }
private:
- SequenceSet& dequeues;
- const SequenceNumber start;
- const SequenceNumber end;
+ ReplicatingSubscription& subscription;
+ SequenceNumber first;
+ SequenceNumber last;
+ SequenceNumber at;
};
string mask(const string& in)
@@ -113,11 +121,6 @@ bool ReplicatingSubscription::getFront(broker::Queue& q, SequenceNumber& front)
return getNext(q, 0, front);
}
-bool ReplicatingSubscription::isEmpty(broker::Queue& q) {
- SequenceNumber front;
- return getFront(q, front);
-}
-
/* Called by SemanticState::consume to create a consumer */
boost::shared_ptr<broker::SemanticState::ConsumerImpl>
ReplicatingSubscription::Factory::create(
@@ -149,11 +152,11 @@ struct QueueRange {
QueueRange() { }
- // FIXME aconway 2012-05-26: fix front calculation
QueueRange(broker::Queue& q) {
back = q.getPosition();
- front = back+1;
+ front = back+1; // Assume empty
empty = !ReplicatingSubscription::getFront(q, front);
+ assert(empty || front <= back);
}
QueueRange(const framing::FieldTable args) {
@@ -163,7 +166,8 @@ struct QueueRange {
if (!empty) {
front = args.getAsInt(ReplicatingSubscription::QPID_FRONT);
if (back < front)
- throw InvalidArgumentException("Invalid bounds for backup queue");
+ throw InvalidArgumentException(
+ QPID_MSG("Invalid range [" << front << "," << back <<"]"));
}
}
};
@@ -192,68 +196,72 @@ ReplicatingSubscription::ReplicatingSubscription(
try {
FieldTable ft;
if (!arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft))
- throw Exception("Replicating subscription does not have broker info");
+ throw Exception("Replicating subscription does not have broker info: " + tag);
info.assign(ft);
// Set a log prefix message that identifies the remote broker.
ostringstream os;
- os << "HA primary replicate " << queue->getName() << "@" << info.getLogId() << ": ";
+ os << "HA primary replica " << queue->getName() << "@" << info.getLogId() << ": ";
logPrefix = os.str();
- QueueRange primary(*queue);
- QueueRange backup(arguments);
+ // FIXME aconway 2012-06-10: unsafe to rely in queue front or position they are changing?
+
+ QueueRange primary(*queue); // The local primary queue.
+ QueueRange backup(arguments); // The remote backup state.
backupPosition = backup.back;
+
+ // NOTE: Once the guard is attached we can have concurrent
+ // calles to dequeued so we need to lock use of this->deques.
+ //
+
+ // However we must attach the guard _before_ we scan for
+ // initial dequeues to be sure we don't miss any dequeues
+ // between the scan and attaching the guard.
+ if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
+ if (!guard) guard.reset(new QueueGuard(*queue, getBrokerInfo()));
+ guard->attach(*this);
+
// We can re-use some backup messages if backup and primary queues
// overlap and the backup is not missing messages at the front of the queue.
- if (!primary.empty && // Primary not empty
+ // FIXME aconway 2012-06-10: disable re-use of backup queue till stall problem is solved.
+ /* if (!primary.empty && // Primary not empty
!backup.empty && // Backup not empty
primary.front >= backup.front && // Not missing messages at the front
primary.front <= backup.back // Overlap
)
{
- // Remove messages that are still on the primary queue from dequeues
- // FIXME aconway 2012-05-22: optimize to iterate only the relevant
- // section of the queue
- DequeueRemover remover(dequeues, backup.front, backup.back);
- queue->eachMessage(remover);
- position = std::min(primary.back, backup.back);
+ // Scan primary queue for gaps that should be dequeued on the backup.
+ // NOTE: this runs concurrently with the guard calling dequeued()
+ // FIXME aconway 2012-05-22: optimize queue iteration
+ DequeueScanner scan(*this, backup.front, backup.back);
+ queue->eachMessage(scan);
+ scan.finish();
+ // If the backup was ahead it has been pruned back to the primary.
+ position = std::min(guard->getFirstSafe(), backup.back);
}
- else {
+ else */ {
// Clear the backup queue and reset to start browsing at the
// front of the primary queue.
- if (!backup.empty) dequeues.add(backup.front, backup.back);
+ if (!backup.empty) dequeued(backup.front, backup.back);
position = primary.front - 1; // Start consuming from front.
-
}
QPID_LOG(debug, logPrefix << "Subscribed: "
- << " backup" << backup
- << " primary" << primary
- << " position=" << position
- << " dequeues=" << dequeues);
-
- // Set the guard
- if (Primary::get()) guard = Primary::get()->getGuard(queue, getBrokerInfo());
- if (!guard) {
- QPID_LOG(debug, logPrefix << "No pre-set guard found, creating one.");
- guard.reset(new QueueGuard(*queue, getBrokerInfo()));
- }
- guard->attach(*this);
-
- // Guard is active, dequeued can be called concurrently.
- Mutex::ScopedLock l(lock);
-
- // Set the ready position. All messages after this position have
- // been seen by the guard.
- readyPosition = guard->getReadyPosition();
- if (position >= readyPosition || isEmpty(*getQueue()))
- setReady(l);
+ << " backup:" << backup
+ << " backup position:" << backupPosition
+ << " primary:" << primary
+ << " position:" << position
+ );
+
+ // Are we ready yet?
+ if (position+1 >= guard->getFirstSafe()) // Next message will be safe.
+ setReady();
else
QPID_LOG(debug, logPrefix << "Catching up from "
- << position << " to " << readyPosition);
+ << position << " to " << guard->getFirstSafe());
}
catch (const std::exception& e) {
- throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
- << e.what()));
+ throw InvalidArgumentException(QPID_MSG(logPrefix << e.what()
+ << ": arguments=" << arguments));
}
}
@@ -295,16 +303,13 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Send the position before qm was enqueued.
sendPositionEvent(qm.position-1, l);
}
- // Backup will automaticall advance by 1 on delivery of message.
+ // Backup will automatically advance by 1 on delivery of message.
backupPosition = qm.position;
}
// Deliver the message
bool delivered = ConsumerImpl::deliver(qm);
- {
- Mutex::ScopedLock l(lock);
- // If we have advanced to the initial position, the backup is ready.
- if (qm.position >= readyPosition) setReady(l);
- }
+ // If we have advanced past the initial position, the backup is ready.
+ if (qm.position >= guard->getFirstSafe()) setReady();
return delivered;
}
else
@@ -316,15 +321,15 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-void ReplicatingSubscription::setReady(Mutex::ScopedLock&) {
- if (ready) return;
- ready = true;
- // Notify Primary that a subscription is ready.
+void ReplicatingSubscription::setReady() {
{
- Mutex::ScopedUnlock u(lock);
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
- if (Primary::get()) Primary::get()->readyReplica(*this);
+ Mutex::ScopedLock l(lock);
+ if (ready) return;
+ ready = true;
}
+ // Notify Primary that a subscription is ready.
+ QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ if (Primary::get()) Primary::get()->readyReplica(*this);
}
// Called in the subscription's connection thread.
@@ -341,12 +346,9 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
}
+ ConsumerImpl::acknowledged(qm);
}
-// Hide the "queue deleted" error for a ReplicatingSubscription when a
-// queue is deleted, this is normal and not an error.
-bool ReplicatingSubscription::hideDeletedError() { return true; }
-
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
{
@@ -370,25 +372,28 @@ void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock&)
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
assert (qm.queue == getQueue().get());
- bool doComplete = false;
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
Mutex::ScopedLock l(lock);
- assert(!dequeues.contains(qm.position));
dequeues.add(qm.position);
- // If we have not yet sent this message to the backup, then
- // complete it now as it will never be accepted.
- if (qm.position > position) doComplete = true;
}
- if (doComplete) guard->complete(qm);
notify(); // Ensure a call to doDispatch
}
+// Called during construction while scanning for initial dequeues.
+void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
+ QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
+ {
+ Mutex::ScopedLock l(lock);
+ dequeues.add(first,last);
+ }
+}
+
// Called with lock held. Called in subscription's connection thread.
void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock&)
{
if (pos == backupPosition) return; // No need to send.
- QPID_LOG(trace, logPrefix << "Sending position " << pos
- << ", was " << backupPosition);
+ QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
string buf(pos.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
pos.encode(buffer);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 9be8364117..c25749c6b0 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -87,7 +87,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
*/
static bool getNext(broker::Queue&, framing::SequenceNumber from,
framing::SequenceNumber& result);
- static bool isEmpty(broker::Queue&);
ReplicatingSubscription(broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -97,19 +96,24 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
~ReplicatingSubscription();
- // Called via QueueGuard::dequeued
+ // Called via QueueGuard::dequeued.
+ //@return true if the message requires completion.
void dequeued(const broker::QueuedMessage& qm);
+ // Called during initial scan for dequeues.
+ void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
+
// Consumer overrides.
bool deliver(broker::QueuedMessage& msg);
void cancel();
void acknowledged(const broker::QueuedMessage&);
bool browseAcquired() const { return true; }
+ // Hide the "queue deleted" error for a ReplicatingSubscription when a
+ // queue is deleted, this is normal and not an error.
+ bool hideDeletedError() { return true; }
- bool hideDeletedError();
-
- /** Initialization that must be done after construction because it
- * requires a shared_ptr to this to exist. Will attach to guard
+ /** Initialization that must be done separately from construction
+ * because it requires a shared_ptr to this to exist.
*/
void initialize();
@@ -122,7 +126,6 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
std::string logPrefix;
boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
framing::SequenceSet dequeues;
- framing::SequenceNumber readyPosition;
framing::SequenceNumber backupPosition;
bool ready;
BrokerInfo info;
@@ -130,7 +133,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
void sendDequeueEvent(sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
- void setReady(sys::Mutex::ScopedLock&);
+ void setReady();
void sendEvent(const std::string& key, framing::Buffer&);
friend struct Factory;
};
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 6855ed03bb..fcc5671c90 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=3, delay=.01):
"""Call function until it returns a true value or timeout expires.
Double the delay for each retry. Returns what function returns if
true, None if timeout expires."""
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index e43d8bcb91..2576a2fa6e 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -718,16 +718,17 @@ class LongTests(BrokerTest):
brokers = HaCluster(self, 3)
# Start sender and receiver threads
+ n = 1; # FIXME aconway 2012-06-10: n = 10
senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
- queue="test%s"%(i)) for i in xrange(10)]
+ queue="test%s"%(i)) for i in xrange(n)]
receivers = [NumberedReceiver(brokers[0], sender=senders[i],
failover_updates=False,
- queue="test%s"%(i)) for i in xrange(10)]
+ queue="test%s"%(i)) for i in xrange(n)]
for r in receivers: r.start()
for s in senders: s.start()
# Wait for sender & receiver to get up and running
- assert retry(lambda: receivers[0].received > 100)
+ assert retry(lambda: receivers[0].received > 100), "%s<=100"%receivers[0].received
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
@@ -748,12 +749,12 @@ class LongTests(BrokerTest):
return receivers[0].received > n + 100
# FIXME aconway 2012-05-17: client reconnect sometimes takes > 1 sec.
assert retry(enough, 10), "Stalled: %s < %s+100"%(receivers[0].received, n)
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
except:
traceback.print_exc()
raise
finally:
- for s in senders: s.stop()
- for r in receivers: r.stop()
dead = []
for i in xrange(3):
if not brokers[i].is_running(): dead.append(i)