diff options
| author | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-02-29 23:38:00 +0000 |
| commit | 3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch) | |
| tree | 2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/cpp/src | |
| parent | c71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff) | |
| download | qpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz | |
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 33 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/HaPlugin.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/management-schema.xml | 5 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 170 |
6 files changed, 175 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 4af1e6d6bd..56a90e7fb7 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -206,11 +206,9 @@ void Link::closed(int, std::string text) QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text); connection = 0; - if (state == STATE_OPERATIONAL) { stringstream addr; addr << host << ":" << port; - QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str()); if (!hideManagement() && agent) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } @@ -405,7 +403,6 @@ uint Link::nextChannel() void Link::notifyConnectionForced(const string text) { Mutex::ScopedLock mutex(lock); - setStateLH(STATE_FAILED); if (!hideManagement()) mgmtObject->set_lastError(text); diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index d92749abeb..f909aca44f 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -25,8 +25,11 @@ #include "ReplicatingSubscription.h" #include "qpid/Exception.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/Link.h" +#include "qpid/broker/Queue.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" +#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h" @@ -50,7 +53,6 @@ const std::string BACKUP="backup"; HaBroker::HaBroker(broker::Broker& b, const Settings& s) : broker(b), settings(s), - backup(new Backup(b, s)), mgmtObject(0) { // Register a factory for replicating subscriptions. @@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s) sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); + + // If we are in a cluster, we start in backup mode. + if (settings.cluster) backup.reset(new Backup(b, s)); } HaBroker::~HaBroker() {} @@ -81,8 +86,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { if (backup.get()) { // I am a backup - // FIXME aconway 2012-01-26: create primary state before resetting backup - // as that allows client connections. + // NOTE: resetting backup allows client connections, so any + // primary state should be set up here before backup.reset() backup.reset(); QPID_LOG(notice, "HA: Primary promoted from backup"); mgmtObject->set_status(PRIMARY); @@ -100,7 +105,27 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); break; - } + } + case _qmf::HaBroker::METHOD_REPLICATE: { + _qmf::ArgsHaBrokerReplicate& bq_args = + dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); + QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + + boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); + Url url(bq_args.i_broker); + string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; + std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare( + url[0].host, url[0].port, protocol, + false, // durable + settings.mechanism, settings.username, settings.password); + boost::shared_ptr<broker::Link> link = result.first; + link->setUrl(url); + // Create a queue replicator + boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + broker.getExchanges().registerExchange(qr); + qr->activate(); + break; + } default: return Manageable::STATUS_UNKNOWN_METHOD; diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index b3080330fb..6a43b591b0 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -31,7 +31,7 @@ struct Options : public qpid::Options { Settings& settings; Options(Settings& s) : qpid::Options("HA Options"), settings(s) { addOptions() - ("ha-cluster", optValue(settings.enabled, "yes|no"), + ("ha-cluster", optValue(settings.cluster, "yes|no"), "Join a HA active/passive cluster.") ("ha-brokers", optValue(settings.brokerUrl,"URL"), "URL that backup brokers use to connect and fail over.") @@ -63,11 +63,7 @@ struct HaPlugin : public Plugin { void initialize(Plugin::Target& target) { broker::Broker* broker = dynamic_cast<broker::Broker*>(&target); - if (broker && settings.enabled) { - QPID_LOG(notice, "HA: Enabled"); - haBroker.reset(new ha::HaBroker(*broker, settings)); - } else - QPID_LOG(notice, "HA: Disabled"); + if (broker) haBroker.reset(new ha::HaBroker(*broker, settings)); } }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 52a64c8330..7df18b4ef4 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -33,8 +33,8 @@ namespace ha { class Settings { public: - Settings() : enabled(false), expectedBackups(0) {} - bool enabled; + Settings() : cluster(false), expectedBackups(0) {} + bool cluster; // True if we are a cluster member. std::string clientUrl; std::string brokerUrl; size_t expectedBackups; diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml index 05ed5f02ce..9a815b346c 100644 --- a/qpid/cpp/src/qpid/ha/management-schema.xml +++ b/qpid/cpp/src/qpid/ha/management-schema.xml @@ -47,6 +47,11 @@ <method name="setExpectedBackups" desc="Set number of backups expected"> <arg name="expectedBackups" type="uint16" dir="I"/> </method> + + <method name="replicate" desc="Replicate from a remote queue to the local broker."> + <arg name="broker" type="sstr" dir="I"/> + <arg name="queue" type="sstr" dir="I"/> + </method> </class> </schema> diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 264a636f29..18f47b17c5 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -24,34 +24,81 @@ from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG - +from qpidtoollibs.broker import BrokerAgent log = getLogger("qpid.ha-tests") class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, **kwargs): + def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - args=["--load-module", BrokerTest.ha_lib, - # FIXME aconway 2012-02-13: workaround slow link failover. - "--link-maintenace-interval=0.1", - "--ha-cluster=yes"] - if broker_url: args += [ "--ha-brokers", broker_url ] + args = copy(args) + args.extend(["--load-module", BrokerTest.ha_lib, + # FIXME aconway 2012-02-13: workaround slow link failover. + "--link-maintenace-interval=0.1", + "--ha-cluster=%s"%ha_cluster]) + if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) + self.commands=os.getenv("PYTHON_COMMANDS") + assert os.path.isdir(self.commands) def promote(self): - assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0 + assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 def set_client_url(self, url): assert os.system( - "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0 + "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 def set_broker_url(self, url): assert os.system( - "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0 + "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 + + def replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_replicate(self, from_broker, queue): + assert os.system( + "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + + def config_declare(self, queue, replication): + assert os.system( + "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0 + +class HaCluster(object): + _cluster_count = 0 + + def __init__(self, test, n, **kwargs): + """Start a cluster of n brokers""" + self.test = test + self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + HaCluster._cluster_count += 1 + self[0].promote() + self.url = ",".join([b.host_port() for b in self]) + for b in self: b.set_broker_url(self.url) + + def connect(self, i): + """Connect with reconnect_urls""" + return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) -def set_broker_urls(brokers): - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) + def kill(self, i): + """Kill broker i, promote broker i+1""" + self[i].kill() + self[i].expect = EXPECT_EXIT_FAIL + self[(i+1) % len(self)].promote() + + def bounce(self, i): + """Stop and restart a broker in a cluster.""" + self.kill(i) + b = self[i] + self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + + +def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value class ShortTests(BrokerTest): """Short HA functionality tests.""" @@ -92,6 +139,8 @@ class ShortTests(BrokerTest): """Test basic replication of configuration and messages before and after backup has connected""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + def queue(name, replicate): return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate) @@ -177,12 +226,9 @@ class ShortTests(BrokerTest): self.assert_browse_retry(p, "foo", msgs[i+1:]) self.assert_browse_retry(b, "foo", msgs[i+1:]) - def qpid_replicate(self, value="messages"): - return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value - def test_sync(self): def queue(name, replicate): - return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate)) + return "%s;{create:always,%s}"%(name, qr_node(replicate)) primary = HaBroker(self, name="primary") primary.promote() p = primary.connect().session() @@ -206,6 +252,7 @@ class ShortTests(BrokerTest): def test_send_receive(self): """Verify sequence numbers of messages sent by qpid-send""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) @@ -213,14 +260,14 @@ class ShortTests(BrokerTest): sender = self.popen( ["qpid-send", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--address", "q;{create:always,%s}"%(qr_node("messages")), "--messages=1000", "--content-string=x" ]) receiver = self.popen( ["qpid-receive", "--broker", primary.host_port(), - "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")), + "--address", "q;{create:always,%s}"%(qr_node("messages")), "--messages=990", "--timeout=10" ]) @@ -239,7 +286,7 @@ class ShortTests(BrokerTest): def test_failover_python(self): """Verify that backups rejects connections and that fail-over works in python client""" - getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) @@ -254,7 +301,7 @@ class ShortTests(BrokerTest): # Test discovery: should connect to primary after reject by backup c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True) s = c.session() - sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate())) + sender = s.sender("q;{create:always,%s}"%(qr_node())) self.wait_backup(backup, "q") sender.send("foo") primary.kill() @@ -269,7 +316,7 @@ class ShortTests(BrokerTest): primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) - primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate())) + primary.connect().session().sender("q;{create:always,%s}"%(qr_node())) self.wait_backup(backup, "q") sender = NumberedSender(primary, url=url, queue="q", failover_updates = False) @@ -288,19 +335,75 @@ class ShortTests(BrokerTest): receiver.stop() def test_backup_failover(self): - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) - for name in ["a","b","c"] ] - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) - brokers[0].promote() + """Verify that a backup broker fails over and recovers queue state""" + brokers = HaCluster(self, 3) brokers[0].connect().session().sender( - "q;{create:always,%s}"%(self.qpid_replicate())).send("a") + "q;{create:always,%s}"%(qr_node())).send("a") for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"]) - brokers[0].kill() - brokers[2].promote() # c must fail over to b. - brokers[2].connect().session().sender("q").send("b") - self.assert_browse_backup(brokers[1], "q", ["a","b"]) - for b in brokers[1:]: b.kill() + brokers[0].expect = EXPECT_EXIT_FAIL + brokers.kill(0) + brokers[1].connect().session().sender("q").send("b") + self.assert_browse_backup(brokers[2], "q", ["a","b"]) + s = brokers[1].connect().session() + self.assertEqual("a", s.receiver("q").fetch().content) + s.acknowledge() + self.assert_browse_backup(brokers[2], "q", ["b"]) + + def test_qpid_config_replication(self): + """Set up replication via qpid-config""" + brokers = HaCluster(self,2) + brokers[0].config_declare("q","messages") + brokers[0].connect().session().sender("q").send("foo") + self.assert_browse_backup(brokers[1], "q", ["foo"]) + + def test_standalone_queue_replica(self): + """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"]) + pc = primary.connect() + ps = pc.session().sender("q;{create:always}") + pr = pc.session().receiver("q;{create:always}") + backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + br = backup.connect().session().receiver("q;{create:always}") + + # Set up replication with qpid-ha + backup.replicate(primary.host_port(), "q") + ps.send("a") + self.assert_browse_backup(backup, "q", ["a"]) + ps.send("b") + self.assert_browse_backup(backup, "q", ["a", "b"]) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + self.assert_browse_backup(backup, "q", ["b"]) + + # Set up replication with qpid-config + ps2 = pc.session().sender("q2;{create:always}") + backup.config_replicate(primary.host_port(), "q2"); + ps2.send("x") + self.assert_browse_backup(backup, "q2", ["x"]) + + + def test_queue_replica_failover(self): + """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over.""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + cluster = HaCluster(self, 2) + primary = cluster[0] + pc = cluster.connect(0) + ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages")) + pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages")) + backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"]) + br = backup.connect().session().receiver("q;{create:always}") + backup.replicate(cluster.url, "q") + ps.send("a") + self.assert_browse_backup(backup, "q", ["a"]) + cluster.bounce(0) + self.assert_browse_backup(backup, "q", ["a"]) + ps.send("b") + self.assert_browse_backup(backup, "q", ["a", "b"]) + cluster.bounce(1) + self.assertEqual("a", pr.fetch().content) + pr.session.acknowledge() + self.assert_browse_backup(backup, "q", ["b"]) def test_lvq(self): """Verify that we replicate to an LVQ correctly""" @@ -328,6 +431,7 @@ class ShortTests(BrokerTest): self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)]) def test_reject(self): + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary") primary.promote() backup = HaBroker(self, name="backup", broker_url=primary.host_port()) |
