diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 3 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 5 |
5 files changed, 26 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index e01eff98be..4a0621243c 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -91,6 +91,9 @@ class Consumer : public QueueCursor { const std::string& getTag() const { return tag; } + /** Called when there are no more messages immediately available for this consumer on the queue */ + virtual void stopped() {} + protected: //framing::SequenceNumber position; const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 2f189da012..8a20bcc69b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -472,6 +472,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) } } else { QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'"); + c->stopped(); listeners.addListener(c); break; } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 635d5047bd..a0cfa393aa 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -107,7 +107,8 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue_, ack, REPLICATOR, exclusive, tag, resumeId, resumeTtl, arguments), - position(0), ready(false), cancelled(false), + + position(0), wasStopped(false), ready(false), cancelled(false), haBroker(hb), primary(boost::dynamic_pointer_cast<Primary>(haBroker.getRole())) {} @@ -186,10 +187,23 @@ void ReplicatingSubscription::initialize() { ReplicatingSubscription::~ReplicatingSubscription() {} +void ReplicatingSubscription::stopped() { + Mutex::ScopedLock l(lock); + // We have reached the last available message on the queue. + // + // Note that if messages have been removed out-of-order this may not be the + // head of the queue. We may not even have reached the guard + // position. However there are no more messages to protect and we will not + // be advanced any further, so we should consider ourselves guarded for + // purposes of readiness. + wasStopped = true; + checkReady(l); +} // True if the next position for the ReplicatingSubscription is a guarded position. bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) { - return position+1 >= guard->getFirst(); + // See comment in stopped() + return wasStopped || (position+1 >= guard->getFirst()); } // Message is delivered in the subscription's connection thread. diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 28ab98f73b..868442da7e 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -113,6 +113,8 @@ class ReplicatingSubscription : void cancel(); void acknowledged(const broker::DeliveryRecord&); bool browseAcquired() const { return true; } + void stopped(); + // Hide the "queue deleted" error for a ReplicatingSubscription when a // queue is deleted, this is normal and not an error. bool hideDeletedError() { return true; } @@ -147,6 +149,7 @@ class ReplicatingSubscription : ReplicationIdSet dequeues; // Dequeues to be sent in next dequeue event. ReplicationIdSet skip; // Skip enqueues: messages already on backup and tx enqueues. ReplicationIdSet unready; // Unguarded, replicated and un-acknowledged. + bool wasStopped; bool ready; bool cancelled; BrokerInfo info; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index dfb65318a9..f71560dffb 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -20,7 +20,6 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest import traceback from qpid.datatypes import uuid4, UUID -from qpid.harness import Skipped from brokertest import * from ha_test import * from threading import Thread, Lock, Condition @@ -363,7 +362,9 @@ class ReplicationTests(HaBrokerTest): cluster[0].wait_status("ready") cluster.bounce(1) # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig - if qm == qpid_messaging: raise Skipped("FIXME SWIG client failover bug") + if qm == qpid_messaging: + print "WARNING: Skipping SWIG client failover bug" + return self.assertEqual("a", pr.fetch().content) pr.session.acknowledge() backup.assert_browse_backup("q", ["b"]) |
