diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:46 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:46 +0000 |
| commit | 9d99c6c37cf265134109e60679d6c2194a3c8d08 (patch) | |
| tree | 55a27d0c61dd71ad09e16a910f01626c034af8d6 | |
| parent | 55305747e6e7f931756bfa21460c37e350f5ea0f (diff) | |
| download | qpid-python-9d99c6c37cf265134109e60679d6c2194a3c8d08.tar.gz | |
QPID-3603: HA primary marks ReplicatingSubscription ready immediately if queue is empty.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338890 13f79535-47bb-0310-9956-ffa450edef68
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/LogPrefix.cpp | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/LogPrefix.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 181 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.h | 39 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 1 |
6 files changed, 123 insertions, 106 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index b1c7bf98a5..c3e7b7f758 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -81,7 +81,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); statusChanged(l); - QPID_LOG(notice, logPrefix << "broker initialized"); } HaBroker::~HaBroker() {} diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp index 685e623287..828e226677 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp +++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp @@ -29,6 +29,12 @@ LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), st if (queue.size()) tail = " queue " + queue; } +LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue) + : haBroker(lp.haBroker), status(0) +{ + if (queue.size()) tail = " queue " + queue; +} + LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {} std::ostream& operator<<(std::ostream& o, const LogPrefix& l) { diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h index b45145fbb7..2db9f3c409 100644 --- a/qpid/cpp/src/qpid/ha/LogPrefix.h +++ b/qpid/cpp/src/qpid/ha/LogPrefix.h @@ -39,6 +39,7 @@ class LogPrefix public: /** For use by all classes other than HaBroker */ LogPrefix(HaBroker& hb, const std::string& queue=std::string()); + LogPrefix(LogPrefix& lp, const std::string& queue); /** For use by the HaBroker itself. */ LogPrefix(BrokerStatus&); diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 9067063fcf..62757a9060 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -27,6 +27,7 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" +#include "qpid/types/Uuid.h" #include <sstream> namespace qpid { @@ -65,25 +66,20 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( - haBroker, + LogPrefix(haBroker), parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); // NOTE: readyPosition must be set _after_ addObserver, so // messages can't be enqueued after setting readyPosition // but before registering the observer. - rs->readyPosition = queue->getPosition(); - QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to " - << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition) - << rs->logSuffix); - - + rs->setReadyPosition(); } return rs; } ReplicatingSubscription::ReplicatingSubscription( - HaBroker& haBroker, + LogPrefix lp, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -96,9 +92,8 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), - logPrefix(haBroker, queue->getName()), - events(new Queue(mask(name))), - consumer(new DelegatingConsumer(*this)), + logPrefix(lp, queue->getName()), + dummy(new Queue(mask(name))), sentReady(false) { // Separate the remote part from a "local-remote" address for logging. @@ -118,6 +113,36 @@ ReplicatingSubscription::ReplicatingSubscription( // so we will start consuming from the lowest numbered message. // This is incorrect if the sequence number wraps around, but // this is what all consumers currently do. + + QPID_LOG(debug, logPrefix << "Created replicating subscription" << logSuffix); +} + +ReplicatingSubscription::~ReplicatingSubscription() { + QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << logSuffix); +} + +// Called in subscription's connection thread when the subscription is created. +void ReplicatingSubscription::setReadyPosition() { + // Don't need to lock, this is called only on creation. + + // All messages after this position have been seen by us as QueueObserver. + readyPosition = getQueue()->getPosition(); + // Create a separate subscription to browse the front message on + // the queue so that we can test for queue empty. + boost::shared_ptr<Consumer> c(new GetPositionConsumer); + bool found = getQueue()->dispatch(c); + SequenceNumber front = c->getPosition(); + if (!found || front >= readyPosition) { + // The queue is empty, or has already advanced past the ready position. + QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at " + << readyPosition << logSuffix); + // Fake lock, only called during creation: + sendReady(*(sys::Mutex::ScopedLock*)0); + } + else { + QPID_LOG(debug, logPrefix << "backup subscribed, catching up " + << front << "-" << readyPosition << logSuffix); + } } // Message is delivered in the subscription's connection thread. @@ -126,38 +151,29 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { // Add position events for the subscribed queue, not for the internal event queue. if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); - sys::Mutex::ScopedLock l(lock); - if (position != qm.position) - throw Exception( - QPID_MSG("Expected position " << position - << " but got " << qm.position)); - // qm.position is the position of the newly enqueued qm on the local queue. - // backupPosition is latest position on backup queue before enqueueing qm. - if (qm.position <= backupPosition) - throw Exception( - QPID_MSG("Expected position > " << backupPosition - << " but got " << qm.position)); - - if (qm.position - backupPosition > 1) { - // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(qm.position); - --send; // Send the position before qm was enqueued. - sendPositionEvent(send, l); + { + sys::Mutex::ScopedLock l(lock); + assert(position == qm.position); + // qm.position is the position of the newly enqueued qm on the local queue. + // backupPosition is latest position on backup queue before enqueueing + if (qm.position <= backupPosition) + throw Exception( + QPID_MSG("Expected position > " << backupPosition + << " but got " << qm.position)); + if (qm.position - backupPosition > 1) { + // Position has advanced because of messages dequeued ahead of us. + SequenceNumber send(qm.position); + --send; // Send the position before qm was enqueued. + sendPositionEvent(send); + } + backupPosition = qm.position; } - backupPosition = qm.position; // Deliver the message bool delivered = ConsumerImpl::deliver(qm); - - // We have advanced to the initial position, backup is ready. - if (!sentReady && qm.position >= readyPosition) { - sendReadyEvent(l); - sentReady = true; - QPID_LOG(info, logPrefix << "Caught up at " << qm - << logSuffix); - // If we are in a primary broker, notify that a subscription is ready. - // FIXME aconway 2012-04-30: rename addReplica->readyReplica - if (Primary::get()) - Primary::get()->addReplica(qm.queue->getName()); + { + sys::Mutex::ScopedLock l(lock); + // If we have advanced to the initial position, the backup is ready. + if (qm.position >= readyPosition) sendReady(l); } return delivered; } @@ -170,12 +186,22 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) { } } -ReplicatingSubscription::~ReplicatingSubscription() {} - +// Send a ready event to the backup. +void ReplicatingSubscription::sendReady(const sys::Mutex::ScopedLock&) { + if (sentReady) return; + sentReady = true; + framing::Buffer buffer; + sendEvent(QueueReplicator::READY_EVENT_KEY, buffer); + QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix); + // Notify Primary that a subscription is ready. + // FIXME aconway 2012-04-30: rename addReplica->readyReplica + if (Primary::get()) Primary::get()->addReplica(getQueue()->getName()); +} // INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg -// Mark a message completed. May be called by acknowledge or dequeued +// Mark a message completed. May be called by acknowledge or dequeued, +// in arbitrary connection threads. void ReplicatingSubscription::complete( const QueuedMessage& qm, const sys::Mutex::ScopedLock&) { @@ -195,14 +221,17 @@ void ReplicatingSubscription::complete( } // Called before we get notified of the message being available and -// under the message lock in the queue. Called in arbitrary connection thread. +// under the message lock in the queue. +// Called in arbitrary connection thread *with the queue lock held* void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { - sys::Mutex::ScopedLock l(lock); // Delay completion QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix); qm.payload->getIngressCompletion().startCompleter(); - assert(delayed.find(qm.position) == delayed.end()); - delayed[qm.position] = qm; + { + sys::Mutex::ScopedLock l(lock); + assert(delayed.find(qm.position) == delayed.end()); + delayed[qm.position] = qm; + } } // Function to complete a delayed message, called by cancel() @@ -229,7 +258,7 @@ void ReplicatingSubscription::cancel() ConsumerImpl::cancel(); } -// Called on primary in the backups IO thread. +// Consumer override, called on primary in the backup's IO thread. void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { sys::Mutex::ScopedLock l(lock); // Finish completion of message, it has been acknowledged by the backup. @@ -241,25 +270,25 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) { bool ReplicatingSubscription::hideDeletedError() { return true; } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l) +void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&) { QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues << " from " << getQueue()->getName() << logSuffix); string buf(dequeues.encodedSize(),'\0'); framing::Buffer buffer(&buf[0], buf.size()); dequeues.encode(buffer); - dequeues.clear(); buffer.reset(); - sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l); + sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer); } -// Called after the message has been removed from the deque and under -// the messageLock in the queue. Called in arbitrary connection threads. +// QueueObserver override. Called after the message has been removed +// from the deque and under the messageLock in the queue. Called in +// arbitrary connection threads. void ReplicatingSubscription::dequeued(const QueuedMessage& qm) { { - sys::Mutex::ScopedLock l(lock); QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix); + sys::Mutex::ScopedLock l(lock); dequeues.add(qm.position); // If we have not yet sent this message to the backup, then // complete it now as it will never be accepted. @@ -269,8 +298,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm) } // Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendPositionEvent( - SequenceNumber position, const sys::Mutex::ScopedLock&l ) +void ReplicatingSubscription::sendPositionEvent(SequenceNumber position) { QPID_LOG(trace, logPrefix << "sending position " << position << ", was " << backupPosition << logSuffix); @@ -278,18 +306,10 @@ void ReplicatingSubscription::sendPositionEvent( framing::Buffer buffer(&buf[0], buf.size()); position.encode(buffer); buffer.reset(); - sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); + sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer); } -// Called with lock held. Called in subscription's connection thread. -void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l ) -{ - framing::Buffer buffer; - sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l); -} - -void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, - const sys::Mutex::ScopedLock&) +void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer) { //generate event message boost::intrusive_ptr<Message> event = new Message(); @@ -309,15 +329,14 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& event->getFrames().append(header); event->getFrames().append(content); - DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true); + DeliveryProperties* props = + event->getFrames().getHeaders()->get<DeliveryProperties>(true); props->setRoutingKey(key); - // Send the event using the events queue. Consumer is a - // DelegatingConsumer that delegates to *this for everything but - // has an independnet position. We put an event on events and - // dispatch it through ourselves to send it in line with the - // normal browsing messages. - events->deliver(event); - events->dispatch(consumer); + // Send the event directly to the base consumer implementation. + // We don't really need a queue here but we pass a dummy queue + // to conform to the consumer API. + QueuedMessage qm(dummy.get(), event); + ConsumerImpl::deliver(qm); } @@ -326,18 +345,12 @@ bool ReplicatingSubscription::doDispatch() { { sys::Mutex::ScopedLock l(lock); - if (!dequeues.empty()) sendDequeueEvent(l); + if (!dequeues.empty()) { + sendDequeueEvent(l); + dequeues.clear(); + } } return ConsumerImpl::doDispatch(); } -ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} -ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); } -void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } -bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } -bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } -bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); } -OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); } - }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index 952c970f41..66b2651f23 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -26,6 +26,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/broker/QueueObserver.h" #include "qpid/broker/ConsumerFactory.h" +#include "qpid/types/Uuid.h" #include <iosfwd> namespace qpid { @@ -42,6 +43,7 @@ class Buffer; } namespace ha { +class LogPrefix; /** * A susbcription that represents a backup replicating a queue. @@ -73,7 +75,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; - ReplicatingSubscription(HaBroker&, + ReplicatingSubscription(LogPrefix, broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, @@ -82,7 +84,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, ~ReplicatingSubscription(); - // QueueObserver overrides. + // QueueObserver overrides. NB called with queue lock held. void enqueued(const broker::QueuedMessage&); void dequeued(const broker::QueuedMessage&); void acquired(const broker::QueuedMessage&) {} @@ -95,6 +97,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool browseAcquired() const { return true; } bool hideDeletedError(); + void setReadyPosition(); protected: bool doDispatch(); @@ -103,8 +106,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, LogPrefix logPrefix; std::string logSuffix; - boost::shared_ptr<broker::Queue> events; - boost::shared_ptr<broker::Consumer> consumer; + boost::shared_ptr<broker::Queue> dummy; // Used to send event messages Delayed delayed; framing::SequenceSet dequeues; framing::SequenceNumber backupPosition; @@ -114,28 +116,25 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); - void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + void sendPositionEvent(framing::SequenceNumber); + void sendReady(const sys::Mutex::ScopedLock&); void sendReadyEvent(const sys::Mutex::ScopedLock&); - void sendEvent(const std::string& key, framing::Buffer&, - const sys::Mutex::ScopedLock&); + void sendEvent(const std::string& key, framing::Buffer&); - class DelegatingConsumer : public Consumer + /** Dummy consumer used to get the front position on the queue */ + class GetPositionConsumer : public Consumer { public: - DelegatingConsumer(ReplicatingSubscription&); - ~DelegatingConsumer(); - bool deliver(broker::QueuedMessage& msg); - void notify(); - bool filter(boost::intrusive_ptr<broker::Message>); - bool accept(boost::intrusive_ptr<broker::Message>); + GetPositionConsumer() : + Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {} + bool deliver(broker::QueuedMessage& ) { return true; } + void notify() {} + bool filter(boost::intrusive_ptr<broker::Message>) { return true; } + bool accept(boost::intrusive_ptr<broker::Message>) { return true; } void cancel() {} void acknowledged(const broker::QueuedMessage&) {} - bool browseAcquired() const; - - broker::OwnershipToken* getSession(); - - private: - ReplicatingSubscription& delegate; + bool browseAcquired() const { return true; } + broker::OwnershipToken* getSession() { return 0; } }; friend class Factory; diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 15137a0c5f..b5dce0e4d7 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -43,7 +43,6 @@ class HaBroker(Broker): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", |
