summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp69
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h11
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.cpp16
-rw-r--r--qpid/cpp/src/qpid/ha/RemoteBackup.h11
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h3
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py33
7 files changed, 112 insertions, 33 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 42758c4689..360b6892ab 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -46,6 +46,8 @@ struct Options : public qpid::Options {
"Password for connections between HA brokers")
("ha-mechanism", optValue(settings.mechanism, "MECH"),
"Authentication mechanism for connections between HA brokers")
+ ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
+ "Maximum time to wait for an expected backup to connect and become ready.")
;
}
};
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 56598c2b5a..a1ce81c3a5 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -32,6 +32,7 @@
#include "qpid/broker/Queue.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Timer.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -40,7 +41,7 @@ namespace ha {
using sys::Mutex;
namespace {
-// No-op connection observer, allows all connections.
+
class PrimaryConnectionObserver : public broker::ConnectionObserver
{
public:
@@ -61,6 +62,15 @@ class PrimaryConfigurationObserver : public broker::ConfigurationObserver
Primary& primary;
};
+class ExpectedBackupTimerTask : public sys::TimerTask {
+ public:
+ ExpectedBackupTimerTask(Primary& p, sys::AbsTime deadline)
+ : TimerTask(deadline, "ExpectedBackupTimerTask"), primary(p) {}
+ void fire() { primary.timeoutExpectedBackups(); }
+ private:
+ Primary& primary;
+};
+
} // namespace
Primary* Primary::instance = 0;
@@ -75,16 +85,24 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
else {
// NOTE: RemoteBackups must be created before we set the ConfigurationObserver
- // orr ConnectionObserver so that there is no client activity while
+ // or ConnectionObserver so that there is no client activity while
// the QueueGuards are created.
QPID_LOG(debug, logPrefix << "Promoted, expected backups: " << expect);
for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
- bool guard = true; // Create queue guards immediately for expected backups.
boost::shared_ptr<RemoteBackup> backup(
- new RemoteBackup(*i, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
+ new RemoteBackup(
+ *i, haBroker.getBroker(), haBroker.getReplicationTest(),
+ true, // Create queue guards immediately for expected backups.
+ false // Not yet connected.
+ ));
backups[i->getSystemId()] = backup;
- if (!backup->isReady()) initialBackups.insert(backup);
+ if (!backup->isReady()) expectedBackups.insert(backup);
}
+ // Set timeout for expected brokers to connect and become ready.
+ sys::Duration timeout(hb.getSettings().backupTimeout*sys::TIME_SEC);
+ sys::AbsTime deadline(sys::now(), timeout);
+ timerTask.reset(new ExpectedBackupTimerTask(*this, deadline));
+ hb.getBroker().getTimer().add(timerTask);
}
configurationObserver.reset(new PrimaryConfigurationObserver(*this));
@@ -98,14 +116,15 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
}
Primary::~Primary() {
+ if (timerTask) timerTask->cancel();
haBroker.getBroker().getConfigurationObservers().remove(configurationObserver);
}
void Primary::checkReady(Mutex::ScopedLock&) {
- if (!active && initialBackups.empty()) {
+ if (!active && expectedBackups.empty()) {
active = true;
- QPID_LOG(notice, logPrefix << "All initial backups are ready.");
Mutex::ScopedUnlock u(lock); // Don't hold lock across callback
+ QPID_LOG(notice, logPrefix << "Finished waiting for backups, primary is active.");
haBroker.activate();
}
}
@@ -113,13 +132,26 @@ void Primary::checkReady(Mutex::ScopedLock&) {
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->isReady()) {
BrokerInfo info = i->second->getBrokerInfo();
+ QPID_LOG(info, "Expected backup is ready: " << info);
info.setStatus(READY);
haBroker.addBroker(info);
- initialBackups.erase(i->second);
+ expectedBackups.erase(i->second);
checkReady(l);
}
}
+void Primary::timeoutExpectedBackups() {
+ sys::Mutex::ScopedLock l(lock);
+ if (active) return; // Already activated
+ for (BackupSet::iterator i = expectedBackups.begin(); i != expectedBackups.end(); ++i)
+ {
+ QPID_LOG(error, "Expected backup timed out: " << (*i)->getBrokerInfo());
+ (*i)->cancel();
+ }
+ expectedBackups.clear();
+ checkReady(l);
+}
+
void Primary::readyReplica(const ReplicatingSubscription& rs) {
sys::Mutex::ScopedLock l(lock);
BackupMap::iterator i = backups.find(rs.getBrokerInfo().getSystemId());
@@ -153,12 +185,17 @@ void Primary::opened(broker::Connection& connection) {
BackupMap::iterator i = backups.find(info.getSystemId());
if (i == backups.end()) {
QPID_LOG(debug, logPrefix << "New backup connected: " << info);
- bool guard = false; // Lazy-create guards for new backups. Creating them here could deadlock.
backups[info.getSystemId()].reset(
- new RemoteBackup(info, haBroker.getBroker(), haBroker.getReplicationTest(), guard));
+ new RemoteBackup(
+ info, haBroker.getBroker(), haBroker.getReplicationTest(),
+ false, // Lazy-create guards for new backups, creating now deadlocks
+ true // Backup is connected
+ ));
}
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
+ i->second->setConnected(true);
+ checkReady(i, l);
}
haBroker.addBroker(info);
}
@@ -171,14 +208,16 @@ void Primary::closed(broker::Connection& connection) {
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- haBroker.removeBroker(info.getSystemId());
QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
+ haBroker.removeBroker(info.getSystemId());
+ BackupMap::iterator i = backups.find(info.getSystemId());
+ if (i != backups.end()) i->second->setConnected(false);
}
- // NOTE: we do not modify backups here, we only add to the known backups set
- // we never remove from it.
-
+ // NOTE: we do not remove from the backups map here, the backups map holds
+ // all the backups we know about whether connected or not.
+ //
// It is possible for a backup connection to be rejected while we are a backup,
- // but the closed is seen when we have become primary. Removing the entry
+ // but the closed is seen after we have become primary. Removing the entry
// from backups in this case would be incorrect.
}
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
index 3de579a88d..26883f4416 100644
--- a/qpid/cpp/src/qpid/ha/Primary.h
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -26,6 +26,7 @@
#include "BrokerInfo.h"
#include "qpid/sys/Mutex.h"
#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
#include <map>
#include <string>
@@ -38,6 +39,10 @@ class ConnectionObserver;
class ConfigurationObserver;
}
+namespace sys {
+class TimerTask;
+}
+
namespace ha {
class HaBroker;
class ReplicatingSubscription;
@@ -74,6 +79,9 @@ class Primary
boost::shared_ptr<QueueGuard> getGuard(const QueuePtr& q, const BrokerInfo&);
+ // Called in timer thread when the deadline for expected backups expires.
+ void timeoutExpectedBackups();
+
private:
typedef std::map<types::Uuid, boost::shared_ptr<RemoteBackup> > BackupMap;
typedef std::set<boost::shared_ptr<RemoteBackup> > BackupSet;
@@ -89,7 +97,7 @@ class Primary
* Set of expected backups that must be ready before we declare ourselves
* active
*/
- BackupSet initialBackups;
+ BackupSet expectedBackups;
/**
* Map of all the remote backups we know about: any expected backups plus
* all actual backups that have connected. We do not remove entries when a
@@ -98,6 +106,7 @@ class Primary
BackupMap backups;
boost::shared_ptr<broker::ConnectionObserver> connectionObserver;
boost::shared_ptr<broker::ConfigurationObserver> configurationObserver;
+ boost::intrusive_ptr<sys::TimerTask> timerTask;
static Primary* instance;
};
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 7e65b287b0..2b8a0077f5 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -31,21 +31,21 @@ namespace ha {
using sys::Mutex;
RemoteBackup::RemoteBackup(
- const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg) :
- logPrefix("Primary remote backup "+info.getLogId()+": "), brokerInfo(info), replicationTest(rt),
- createGuards(cg)
+ const BrokerInfo& info, broker::Broker& broker, ReplicationTest rt, bool cg, bool con) :
+ logPrefix("Primary remote backup "+info.getLogId()+": "),
+ brokerInfo(info), replicationTest(rt),
+ createGuards(cg), connected(con)
{
QPID_LOG(debug, logPrefix << "Guarding queues for backup broker.");
broker.getQueues().eachQueue(boost::bind(&RemoteBackup::initialQueue, this, _1));
}
-RemoteBackup::~RemoteBackup() {
- for (GuardMap::iterator i = guards.begin(); i != guards.end(); ++i)
- i->second->cancel();
-}
+RemoteBackup::~RemoteBackup() { cancel(); }
+
+void RemoteBackup::cancel() { guards.clear(); }
bool RemoteBackup::isReady() {
- return initialQueues.empty();
+ return connected && initialQueues.empty();
}
void RemoteBackup::initialQueue(const QueuePtr& q) {
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index f2e46c8042..8ea539b167 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -52,12 +52,17 @@ class RemoteBackup
typedef boost::shared_ptr<broker::Queue> QueuePtr;
/** Note: isReady() can be true after construction */
- RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt, bool createGuards);
+ RemoteBackup(const BrokerInfo& info, broker::Broker&, ReplicationTest rt,
+ bool createGuards, bool connected);
~RemoteBackup();
/** Return guard associated with a queue. Used to create ReplicatingSubscription. */
GuardPtr guard(const QueuePtr&);
+ /** Is the remote backup connected? */
+ void setConnected(bool b) { connected=b; }
+ bool isConnected() const { return connected; }
+
/** ReplicatingSubscription associated with queue is ready.
* Note: may set isReady()
*/
@@ -72,6 +77,9 @@ class RemoteBackup
/**@return true when all initial queues for this backup are ready. */
bool isReady();
+ /**Cancel all queue guards, called if we are timed out. */
+ void cancel();
+
BrokerInfo getBrokerInfo() const { return brokerInfo; }
private:
typedef std::map<QueuePtr, GuardPtr> GuardMap;
@@ -83,6 +91,7 @@ class RemoteBackup
GuardMap guards;
QueueSet initialQueues;
bool createGuards;
+ bool connected;
void initialQueue(const QueuePtr&);
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 213a5f64d5..1a612aee66 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -34,7 +34,7 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), replicateDefault(NONE)
+ Settings() : cluster(false), replicateDefault(NONE), backupTimeout(2)
{}
bool cluster; // True if we are a cluster member.
@@ -42,6 +42,7 @@ class Settings
std::string brokerUrl;
Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
+ double backupTimeout;
private:
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 3612837214..01dac2664d 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -869,9 +869,9 @@ class RecoveryTests(BrokerTest):
cluster[3].promote() # New primary, backups will be 1 and 2
cluster[3].wait_status("recovering")
- def trySync(s):
+ def assertSyncTimeout(s):
try:
- s.sync(timeout=.1)
+ s.sync(timeout=.01)
self.fail("Expected Timeout exception")
except Timeout: pass
@@ -879,18 +879,18 @@ class RecoveryTests(BrokerTest):
s2 = cluster.connect(3).session().sender("q2;{create:always}")
# Verify that messages sent are not completed
for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
- trySync(s1)
+ assertSyncTimeout(s1)
self.assertEqual(s1.unsettled(), 100)
- trySync(s2)
+ assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
# Verify we can receive even if sending is on hold:
cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
# Restart backups, verify queues are released only when both backups are up
cluster.restart(1)
- trySync(s1)
+ assertSyncTimeout(s1)
self.assertEqual(s1.unsettled(), 100)
- trySync(s2)
+ assertSyncTimeout(s2)
self.assertEqual(s2.unsettled(), 100)
self.assertEqual(cluster[3].ha_status(), "recovering")
cluster.restart(2)
@@ -904,7 +904,26 @@ class RecoveryTests(BrokerTest):
s1.session.connection.close()
s2.session.connection.close()
-
+ def test_expected_backup_timeout(self):
+ """Verify that we time-out expected backups and release held queues
+ after a configured interval
+ """
+ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
+ cluster[0].wait_status("active") # Primary ready
+ for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+ for i in [0,1]: cluster.kill(i, False)
+ cluster[2].promote() # New primary, backups will be 1 and 2
+ cluster[2].wait_status("recovering")
+ # Should not go active till the expected backup connects or times out.
+ self.assertEqual(cluster[2].ha_status(), "recovering")
+ # Messages should be held expected backup times out
+ s = cluster[2].connect().session().sender("q;{create:always}")
+ for i in xrange(100): s.send(str(i), sync=False)
+ # Verify message held initially.
+ try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
+ except Timeout: pass
+ s.sync(timeout=1) # And released after the timeout.
+ self.assertEqual(cluster[2].ha_status(), "active")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)