summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp11
1 files changed, 5 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 40ede938a4..0d6cbb7ddc 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubscription(
}
QPID_LOG(debug, logPrefix << "Subscribed: "
<< " backup:" << backup
- << " backup position:" << backupPosition
<< " primary:" << primary
<< " position:" << position
+ << " safe position: " << guard->getFirstSafe()
);
// Are we ready yet?
@@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
backupPosition = qm.position;
}
// Deliver the message
- bool delivered = ConsumerImpl::deliver(qm);
- // If we have advanced past the initial position, the backup is ready.
- if (qm.position >= guard->getFirstSafe()) setReady();
- return delivered;
+ return ConsumerImpl::deliver(qm);
}
else
return ConsumerImpl::deliver(qm); // Message is for internal event queue.
@@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady() {
ready = true;
}
// Notify Primary that a subscription is ready.
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ QPID_LOG(debug, logPrefix << "Caught up");
if (Primary::get()) Primary::get()->readyReplica(*this);
}
@@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) {
// Finish completion of message, it has been acknowledged by the backup.
QPID_LOG(trace, logPrefix << "Acknowledged " << qm);
guard->complete(qm);
+ // If next message is protected by the guard then we are ready
+ if (qm.position+1 >= guard->getFirstSafe()) setReady();
}
ConsumerImpl::acknowledged(qm);
}