summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py10
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py36
2 files changed, 43 insertions, 3 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 9eebfa952f..d7885d9622 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -142,7 +142,9 @@ class HaBroker(Broker):
# FIXME aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
+ [self.qpid_config_path, "--broker", self.host_port()]+args,
+ stdout=1, stderr=subprocess.STDOUT
+ ) == 0
def config_replicate(self, from_broker, queue):
self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
@@ -160,12 +162,14 @@ class HaBroker(Broker):
else:
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
- def wait_backup(self, address):
- """Wait for address to become valid on a backup broker."""
+ def wait_address(self, address):
+ """Wait for address to become valid on the broker."""
bs = self.connect_admin().session()
try: wait_address(bs, address)
finally: bs.connection.close()
+ def wait_backup(self, address): self.wait_address(address)
+
def assert_browse(self, queue, expected, **kwargs):
"""Verify queue contents by browsing."""
bs = self.connect().session()
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 60c68730f8..d25e68b29c 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -270,6 +270,7 @@ class ReplicationTests(HaBrokerTest):
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
+ brokers[0].wait_status("active")
brokers[0].config_declare("q","all")
brokers[0].connect().session().sender("q").send("foo")
brokers[1].assert_browse_backup("q", ["foo"])
@@ -830,6 +831,41 @@ acl deny all all
verify_qmf_events("q2")
finally: l.restore()
+ def test_missed_recreate(self):
+ """If a queue or exchange is destroyed and one with the same name re-created
+ while a backup is disconnected, the backup should also delete/recreate
+ the object when it re-connects"""
+ cluster = HaCluster(self, 3)
+ sn = cluster[0].connect().session()
+ # Create a queue with messages
+ s = sn.sender("qq;{create:always}")
+ msgs = [str(i) for i in xrange(3)]
+ for m in msgs: s.send(m)
+ cluster[1].assert_browse_backup("qq", msgs)
+ cluster[2].assert_browse_backup("qq", msgs)
+ # Set up an exchange with a binding.
+ sn.sender("xx;{create:always,node:{type:topic}}")
+ sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+ cluster[1].wait_address("xx")
+ self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+ # Simulate the race by re-creating the objects before promoting the new primary
+ cluster.kill(0, False)
+ sn = cluster[1].connect_admin().session()
+ sn.sender("qq;{delete:always}").close()
+ s = sn.sender("qq;{create:always}")
+ s.send("foo")
+ sn.sender("xx;{delete:always}").close()
+ sn.sender("xx;{create:always,node:{type:topic}}")
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ # Verify we are not still using the old objects on cluster[2]
+ cluster[2].assert_browse_backup("qq", ["foo"])
+ cluster[2].wait_address("xx")
+ self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit