diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 |
1 files changed, 16 insertions, 2 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 635d5047bd..a0cfa393aa 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - position(0), ready(false), cancelled(false), + + position(0), wasStopped(false), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) {} @@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize() { ReplicatingSubscription::~ReplicatingSubscription() {} +void ReplicatingSubscription::stopped() { + Mutex::ScopedLock l(lock); + // We have reached the last available message on the queue. + // + // Note that if messages have been removed out-of-order this may not be the + // head of the queue. We may not even have reached the guard + // position. However there are no more messages to protect and we will not + // be advanced any further, so we should consider ourselves guarded for + // purposes of readiness. + wasStopped = true; + checkReady(l); +} // True if the next position for the ReplicatingSubscription is a guarded position. bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { - return position+1 >= guard->getFirst(); + // See comment in stopped() + return wasStopped || (position+1 >= guard->getFirst()); } // Message is delivered in the subscription's connection thread. |
