diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 11 |
1 files changed, 5 insertions, 6 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 40ede938a4..0d6cbb7ddc 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -248,9 +248,9 @@ ReplicatingSubscription::ReplicatingSubscription( } QPID_LOG(debug, logPrefix << "Subscribed: " << " backup:" << backup - << " backup position:" << backupPosition << " primary:" << primary << " position:" << position + << " safe position: " << guard->getFirstSafe() ); // Are we ready yet? @@ -308,10 +308,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { backupPosition = qm.position; } // Deliver the message - bool delivered = ConsumerImpl::deliver(qm); - // If we have advanced past the initial position, the backup is ready. - if (qm.position >= guard->getFirstSafe()) setReady(); - return delivered; + return ConsumerImpl::deliver(qm); } else return ConsumerImpl::deliver(qm); // Message is for internal event queue. @@ -329,7 +326,7 @@ void ReplicatingSubscription::setReady() { ready = true; } // Notify Primary that a subscription is ready. - QPID_LOG(info, logPrefix << "Caught up at " << getPosition()); + QPID_LOG(debug, logPrefix << "Caught up"); if (Primary::get()) Primary::get()->readyReplica(*this); } @@ -346,6 +343,8 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& qm) { // Finish completion of message, it has been acknowledged by the backup. QPID_LOG(trace, logPrefix << "Acknowledged " << qm); guard->complete(qm); + // If next message is protected by the guard then we are ready + if (qm.position+1 >= guard->getFirstSafe()) setReady(); } ConsumerImpl::acknowledged(qm); } |
