diff options
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 69 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 3 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 33 |
7 files changed, 112 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 42758c4689..360b6892ab 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -46,6 +46,8 @@ struct Options : public qpid::Options { "Password for connections between HA brokers") ("ha-mechanism", optValue(settings.mechanism, "MECH"), "Authentication mechanism for connections between HA brokers") + ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"), + "Maximum time to wait for an expected backup to connect and become ready.") ; } }; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 56598c2b5a..a1ce81c3a5 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/Queue.h" #include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Timer.h" #include <boost/bind.hpp> namespace qpid { @@ -40,7 +41,7 @@ namespace ha { using sys::Mutex; namespace { -// No-op connection observer, allows all connections. + class PrimaryConnectionObserver : public broker::ConnectionObserver { public: @@ -61,6 +62,15 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver Primary& primary; }; +class ExpectedBackupTimerTask : public sys::TimerTask { + public: + ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline) + : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {} + void fire() { primary.timeoutExpectedBackups(); } + private: + Primary& primary; +}; + } // namespace Primary* Primary::instance = 0; @@ -75,16 +85,24 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : } else { // NOTE: RemoteBackups must be created before we set the ConfigurationObserver - // orr ConnectionObserver so that there is no client activity while + // or ConnectionObserver so that there is no client activity while // the QueueGuards are created. QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect); for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) { - bool guard = true; // Create queue guards immediately for expected backups. boost::shared_ptr<RemoteBackup> backup( - new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest(), guard)); + new RemoteBackup( + *i, haBroker.getBroker(), haBroker.getReplicationTest(), + true, // Create queue guards immediately for expected backups. + false // Not yet connected. + )); backups[i->getSystemId()] = backup; - if (!backup->isReady()) initialBackups.insert(backup); + if (!backup->isReady()) expectedBackups.insert(backup); } + // Set timeout for expected brokers to connect and become ready. + sys::Duration timeout(hb.getSettings().backupTimeout*sys::TIME_SEC); + sys::AbsTime deadline(sys::now(), timeout); + timerTask.reset(new ExpectedBackupTimerTask(*this, deadline)); + hb.getBroker().getTimer().add(timerTask); } configurationObserver.reset(new PrimaryConfigurationObserver(*this)); @@ -98,14 +116,15 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : } Primary::~Primary() { + if (timerTask) timerTask->cancel(); haBroker.getBroker().getConfigurationObservers().remove(configurationObserver); } void Primary::checkReady(Mutex::ScopedLock&) { - if (!active && initialBackups.empty()) { + if (!active && expectedBackups.empty()) { active = true; - QPID_LOG(notice, logPrefix << "All initial backups are ready."); Mutex::ScopedUnlock u(lock); // Don't hold lock across callback + QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active."); haBroker.activate(); } } @@ -113,13 +132,26 @@ void Primary::checkReady(Mutex::ScopedLock&) { void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { if (i != backups.end() && i->second->isReady()) { BrokerInfo info = i->second->getBrokerInfo(); + QPID_LOG(info, "Expected backup is ready: " << info); info.setStatus(READY); haBroker.addBroker(info); - initialBackups.erase(i->second); + expectedBackups.erase(i->second); checkReady(l); } } +void Primary::timeoutExpectedBackups() { + sys::Mutex::ScopedLock l(lock); + if (active) return; // Already activated + for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i) + { + QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo()); + (*i)->cancel(); + } + expectedBackups.clear(); + checkReady(l); +} + void Primary::readyReplica(const ReplicatingSubscription& rs) { sys::Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId()); @@ -153,12 +185,17 @@ void Primary::opened(broker::Connection& connection) { BackupMap::iterator i = backups.find(info.getSystemId()); if (i == backups.end()) { QPID_LOG(debug, logPrefix << "New backup connected: " << info); - bool guard = false; // Lazy-create guards for new backups. Creating them here could deadlock. backups[info.getSystemId()].reset( - new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard)); + new RemoteBackup( + info, haBroker.getBroker(), haBroker.getReplicationTest(), + false, // Lazy-create guards for new backups, creating now deadlocks + true // Backup is connected + )); } else { QPID_LOG(debug, logPrefix << "Known backup connected: " << info); + i->second->setConnected(true); + checkReady(i, l); } haBroker.addBroker(info); } @@ -171,14 +208,16 @@ void Primary::closed(broker::Connection& connection) { Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - haBroker.removeBroker(info.getSystemId()); QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); + haBroker.removeBroker(info.getSystemId()); + BackupMap::iterator i = backups.find(info.getSystemId()); + if (i != backups.end()) i->second->setConnected(false); } - // NOTE: we do not modify backups here, we only add to the known backups set - // we never remove from it. - + // NOTE: we do not remove from the backups map here, the backups map holds + // all the backups we know about whether connected or not. + // // It is possible for a backup connection to be rejected while we are a backup, - // but the closed is seen when we have become primary. Removing the entry + // but the closed is seen after we have become primary. Removing the entry // from backups in this case would be incorrect. } diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h index 3de579a88d..26883f4416 100644 --- a/qpid/cpp/src/qpid/ha/Primary.h +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -26,6 +26,7 @@ #include "BrokerInfo.h" #include "qpid/sys/Mutex.h" #include <boost/shared_ptr.hpp> +#include <boost/intrusive_ptr.hpp> #include <map> #include <string> @@ -38,6 +39,10 @@ class ConnectionObserver; class ConfigurationObserver; } +namespace sys { +class TimerTask; +} + namespace ha { class HaBroker; class ReplicatingSubscription; @@ -74,6 +79,9 @@ class Primary boost::shared_ptr<QueueGuard> getGuard(const QueuePtr& q, const BrokerInfo&); + // Called in timer thread when the deadline for expected backups expires. + void timeoutExpectedBackups(); + private: typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap; typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet; @@ -89,7 +97,7 @@ class Primary * Set of expected backups that must be ready before we declare ourselves * active */ - BackupSet initialBackups; + BackupSet expectedBackups; /** * Map of all the remote backups we know about: any expected backups plus * all actual backups that have connected. We do not remove entries when a @@ -98,6 +106,7 @@ class Primary BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; boost::shared_ptr<broker::ConfigurationObserver> configurationObserver; + boost::intrusive_ptr<sys::TimerTask> timerTask; static Primary* instance; }; diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 7e65b287b0..2b8a0077f5 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -31,21 +31,21 @@ namespace ha { using sys::Mutex; RemoteBackup::RemoteBackup( - const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) : - logPrefix("Primary remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt), - createGuards(cg) + const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg, bool con) : + logPrefix("Primary remote backup "+info.getLogId()+": "), + brokerInfo(info), replicationTest(rt), + createGuards(cg), connected(con) { QPID_LOG(debug, logPrefix << "Guarding queues for backup broker."); broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1)); } -RemoteBackup::~RemoteBackup() { - for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i) - i->second->cancel(); -} +RemoteBackup::~RemoteBackup() { cancel(); } + +void RemoteBackup::cancel() { guards.clear(); } bool RemoteBackup::isReady() { - return initialQueues.empty(); + return connected && initialQueues.empty(); } void RemoteBackup::initialQueue(const QueuePtr& q) { diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index f2e46c8042..8ea539b167 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -52,12 +52,17 @@ class RemoteBackup typedef boost::shared_ptr<broker::Queue> QueuePtr; /** Note: isReady() can be true after construction */ - RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards); + RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, + bool createGuards, bool connected); ~RemoteBackup(); /** Return guard associated with a queue. Used to create ReplicatingSubscription. */ GuardPtr guard(const QueuePtr&); + /** Is the remote backup connected? */ + void setConnected(bool b) { connected=b; } + bool isConnected() const { return connected; } + /** ReplicatingSubscription associated with queue is ready. * Note: may set isReady() */ @@ -72,6 +77,9 @@ class RemoteBackup /**@return true when all initial queues for this backup are ready. */ bool isReady(); + /**Cancel all queue guards, called if we are timed out. */ + void cancel(); + BrokerInfo getBrokerInfo() const { return brokerInfo; } private: typedef std::map<QueuePtr, GuardPtr> GuardMap; @@ -83,6 +91,7 @@ class RemoteBackup GuardMap guards; QueueSet initialQueues; bool createGuards; + bool connected; void initialQueue(const QueuePtr&); }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 213a5f64d5..1a612aee66 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -34,7 +34,7 @@ namespace ha { class Settings { public: - Settings() : cluster(false), replicateDefault(NONE) + Settings() : cluster(false), replicateDefault(NONE), backupTimeout(2) {} bool cluster; // True if we are a cluster member. @@ -42,6 +42,7 @@ class Settings std::string brokerUrl; Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; + double backupTimeout; private: }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 3612837214..01dac2664d 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -869,9 +869,9 @@ class RecoveryTests(BrokerTest): cluster[3].promote() # New primary, backups will be 1 and 2 cluster[3].wait_status("recovering") - def trySync(s): + def assertSyncTimeout(s): try: - s.sync(timeout=.1) + s.sync(timeout=.01) self.fail("Expected Timeout exception") except Timeout: pass @@ -879,18 +879,18 @@ class RecoveryTests(BrokerTest): s2 = cluster.connect(3).session().sender("q2;{create:always}") # Verify that messages sent are not completed for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) - trySync(s1) + assertSyncTimeout(s1) self.assertEqual(s1.unsettled(), 100) - trySync(s2) + assertSyncTimeout(s2) self.assertEqual(s2.unsettled(), 100) # Verify we can receive even if sending is on hold: cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) # Restart backups, verify queues are released only when both backups are up cluster.restart(1) - trySync(s1) + assertSyncTimeout(s1) self.assertEqual(s1.unsettled(), 100) - trySync(s2) + assertSyncTimeout(s2) self.assertEqual(s2.unsettled(), 100) self.assertEqual(cluster[3].ha_status(), "recovering") cluster.restart(2) @@ -904,7 +904,26 @@ class RecoveryTests(BrokerTest): s1.session.connection.close() s2.session.connection.close() - + def test_expected_backup_timeout(self): + """Verify that we time-out expected backups and release held queues + after a configured interval + """ + cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); + cluster[0].wait_status("active") # Primary ready + for b in cluster[1:4]: b.wait_status("ready") # Backups ready + for i in [0,1]: cluster.kill(i, False) + cluster[2].promote() # New primary, backups will be 1 and 2 + cluster[2].wait_status("recovering") + # Should not go active till the expected backup connects or times out. + self.assertEqual(cluster[2].ha_status(), "recovering") + # Messages should be held expected backup times out + s = cluster[2].connect().session().sender("q;{create:always}") + for i in xrange(100): s.send(str(i), sync=False) + # Verify message held initially. + try: s.sync(timeout=.01); self.fail("Expected Timeout exception") + except Timeout: pass + s.sync(timeout=1) # And released after the timeout. + self.assertEqual(cluster[2].ha_status(), "active") if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
