summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-15 21:05:46 +0000
committerAlan Conway <aconway@apache.org>2012-05-15 21:05:46 +0000
commit9d99c6c37cf265134109e60679d6c2194a3c8d08 (patch)
tree55a27d0c61dd71ad09e16a910f01626c034af8d6
parent55305747e6e7f931756bfa21460c37e350f5ea0f (diff)
downloadqpid-python-9d99c6c37cf265134109e60679d6c2194a3c8d08.tar.gz
QPID-3603: HA primary marks ReplicatingSubscription ready immediately if queue is empty.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338890 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.cpp6
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.h1
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp181
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h39
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py1
6 files changed, 123 insertions, 106 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index b1c7bf98a5..c3e7b7f758 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -81,7 +81,6 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
statusChanged(l);
- QPID_LOG(notice, logPrefix << "broker initialized");
}
HaBroker::~HaBroker() {}
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
index 685e623287..828e226677 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.cpp
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
@@ -29,6 +29,12 @@ LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), st
if (queue.size()) tail = " queue " + queue;
}
+LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue)
+ : haBroker(lp.haBroker), status(0)
+{
+ if (queue.size()) tail = " queue " + queue;
+}
+
LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.h b/qpid/cpp/src/qpid/ha/LogPrefix.h
index b45145fbb7..2db9f3c409 100644
--- a/qpid/cpp/src/qpid/ha/LogPrefix.h
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.h
@@ -39,6 +39,7 @@ class LogPrefix
public:
/** For use by all classes other than HaBroker */
LogPrefix(HaBroker& hb, const std::string& queue=std::string());
+ LogPrefix(LogPrefix& lp, const std::string& queue);
/** For use by the HaBroker itself. */
LogPrefix(BrokerStatus&);
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 9067063fcf..62757a9060 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -27,6 +27,7 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/log/Statement.h"
+#include "qpid/types/Uuid.h"
#include <sstream>
namespace qpid {
@@ -65,25 +66,20 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
- haBroker,
+ LogPrefix(haBroker),
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->addObserver(rs);
// NOTE: readyPosition must be set _after_ addObserver, so
// messages can't be enqueued after setting readyPosition
// but before registering the observer.
- rs->readyPosition = queue->getPosition();
- QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to "
- << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition)
- << rs->logSuffix);
-
-
+ rs->setReadyPosition();
}
return rs;
}
ReplicatingSubscription::ReplicatingSubscription(
- HaBroker& haBroker,
+ LogPrefix lp,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -96,9 +92,8 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
- logPrefix(haBroker, queue->getName()),
- events(new Queue(mask(name))),
- consumer(new DelegatingConsumer(*this)),
+ logPrefix(lp, queue->getName()),
+ dummy(new Queue(mask(name))),
sentReady(false)
{
// Separate the remote part from a "local-remote" address for logging.
@@ -118,6 +113,36 @@ ReplicatingSubscription::ReplicatingSubscription(
// so we will start consuming from the lowest numbered message.
// This is incorrect if the sequence number wraps around, but
// this is what all consumers currently do.
+
+ QPID_LOG(debug, logPrefix << "Created replicating subscription" << logSuffix);
+}
+
+ReplicatingSubscription::~ReplicatingSubscription() {
+ QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << logSuffix);
+}
+
+// Called in subscription's connection thread when the subscription is created.
+void ReplicatingSubscription::setReadyPosition() {
+ // Don't need to lock, this is called only on creation.
+
+ // All messages after this position have been seen by us as QueueObserver.
+ readyPosition = getQueue()->getPosition();
+ // Create a separate subscription to browse the front message on
+ // the queue so that we can test for queue empty.
+ boost::shared_ptr<Consumer> c(new GetPositionConsumer);
+ bool found = getQueue()->dispatch(c);
+ SequenceNumber front = c->getPosition();
+ if (!found || front >= readyPosition) {
+ // The queue is empty, or has already advanced past the ready position.
+ QPID_LOG(debug, logPrefix << "backup subscribed, no catch up, at "
+ << readyPosition << logSuffix);
+ // Fake lock, only called during creation:
+ sendReady(*(sys::Mutex::ScopedLock*)0);
+ }
+ else {
+ QPID_LOG(debug, logPrefix << "backup subscribed, catching up "
+ << front << "-" << readyPosition << logSuffix);
+ }
}
// Message is delivered in the subscription's connection thread.
@@ -126,38 +151,29 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Add position events for the subscribed queue, not for the internal event queue.
if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
- sys::Mutex::ScopedLock l(lock);
- if (position != qm.position)
- throw Exception(
- QPID_MSG("Expected position " << position
- << " but got " << qm.position));
- // qm.position is the position of the newly enqueued qm on the local queue.
- // backupPosition is latest position on backup queue before enqueueing qm.
- if (qm.position <= backupPosition)
- throw Exception(
- QPID_MSG("Expected position > " << backupPosition
- << " but got " << qm.position));
-
- if (qm.position - backupPosition > 1) {
- // Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(qm.position);
- --send; // Send the position before qm was enqueued.
- sendPositionEvent(send, l);
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(position == qm.position);
+ // qm.position is the position of the newly enqueued qm on the local queue.
+ // backupPosition is latest position on backup queue before enqueueing
+ if (qm.position <= backupPosition)
+ throw Exception(
+ QPID_MSG("Expected position > " << backupPosition
+ << " but got " << qm.position));
+ if (qm.position - backupPosition > 1) {
+ // Position has advanced because of messages dequeued ahead of us.
+ SequenceNumber send(qm.position);
+ --send; // Send the position before qm was enqueued.
+ sendPositionEvent(send);
+ }
+ backupPosition = qm.position;
}
- backupPosition = qm.position;
// Deliver the message
bool delivered = ConsumerImpl::deliver(qm);
-
- // We have advanced to the initial position, backup is ready.
- if (!sentReady && qm.position >= readyPosition) {
- sendReadyEvent(l);
- sentReady = true;
- QPID_LOG(info, logPrefix << "Caught up at " << qm
- << logSuffix);
- // If we are in a primary broker, notify that a subscription is ready.
- // FIXME aconway 2012-04-30: rename addReplica->readyReplica
- if (Primary::get())
- Primary::get()->addReplica(qm.queue->getName());
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // If we have advanced to the initial position, the backup is ready.
+ if (qm.position >= readyPosition) sendReady(l);
}
return delivered;
}
@@ -170,12 +186,22 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
}
}
-ReplicatingSubscription::~ReplicatingSubscription() {}
-
+// Send a ready event to the backup.
+void ReplicatingSubscription::sendReady(const sys::Mutex::ScopedLock&) {
+ if (sentReady) return;
+ sentReady = true;
+ framing::Buffer buffer;
+ sendEvent(QueueReplicator::READY_EVENT_KEY, buffer);
+ QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix);
+ // Notify Primary that a subscription is ready.
+ // FIXME aconway 2012-04-30: rename addReplica->readyReplica
+ if (Primary::get()) Primary::get()->addReplica(getQueue()->getName());
+}
// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
-// Mark a message completed. May be called by acknowledge or dequeued
+// Mark a message completed. May be called by acknowledge or dequeued,
+// in arbitrary connection threads.
void ReplicatingSubscription::complete(
const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
{
@@ -195,14 +221,17 @@ void ReplicatingSubscription::complete(
}
// Called before we get notified of the message being available and
-// under the message lock in the queue. Called in arbitrary connection thread.
+// under the message lock in the queue.
+// Called in arbitrary connection thread *with the queue lock held*
void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
- sys::Mutex::ScopedLock l(lock);
// Delay completion
QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
qm.payload->getIngressCompletion().startCompleter();
- assert(delayed.find(qm.position) == delayed.end());
- delayed[qm.position] = qm;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ assert(delayed.find(qm.position) == delayed.end());
+ delayed[qm.position] = qm;
+ }
}
// Function to complete a delayed message, called by cancel()
@@ -229,7 +258,7 @@ void ReplicatingSubscription::cancel()
ConsumerImpl::cancel();
}
-// Called on primary in the backups IO thread.
+// Consumer override, called on primary in the backup's IO thread.
void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
sys::Mutex::ScopedLock l(lock);
// Finish completion of message, it has been acknowledged by the backup.
@@ -241,25 +270,25 @@ void ReplicatingSubscription::acknowledged(const QueuedMessage& msg) {
bool ReplicatingSubscription::hideDeletedError() { return true; }
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
{
QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
<< " from " << getQueue()->getName() << logSuffix);
string buf(dequeues.encodedSize(),'\0');
framing::Buffer buffer(&buf[0], buf.size());
dequeues.encode(buffer);
- dequeues.clear();
buffer.reset();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
}
-// Called after the message has been removed from the deque and under
-// the messageLock in the queue. Called in arbitrary connection threads.
+// QueueObserver override. Called after the message has been removed
+// from the deque and under the messageLock in the queue. Called in
+// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
{
- sys::Mutex::ScopedLock l(lock);
QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+ sys::Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
// complete it now as it will never be accepted.
@@ -269,8 +298,7 @@ void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
}
// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(
- SequenceNumber position, const sys::Mutex::ScopedLock&l )
+void ReplicatingSubscription::sendPositionEvent(SequenceNumber position)
{
QPID_LOG(trace, logPrefix << "sending position " << position
<< ", was " << backupPosition << logSuffix);
@@ -278,18 +306,10 @@ void ReplicatingSubscription::sendPositionEvent(
framing::Buffer buffer(&buf[0], buf.size());
position.encode(buffer);
buffer.reset();
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
}
-// Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l )
-{
- framing::Buffer buffer;
- sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l);
-}
-
-void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
- const sys::Mutex::ScopedLock&)
+void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)
{
//generate event message
boost::intrusive_ptr<Message> event = new Message();
@@ -309,15 +329,14 @@ void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer&
event->getFrames().append(header);
event->getFrames().append(content);
- DeliveryProperties* props = event->getFrames().getHeaders()->get<DeliveryProperties>(true);
+ DeliveryProperties* props =
+ event->getFrames().getHeaders()->get<DeliveryProperties>(true);
props->setRoutingKey(key);
- // Send the event using the events queue. Consumer is a
- // DelegatingConsumer that delegates to *this for everything but
- // has an independnet position. We put an event on events and
- // dispatch it through ourselves to send it in line with the
- // normal browsing messages.
- events->deliver(event);
- events->dispatch(consumer);
+ // Send the event directly to the base consumer implementation.
+ // We don't really need a queue here but we pass a dummy queue
+ // to conform to the consumer API.
+ QueuedMessage qm(dummy.get(), event);
+ ConsumerImpl::deliver(qm);
}
@@ -326,18 +345,12 @@ bool ReplicatingSubscription::doDispatch()
{
{
sys::Mutex::ScopedLock l(lock);
- if (!dequeues.empty()) sendDequeueEvent(l);
+ if (!dequeues.empty()) {
+ sendDequeueEvent(l);
+ dequeues.clear();
+ }
}
return ConsumerImpl::doDispatch();
}
-ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
-ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); }
-void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
-bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
-bool ReplicatingSubscription::DelegatingConsumer::browseAcquired() const { return delegate.browseAcquired(); }
-OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { return delegate.getSession(); }
-
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index 952c970f41..66b2651f23 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -26,6 +26,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/QueueObserver.h"
#include "qpid/broker/ConsumerFactory.h"
+#include "qpid/types/Uuid.h"
#include <iosfwd>
namespace qpid {
@@ -42,6 +43,7 @@ class Buffer;
}
namespace ha {
+class LogPrefix;
/**
* A susbcription that represents a backup replicating a queue.
@@ -73,7 +75,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
- ReplicatingSubscription(HaBroker&,
+ ReplicatingSubscription(LogPrefix,
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
@@ -82,7 +84,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
- // QueueObserver overrides.
+ // QueueObserver overrides. NB called with queue lock held.
void enqueued(const broker::QueuedMessage&);
void dequeued(const broker::QueuedMessage&);
void acquired(const broker::QueuedMessage&) {}
@@ -95,6 +97,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool browseAcquired() const { return true; }
bool hideDeletedError();
+ void setReadyPosition();
protected:
bool doDispatch();
@@ -103,8 +106,7 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
LogPrefix logPrefix;
std::string logSuffix;
- boost::shared_ptr<broker::Queue> events;
- boost::shared_ptr<broker::Consumer> consumer;
+ boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
Delayed delayed;
framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
@@ -114,28 +116,25 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
- void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void sendPositionEvent(framing::SequenceNumber);
+ void sendReady(const sys::Mutex::ScopedLock&);
void sendReadyEvent(const sys::Mutex::ScopedLock&);
- void sendEvent(const std::string& key, framing::Buffer&,
- const sys::Mutex::ScopedLock&);
+ void sendEvent(const std::string& key, framing::Buffer&);
- class DelegatingConsumer : public Consumer
+ /** Dummy consumer used to get the front position on the queue */
+ class GetPositionConsumer : public Consumer
{
public:
- DelegatingConsumer(ReplicatingSubscription&);
- ~DelegatingConsumer();
- bool deliver(broker::QueuedMessage& msg);
- void notify();
- bool filter(boost::intrusive_ptr<broker::Message>);
- bool accept(boost::intrusive_ptr<broker::Message>);
+ GetPositionConsumer() :
+ Consumer("ha.GetPositionConsumer."+types::Uuid(true).str(), false) {}
+ bool deliver(broker::QueuedMessage& ) { return true; }
+ void notify() {}
+ bool filter(boost::intrusive_ptr<broker::Message>) { return true; }
+ bool accept(boost::intrusive_ptr<broker::Message>) { return true; }
void cancel() {}
void acknowledged(const broker::QueuedMessage&) {}
- bool browseAcquired() const;
-
- broker::OwnershipToken* getSession();
-
- private:
- ReplicatingSubscription& delegate;
+ bool browseAcquired() const { return true; }
+ broker::OwnershipToken* getSession() { return 0; }
};
friend class Factory;
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 15137a0c5f..b5dce0e4d7 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -43,7 +43,6 @@ class HaBroker(Broker):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+",
"--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",