summary | refs | log | tree | commit | diff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2012-02-29 23:38:00 +0000
committer: Alan Conway <aconway@apache.org> 2012-02-29 23:38:00 +0000
commit: 3fb61f5354df0b77325e4fc88aa59213d3000a8e (patch)
tree: 2fe1900c3e715586e75f1d7ba2687b2a7e3f5547 /qpid/cpp/src
parent: c71af5478c87527b4bd0eb9e0e4e37a9b151ea92 (diff)
download: qpid-python-3fb61f5354df0b77325e4fc88aa59213d3000a8e.tar.gz
QPID-3603: HA support for stand-alone replication.
- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295339 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- qpid/cpp/src/qpid/broker/Link.cpp 3
-rw-r--r-- qpid/cpp/src/qpid/ha/HaBroker.cpp 33
-rw-r--r-- qpid/cpp/src/qpid/ha/HaPlugin.cpp 8
-rw-r--r-- qpid/cpp/src/qpid/ha/Settings.h 4
-rw-r--r-- qpid/cpp/src/qpid/ha/management-schema.xml 5
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 170
6 files changed, 175 insertions, 48 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 4af1e6d6bd..56a90e7fb7 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -206,11 +206,9 @@ void Link::closed(int, std::string text)
QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
connection = 0;
-
if (state == STATE_OPERATIONAL) {
stringstream addr;
addr << host << ":" << port;
- QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
if (!hideManagement() && agent)
agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
}
@@ -405,7 +403,6 @@ uint Link::nextChannel()
void Link::notifyConnectionForced(const string text)
{
Mutex::ScopedLock mutex(lock);
-
setStateLH(STATE_FAILED);
if (!hideManagement())
mgmtObject->set_lastError(text);
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index d92749abeb..f909aca44f 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -25,8 +25,11 @@
#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
@@ -50,7 +53,6 @@ const std::string BACKUP="backup";
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
: broker(b),
settings(s),
- backup(new Backup(b, s)),
mgmtObject(0)
{
// Register a factory for replicating subscriptions.
@@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+
+ // If we are in a cluster, we start in backup mode.
+ if (settings.cluster) backup.reset(new Backup(b, s));
}
HaBroker::~HaBroker() {}
@@ -81,8 +86,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
if (backup.get()) { // I am a backup
- // FIXME aconway 2012-01-26: create primary state before resetting backup
- // as that allows client connections.
+ // NOTE: resetting backup allows client connections, so any
+ // primary state should be set up here before backup.reset()
backup.reset();
QPID_LOG(notice, "HA: Primary promoted from backup");
mgmtObject->set_status(PRIMARY);
@@ -100,7 +105,27 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
break;
- }
+ }
+ case _qmf::HaBroker::METHOD_REPLICATE: {
+ _qmf::ArgsHaBrokerReplicate& bq_args =
+ dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+ QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+
+ boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+ Url url(bq_args.i_broker);
+ string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+ std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+ url[0].host, url[0].port, protocol,
+ false, // durable
+ settings.mechanism, settings.username, settings.password);
+ boost::shared_ptr<broker::Link> link = result.first;
+ link->setUrl(url);
+ // Create a queue replicator
+ boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ broker.getExchanges().registerExchange(qr);
+ qr->activate();
+ break;
+ }
default:
return Manageable::STATUS_UNKNOWN_METHOD;
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index b3080330fb..6a43b591b0 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -31,7 +31,7 @@ struct Options : public qpid::Options {
Settings& settings;
Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
addOptions()
- ("ha-cluster", optValue(settings.enabled, "yes|no"),
+ ("ha-cluster", optValue(settings.cluster, "yes|no"),
"Join a HA active/passive cluster.")
("ha-brokers", optValue(settings.brokerUrl,"URL"),
"URL that backup brokers use to connect and fail over.")
@@ -63,11 +63,7 @@ struct HaPlugin : public Plugin {
void initialize(Plugin::Target& target) {
broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
- if (broker && settings.enabled) {
- QPID_LOG(notice, "HA: Enabled");
- haBroker.reset(new ha::HaBroker(*broker, settings));
- } else
- QPID_LOG(notice, "HA: Disabled");
+ if (broker) haBroker.reset(new ha::HaBroker(*broker, settings));
}
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index 52a64c8330..7df18b4ef4 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -33,8 +33,8 @@ namespace ha {
class Settings
{
public:
- Settings() : enabled(false), expectedBackups(0) {}
- bool enabled;
+ Settings() : cluster(false), expectedBackups(0) {}
+ bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
diff --git a/qpid/cpp/src/qpid/ha/management-schema.xml b/qpid/cpp/src/qpid/ha/management-schema.xml
index 05ed5f02ce..9a815b346c 100644
--- a/qpid/cpp/src/qpid/ha/management-schema.xml
+++ b/qpid/cpp/src/qpid/ha/management-schema.xml
@@ -47,6 +47,11 @@
<method name="setExpectedBackups" desc="Set number of backups expected">
<arg name="expectedBackups" type="uint16" dir="I"/>
</method>
+
+ <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+ <arg name="broker" type="sstr" dir="I"/>
+ <arg name="queue" type="sstr" dir="I"/>
+ </method>
</class>
</schema>
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 264a636f29..18f47b17c5 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -24,34 +24,81 @@ from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG
-
+from qpidtoollibs.broker import BrokerAgent
log = getLogger("qpid.ha-tests")
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, **kwargs):
+ def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
- args=["--load-module", BrokerTest.ha_lib,
- # FIXME aconway 2012-02-13: workaround slow link failover.
- "--link-maintenace-interval=0.1",
- "--ha-cluster=yes"]
- if broker_url: args += [ "--ha-brokers", broker_url ]
+ args = copy(args)
+ args.extend(["--load-module", BrokerTest.ha_lib,
+ # FIXME aconway 2012-02-13: workaround slow link failover.
+ "--link-maintenace-interval=0.1",
+ "--ha-cluster=%s"%ha_cluster])
+ if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
+ self.commands=os.getenv("PYTHON_COMMANDS")
+ assert os.path.isdir(self.commands)
def promote(self):
- assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0
+ assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
def set_client_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0
+ "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
def set_broker_url(self, url):
assert os.system(
- "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0
+ "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+
+ def replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_replicate(self, from_broker, queue):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+ def config_declare(self, queue, replication):
+ assert os.system(
+ "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0
+
+class HaCluster(object):
+ _cluster_count = 0
+
+ def __init__(self, test, n, **kwargs):
+ """Start a cluster of n brokers"""
+ self.test = test
+ self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ HaCluster._cluster_count += 1
+ self[0].promote()
+ self.url = ",".join([b.host_port() for b in self])
+ for b in self: b.set_broker_url(self.url)
+
+ def connect(self, i):
+ """Connect with reconnect_urls"""
+ return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-def set_broker_urls(brokers):
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
+ def kill(self, i):
+ """Kill broker i, promote broker i+1"""
+ self[i].kill()
+ self[i].expect = EXPECT_EXIT_FAIL
+ self[(i+1) % len(self)].promote()
+
+ def bounce(self, i):
+ """Stop and restart a broker in a cluster."""
+ self.kill(i)
+ b = self[i]
+ self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+
+def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
class ShortTests(BrokerTest):
"""Short HA functionality tests."""
@@ -92,6 +139,8 @@ class ShortTests(BrokerTest):
"""Test basic replication of configuration and messages before and
after backup has connected"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+
def queue(name, replicate):
return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
@@ -177,12 +226,9 @@ class ShortTests(BrokerTest):
self.assert_browse_retry(p, "foo", msgs[i+1:])
self.assert_browse_retry(b, "foo", msgs[i+1:])
- def qpid_replicate(self, value="messages"):
- return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
def test_sync(self):
def queue(name, replicate):
- return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
+ return "%s;{create:always,%s}"%(name, qr_node(replicate))
primary = HaBroker(self, name="primary")
primary.promote()
p = primary.connect().session()
@@ -206,6 +252,7 @@ class ShortTests(BrokerTest):
def test_send_receive(self):
"""Verify sequence numbers of messages sent by qpid-send"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -213,14 +260,14 @@ class ShortTests(BrokerTest):
sender = self.popen(
["qpid-send",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=1000",
"--content-string=x"
])
receiver = self.popen(
["qpid-receive",
"--broker", primary.host_port(),
- "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+ "--address", "q;{create:always,%s}"%(qr_node("messages")),
"--messages=990",
"--timeout=10"
])
@@ -239,7 +286,7 @@ class ShortTests(BrokerTest):
def test_failover_python(self):
"""Verify that backups rejects connections and that fail-over works in python client"""
- getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -254,7 +301,7 @@ class ShortTests(BrokerTest):
# Test discovery: should connect to primary after reject by backup
c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
s = c.session()
- sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ sender = s.sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender.send("foo")
primary.kill()
@@ -269,7 +316,7 @@ class ShortTests(BrokerTest):
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
- primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+ primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
self.wait_backup(backup, "q")
sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -288,19 +335,75 @@ class ShortTests(BrokerTest):
receiver.stop()
def test_backup_failover(self):
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["a","b","c"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ """Verify that a backup broker fails over and recovers queue state"""
+ brokers = HaCluster(self, 3)
brokers[0].connect().session().sender(
- "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+ "q;{create:always,%s}"%(qr_node())).send("a")
for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
- brokers[0].kill()
- brokers[2].promote() # c must fail over to b.
- brokers[2].connect().session().sender("q").send("b")
- self.assert_browse_backup(brokers[1], "q", ["a","b"])
- for b in brokers[1:]: b.kill()
+ brokers[0].expect = EXPECT_EXIT_FAIL
+ brokers.kill(0)
+ brokers[1].connect().session().sender("q").send("b")
+ self.assert_browse_backup(brokers[2], "q", ["a","b"])
+ s = brokers[1].connect().session()
+ self.assertEqual("a", s.receiver("q").fetch().content)
+ s.acknowledge()
+ self.assert_browse_backup(brokers[2], "q", ["b"])
+
+ def test_qpid_config_replication(self):
+ """Set up replication via qpid-config"""
+ brokers = HaCluster(self,2)
+ brokers[0].config_declare("q","messages")
+ brokers[0].connect().session().sender("q").send("foo")
+ self.assert_browse_backup(brokers[1], "q", ["foo"])
+
+ def test_standalone_queue_replica(self):
+ """Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x")
+ self.assert_browse_backup(backup, "q2", ["x"])
+
+
+ def test_queue_replica_failover(self):
+ """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ cluster = HaCluster(self, 2)
+ primary = cluster[0]
+ pc = cluster.connect(0)
+ ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages"))
+ pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages"))
+ backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+ br = backup.connect().session().receiver("q;{create:always}")
+ backup.replicate(cluster.url, "q")
+ ps.send("a")
+ self.assert_browse_backup(backup, "q", ["a"])
+ cluster.bounce(0)
+ self.assert_browse_backup(backup, "q", ["a"])
+ ps.send("b")
+ self.assert_browse_backup(backup, "q", ["a", "b"])
+ cluster.bounce(1)
+ self.assertEqual("a", pr.fetch().content)
+ pr.session.acknowledge()
+ self.assert_browse_backup(backup, "q", ["b"])
def test_lvq(self):
"""Verify that we replicate to an LVQ correctly"""
@@ -328,6 +431,7 @@ class ShortTests(BrokerTest):
self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
def test_reject(self):
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary")
primary.promote()
backup = HaBroker(self, name="backup", broker_url=primary.host_port())