diff options
author | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2013-02-28 16:14:30 +0000 |
commit | 9c73ef7a5ac10acd6a50d5d52bd721fc2faa5919 (patch) | |
tree | 2a890e1df09e5b896a9b4168a7b22648f559a1f2 /cpp/src/qpid/ha/RemoteBackup.cpp | |
parent | 172d9b2a16cfb817bbe632d050acba7e31401cd2 (diff) | |
download | qpid-python-asyncstore.tar.gz |
Update from trunk r1375509 through r1450773asyncstore
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1451244 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/ha/RemoteBackup.cpp')
-rw-r--r-- | cpp/src/qpid/ha/RemoteBackup.cpp | 53 |
1 files changed, 35 insertions, 18 deletions
diff --git a/cpp/src/qpid/ha/RemoteBackup.cpp b/cpp/src/qpid/ha/RemoteBackup.cpp index 3421380940..798ade3f73 100644 --- a/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/cpp/src/qpid/ha/RemoteBackup.cpp @@ -21,6 +21,7 @@ #include "RemoteBackup.h" #include "QueueGuard.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/log/Statement.h" @@ -32,32 +33,45 @@ namespace ha { using sys::Mutex; using boost::bind; -RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : - logPrefix("Primary: Remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) -{} +RemoteBackup::RemoteBackup( + const BrokerInfo& info, broker::Connection* c +) : brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false) +{ + std::ostringstream oss; + oss << "Primary: Remote backup " << info << ": "; + logPrefix = oss.str(); +} -void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards) +void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards) { - QPID_LOG(debug, logPrefix << "Setting initial queues" << (createGuards ? " and guards" : "")); - queues.eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1, createGuards)); + queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards)); + QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues" + << (createGuards ? " and guards" : "")); } RemoteBackup::~RemoteBackup() { cancel(); } void RemoteBackup::cancel() { + QPID_LOG(debug, logPrefix << "Cancelled " << (connection? "connected":"disconnected") + << " backup: " << brokerInfo); for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i) i->second->cancel(); guards.clear(); + if (connection) { + connection->abort(); + connection = 0; + } } bool RemoteBackup::isReady() { - return connected && initialQueues.empty(); + return connection && catchupQueues.empty(); } -void RemoteBackup::initialQueue(const QueuePtr& q, bool createGuard) { - if (replicationTest.isReplicated(ALL, *q)) { - initialQueues.insert(q); +void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) { + if (replicationTest.getLevel(*q) == ALL) { + QPID_LOG(debug, logPrefix << "Catch-up queue" + << (createGuard ? " and guard" : "") << ": " << q->getName()); + catchupQueues.insert(q); if (createGuard) guards[q].reset(new QueueGuard(*q, brokerInfo)); } } @@ -88,21 +102,24 @@ std::ostream& operator<<(std::ostream& o, const QueueSetPrinter& qp) { } void RemoteBackup::ready(const QueuePtr& q) { - initialQueues.erase(q); - QPID_LOG(debug, logPrefix << "Queue ready: " << q->getName() - << QueueSetPrinter(", waiting for: ", initialQueues)); - if (isReady()) QPID_LOG(debug, logPrefix << "All queues ready"); + catchupQueues.erase(q); + if (catchupQueues.size()) { + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() << ", " + << catchupQueues.size() << " remain to catch up"); + } + else + QPID_LOG(debug, logPrefix << "Caught up on queue: " << q->getName() ); } -// Called via ConfigurationObserver::queueCreate and from initialQueue +// Called via ConfigurationObserver::queueCreate and from catchupQueue void RemoteBackup::queueCreate(const QueuePtr& q) { - if (replicationTest.isReplicated(ALL, *q)) + if (replicationTest.getLevel(*q) == ALL) guards[q].reset(new QueueGuard(*q, brokerInfo)); } // Called via ConfigurationObserver void RemoteBackup::queueDestroy(const QueuePtr& q) { - initialQueues.erase(q); + catchupQueues.erase(q); GuardMap::iterator i = guards.find(q); if (i != guards.end()) { i->second->cancel(); |