Diffstat (limited to 'qpid/cpp/src/tests/ha_tests.py')
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py  1458
1 file changed, 0 insertions, 1458 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
deleted file mode 100755
index fdcb314751..0000000000
--- a/qpid/cpp/src/tests/ha_tests.py
+++ /dev/null
@@ -1,1458 +0,0 @@
-#!/usr/bin/env python
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
-import traceback
-from qpid.datatypes import uuid4, UUID
-from brokertest import *
-from ha_test import *
-from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent, EventHelper
-
-log = getLogger(__name__)
-
-class HaBrokerTest(BrokerTest):
- """Base class for HA broker tests"""
-
-class ReplicationTests(HaBrokerTest):
- """Correctness tests for HA replication."""
-
- def test_replication(self):
-        """Test basic replication of configuration and messages before and
-        after the backup has connected"""
-
- def setup(prefix, primary):
-            """Create config, send messages on the primary"""
- a = primary.agent
-
- def queue(name, replicate):
- a.addQueue(name, options={'qpid.replicate':replicate})
- return name
-
- def exchange(name, replicate, bindq, key):
- a.addExchange("fanout", name, options={'qpid.replicate':replicate})
- a.bind(name, bindq, key)
- return name
-
- # Test replication of messages
- p = primary.connect().session()
- s = p.sender(queue(prefix+"q1", "all"))
- for m in ["a", "b", "1"]: s.send(qm.Message(m))
- # Test replication of dequeue
- self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "a")
- p.acknowledge()
-
- p.sender(queue(prefix+"q2", "configuration")).send(qm.Message("2"))
- p.sender(queue(prefix+"q3", "none")).send(qm.Message("3"))
- p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(qm.Message("4"))
- p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(qm.Message("5"))
- # Test unbind
- p.sender(queue(prefix+"q4", "all")).send(qm.Message("6"))
- s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
- s3.send(qm.Message("7"))
- a.unbind(prefix+"e4", prefix+"q4", "key4")
- p.sender(prefix+"e4").send(qm.Message("drop1")) # Should be dropped
-
- # Test replication of deletes
- queue(prefix+"dq", "all")
- exchange(prefix+"de", "all", prefix+"dq", "")
- a.delQueue(prefix+"dq")
- a.delExchange(prefix+"de")
-
- # Need a marker so we can wait till sync is done.
- queue(prefix+"x", "configuration")
-
- def verify(b, prefix, p):
- """Verify setup was replicated to backup b"""
- # Wait for configuration to replicate.
- wait_address(b.connection, prefix+"x");
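-            # q1 should hold "b" and "1" left over from setup ("a" was dequeued) plus "4" routed via exchange e1.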
- self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
-
- self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
- p.acknowledge()
- self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
-
- self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- assert not valid_address(b.connection, prefix+"q3")
-
- # Verify exchange with replicate=all
- b.sender(prefix+"e1/key1").send(qm.Message(prefix+"e1"))
- self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
-
- # Verify exchange with replicate=configuration
- b.sender(prefix+"e2/key2").send(qm.Message(prefix+"e2"))
- self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
-
- b.sender(prefix+"e4/key4").send(qm.Message("drop2")) # Verify unbind.
- self.assert_browse_retry(b, prefix+"q4", ["6","7"])
-
- # Verify deletes
- assert not valid_address(b.connection, prefix+"dq")
- assert not valid_address(b.connection, prefix+"de")
-
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- cluster = HaCluster(self, 2)
- primary = cluster[0]
- backup = cluster[1]
-
- # Send messages before re-starting the backup, test catch-up replication.
- cluster.kill(1, promote_next=False, final=False)
- setup("1", primary)
- cluster.restart(1)
-
- # Send messages after re-starting the backup, to test steady-state replication.
- setup("2", primary)
-
- p = primary.connect().session()
-
- # Verify the data on the backup
- b = backup.connect_admin().session()
- verify(b, "1", p)
- verify(b, "2", p)
- # Test a series of messages, enqueue all then dequeue all.
- primary.agent.addQueue("foo")
- s = p.sender("foo")
- wait_address(b.connection, "foo")
- msgs = [str(i) for i in range(10)]
- for m in msgs: s.send(qm.Message(m))
- self.assert_browse_retry(p, "foo", msgs)
- self.assert_browse_retry(b, "foo", msgs)
- r = p.receiver("foo")
- for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
- p.acknowledge()
- self.assert_browse_retry(p, "foo", [])
- self.assert_browse_retry(b, "foo", [])
-
- # Another series, this time verify each dequeue individually.
- for m in msgs: s.send(qm.Message(m))
- self.assert_browse_retry(p, "foo", msgs)
- self.assert_browse_retry(b, "foo", msgs)
- for i in range(len(msgs)):
- self.assertEqual(msgs[i], r.fetch(timeout=0).content)
- p.acknowledge()
- self.assert_browse_retry(p, "foo", msgs[i+1:])
- self.assert_browse_retry(b, "foo", msgs[i+1:])
- finally: l.restore()
-
- def test_sync(self):
- primary = HaBroker(self, name="primary")
- primary.promote()
- p = primary.connect().session()
- s = p.sender("q;{create:always}")
- for m in [str(i) for i in range(0,10)]: s.send(m)
- s.sync()
- backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
- for m in [str(i) for i in range(10,20)]: s.send(m)
- s.sync()
- backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
- for m in [str(i) for i in range(20,30)]: s.send(m)
- s.sync()
-
- msgs = [str(i) for i in range(30)]
- b1 = backup1.connect_admin().session()
- backup1.assert_browse_backup("q", msgs)
- backup2.assert_browse_backup("q", msgs)
-
- def test_send_receive(self):
- """Verify sequence numbers of messages sent by qpid-send"""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- brokers = HaCluster(self, 3)
- sender = self.popen(
- ["qpid-send",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=1000",
- "--content-string=x",
- "--connection-options={%s}"%self.protocol_option()
- ])
- receiver = self.popen(
- ["qpid-receive",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=990",
- "--timeout=10",
- "--connection-options={%s}"%self.protocol_option()
- ])
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
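-            # qpid-send published 1000 messages and qpid-receive consumed 990, so sequence numbers 991-1000 remain.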
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- brokers[1].assert_browse_backup("q", expect, transform=sn)
- brokers[2].assert_browse_backup("q", expect, transform=sn)
- finally: l.restore()
-
- def test_failover_python(self):
-        """Verify that a backup rejects client connections and that fail-over works in the python client"""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- primary = HaBroker(self, name="primary")
- primary.promote()
- backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
- # Check that backup rejects normal connections
- try:
- backup.connect().session()
- self.fail("Expected connection to backup to fail")
- except qm.ConnectionError: pass
- # Check that admin connections are allowed to backup.
- backup.connect_admin().close()
-
- # Test discovery: should connect to primary after reject by backup
- c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()],
- reconnect=True)
- s = c.session()
- sender = s.sender("q;{create:always}")
- sender.send("foo", sync=True)
- s.sync()
- primary.kill()
- assert retry(lambda: not is_running(primary.pid))
- backup.promote()
- sender.send("bar")
- self.assert_browse_retry(s, "q", ["foo", "bar"])
- c.close()
- finally: l.restore()
-
-
- def test_heartbeat_python(self):
- """Verify that a python client with a heartbeat specified disconnects
- from a stalled broker and does not hang indefinitely."""
-
- broker = Broker(self)
- broker_addr = broker.host_port()
-
- # Case 1: Connect before stalling the broker, use the connection after stalling.
- c = qm.Connection(broker_addr, heartbeat=1)
- c.open()
- os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
-
- def make_sender(): c.session().sender("foo")
- self.assertRaises(qm.ConnectionError, make_sender)
-
- # Case 2: Connect to a stalled broker
- c = qm.Connection(broker_addr, heartbeat=1)
- self.assertRaises(qm.ConnectionError, c.open)
-
- # Case 3: Re-connect to a stalled broker.
- broker2 = Broker(self)
- c = qm.Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
- reconnect=True, reconnect_urls=[broker_addr],
- reconnect_log=False) # Hide expected warnings
- c.open()
- broker2.kill() # Cause re-connection to broker
- self.assertRaises(qm.ConnectionError, make_sender)
-
- def test_failover_cpp(self):
- """Verify that failover works in the C++ client."""
- cluster = HaCluster(self, 2)
- cluster[0].connect().session().sender("q;{create:always}")
- cluster[1].wait_backup("q")
- # FIXME aconway 2014-02-21: using 0-10, there is a failover problem with 1.0
- sender = NumberedSender(cluster[0], url=cluster.url, queue="q",
- connection_options="reconnect:true,protocol:'amqp0-10'")
- receiver = NumberedReceiver(cluster[0], url=cluster.url, queue="q",
- connection_options="reconnect:true,protocol:'amqp0-10'")
- receiver.start()
- sender.start()
- assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
- cluster.kill(0)
- n = receiver.received
- assert retry(lambda: receiver.received > n + 10) # Verify we are still going
- sender.stop()
- receiver.stop()
-
- def test_backup_failover(self):
- """Verify that a backup broker fails over and recovers queue state"""
- brokers = HaCluster(self, 3)
- brokers[0].connect().session().sender("q;{create:always}").send("a")
- brokers.kill(0)
- brokers[1].connect().session().sender("q").send("b")
- brokers[2].assert_browse_backup("q", ["a","b"])
- s = brokers[1].connect().session()
- self.assertEqual("a", s.receiver("q").fetch().content)
- s.acknowledge()
- brokers[2].assert_browse_backup("q", ["b"])
-
- def test_empty_backup_failover(self):
- """Verify that a new primary becomes active with no queues.
- Regression test for QPID-5430"""
- brokers = HaCluster(self, 3)
- brokers.kill(0)
- brokers[1].wait_status("active")
-
- def test_qpid_config_replication(self):
- """Set up replication via qpid-config"""
- brokers = HaCluster(self,2)
- brokers[0].config_declare("q","all")
- brokers[0].connect().session().sender("q").send("foo")
- brokers[1].assert_browse_backup("q", ["foo"])
-
- def test_standalone_queue_replica(self):
- """Test replication of individual queues outside of cluster mode"""
- primary = HaBroker(self, name="primary", ha_cluster=False,
- args=["--ha-queue-replication=yes"]);
- pc = primary.connect()
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False,
- args=["--ha-queue-replication=yes"])
- bs = backup.connect().session()
- br = bs.receiver("q;{create:always}")
-
- def srange(*args): return [str(i) for i in xrange(*args)]
-
- for m in srange(3): ps.send(m)
- # Set up replication with qpid-ha
- backup.replicate(primary.host_port(), "q")
- backup.assert_browse_backup("q", srange(3))
- for m in srange(3,6): ps.send(str(m))
- backup.assert_browse_backup("q", srange(6))
- self.assertEqual("0", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", srange(1,6))
-
- # Set up replication with qpid-config
- ps2 = pc.session().sender("q2;{create:always}")
- backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x")
- backup.assert_browse_backup("q2", ["x"])
-
-
- def test_standalone_queue_replica_failover(self):
- """Test individual queue replication from a cluster to a standalone
- backup broker, verify it fails over."""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- cluster = HaCluster(self, 2)
- primary = cluster[0]
- pc = cluster.connect(0)
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False,
- args=["--ha-queue-replication=yes"])
- br = backup.connect().session().receiver("q;{create:always}")
- backup.replicate(cluster.url, "q")
- ps.send("a")
- ps.sync()
- backup.assert_browse_backup("q", ["a"])
- cluster.bounce(0)
- backup.assert_browse_backup("q", ["a"])
- ps.send("b")
- backup.assert_browse_backup("q", ["a", "b"])
- cluster[0].wait_status("ready")
- cluster.bounce(1)
- # FIXME aconway 2014-02-20: pr does not fail over with 1.0/swig
- if qm == qpid_messaging:
- print "WARNING: Skipping SWIG client failover bug"
- return
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
- pc.close()
- br.close()
- finally: l.restore()
-
- def test_lvq(self):
- """Verify that we replicate to an LVQ correctly"""
- cluster = HaCluster(self, 2)
- s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
-
- def send(key,value,expect):
- s.send(qm.Message(content=value,properties={"lvq-key":key}))
- cluster[1].assert_browse_backup("lvq", expect)
-
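-        # Last-value semantics: sending with an existing lvq-key replaces the old message and moves it to the tail.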
- send("a", "a-1", ["a-1"])
- send("b", "b-1", ["a-1", "b-1"])
- send("a", "a-2", ["b-1", "a-2"])
- send("a", "a-3", ["b-1", "a-3"])
- send("c", "c-1", ["b-1", "a-3", "c-1"])
- send("c", "c-2", ["b-1", "a-3", "c-2"])
- send("b", "b-2", ["a-3", "c-2", "b-2"])
- send("c", "c-3", ["a-3", "b-2", "c-3"])
- send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"])
-
- def test_ring(self):
- """Test replication with the ring queue policy"""
- cluster = HaCluster(self, 2)
- s = cluster[0].connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
- for i in range(10): s.send(qm.Message(str(i)))
- cluster[1].assert_browse_backup("q", [str(i) for i in range(5,10)])
-
- def test_reject(self):
- """Test replication with the reject queue policy"""
- cluster = HaCluster(self, 2)
- primary, backup = cluster
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
- try:
- for i in range(10): s.send(qm.Message(str(i)), sync=False)
- except qm.LinkError: pass
- backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
- try: s.session.connection.close()
- except: pass # Expect exception from broken session
-
- def test_priority(self):
- """Verify priority queues replicate correctly"""
- cluster = HaCluster(self, 2)
- session = cluster[0].connect().session()
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
- priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
- for p in priorities: s.send(qm.Message(priority=p))
- # Can't use browse_backup as browser sees messages in delivery order not priority.
- cluster[1].wait_backup("priority-queue")
- r = cluster[1].connect_admin().session().receiver("priority-queue")
- received = [r.fetch().priority for i in priorities]
- self.assertEqual(sorted(priorities, reverse=True), received)
-
- def test_priority_fairshare(self):
-        """Verify priority queues with fairshare limits replicate correctly"""
- cluster = HaCluster(self, 2)
- primary, backup = cluster
- session = primary.connect().session()
- levels = 8
- priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
- limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
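-        # The queue is declared with a default fairshare of 5 plus per-level 'qpid.fairshare-N' overrides taken from the limits map.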
- limit_policy = ",".join(["'qpid.fairshare':5"] + ["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
- s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
- messages = [qm.Message(content=str(uuid4()), priority = p) for p in priorities]
- for m in messages: s.send(m)
- backup.wait_backup(s.target)
- r = backup.connect_admin().session().receiver("priority-queue")
- received = [r.fetch().content for i in priorities]
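-        # Expected order: stable sort by priority level, highest first, then apply the per-level fairshare limits.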
- sort = sorted(messages, key=lambda m: priority_level(m.priority, levels), reverse=True)
- fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), levels)]
- self.assertEqual(received, fair)
-
- def test_priority_ring(self):
- cluster = HaCluster(self, 2)
- primary, backup = cluster
- s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
- priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
- for p in priorities: s.send(qm.Message(priority=p))
- expect = sorted(priorities,reverse=True)[0:5]
- primary.assert_browse("q", expect, transform=lambda m: m.priority)
- backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
-
- def test_backup_acquired(self):
- """Verify that acquired messages are backed up, for all queue types."""
- class Test:
- def __init__(self, queue, arguments, expect):
- self.queue = queue
- self.address = "%s;{create:always,node:{x-declare:{arguments:{%s}}}}"%(
- self.queue, ",".join(arguments))
- self.expect = [str(i) for i in expect]
-
- def send(self, connection):
- """Send messages, then acquire one but don't acknowledge"""
- s = connection.session()
- for m in range(10): s.sender(self.address).send(str(m))
- s.receiver(self.address).fetch()
-
- def verify(self, brokertest, backup):
- backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
-
- tests = [
- Test("plain",[],range(10)),
- Test("ring", ["'qpid.policy_type':ring", "'qpid.max_count':5"], range(5,10)),
- Test("priority",["'qpid.priorities':10"], range(10)),
- Test("fairshare", ["'qpid.priorities':10,'qpid.fairshare':5"], range(10)),
- Test("lvq", ["'qpid.last_value_queue_key':lvq-key"], [9])
- ]
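-        # Acquired-but-unacknowledged messages still appear when browsing the backup; the lvq messages carry no lvq-key, so only the last survives.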
-
- cluster = HaCluster(self, 3)
- cluster.kill(2, final=False) # restart after messages are sent to test catch-up
-
- c = cluster[0].connect()
- for t in tests: t.send(c) # Send messages, leave one unacknowledged.
-
- cluster.restart(2)
- cluster[2].wait_status("ready")
-
- # Verify acquired message was replicated
- for t in tests: t.verify(self, cluster[1])
- for t in tests: t.verify(self, cluster[2])
-
- def test_replicate_default(self):
- """Make sure we don't replicate if ha-replicate is unspecified or none"""
- cluster1 = HaCluster(self, 2, ha_replicate=None)
- cluster1[1].wait_status("ready")
- c1 = cluster1[0].connect().session().sender("q;{create:always}")
- cluster2 = HaCluster(self, 2, ha_replicate="none")
- cluster2[1].wait_status("ready")
- cluster2[0].connect().session().sender("q;{create:always}")
- time.sleep(.1) # Give replication a chance.
- # Expect queues not to be found
- self.assertRaises(qm.NotFound, cluster1[1].connect_admin().session().receiver, "q")
- self.assertRaises(qm.NotFound, cluster2[1].connect_admin().session().receiver, "q")
-
- def test_replicate_binding(self):
- """Verify that binding replication can be disabled"""
- cluster = HaCluster(self, 2)
- primary, backup = cluster[0], cluster[1]
- ps = primary.connect().session()
- a = primary.agent
- a.addExchange("fanout", "ex")
- a.addQueue("q")
- a.bind("ex", "q", options={'qpid.replicate':'none'})
- backup.wait_backup("q")
-
- primary.kill()
- assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
- backup.promote()
- bs = backup.connect_admin().session()
- bs.sender("ex").send(qm.Message("msg"))
- self.assert_browse_retry(bs, "q", [])
-
- def test_invalid_replication(self):
- """Verify that we reject an attempt to declare a queue with invalid replication value."""
- cluster = HaCluster(self, 1, ha_replicate="all")
- self.assertRaises(Exception, cluster[0].connect().session().sender,
- "q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
-
- def test_exclusive_queue(self):
- """Ensure that we can back-up exclusive queues, i.e. the replicating
- subscriptions are exempt from the exclusivity"""
- cluster = HaCluster(self, 2)
- def test(addr):
- c = cluster[0].connect()
- q = addr.split(";")[0]
- r = c.session().receiver(addr)
- self.assertRaises(qm.LinkError, c.session().receiver, addr)
-            c.session().sender(q).send(q)
- cluster[1].assert_browse_backup(q, [q])
- test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
- if qm == qpid.messaging: # FIXME aconway 2014-02-20: swig client no exclusive subscribe
- test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
-
- def test_auto_delete_exclusive(self):
-        """Verify that replication ignores queues that are exclusive and auto-delete with no auto-delete timeout"""
- cluster = HaCluster(self, 2)
- s0 = cluster[0].connect().session()
- s0.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
- s0.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
- ad = s0.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
- s0.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
- s0.receiver("q;{create:always}")
-
- s1 = cluster[1].connect_admin().session()
- cluster[1].wait_backup("q")
- assert not valid_address(s1.connection, "exad")
- assert valid_address(s1.connection, "ex")
- assert valid_address(s1.connection, "ad")
- assert valid_address(s1.connection, "time")
-
- # Verify that auto-delete queues are not kept alive by
- # replicating subscriptions
- ad.close()
- s0.sync()
- assert not valid_address(s0.connection, "ad")
-
- def test_broker_info(self):
- """Check that broker information is correctly published via management"""
- cluster = HaCluster(self, 3)
-
- def ha_broker(broker):
- ha_broker = broker.agent.getHaBroker();
- ha_broker.update()
- return ha_broker
-
- for broker in cluster: # Make sure HA system-id matches broker's
- self.assertEqual(ha_broker(broker).systemId, UUID(broker.agent.getBroker().systemRef))
-
- # Check that all brokers have the same membership as the cluster
- def check_ids(broker):
- cluster_ids = set([ ha_broker(b).systemId for b in cluster])
- broker_ids = set([m["system-id"] for m in ha_broker(broker).members])
- assert retry(lambda: cluster_ids == broker_ids, 1), "%s != %s on %s"%(cluster_ids, broker_ids, broker)
-
- for broker in cluster: check_ids(broker)
-
- # Add a new broker, check it is updated everywhere
- b = cluster.start()
- for broker in cluster: check_ids(broker)
-
- def test_auth(self):
- """Verify that authentication does not interfere with replication."""
- # TODO aconway 2012-07-09: generate test sasl config portably for cmake
- sasl_config=os.path.join(self.rootdir, "sasl_config")
- if not os.path.exists(sasl_config):
- print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
- return
- acl=os.path.join(os.getcwd(), "policy.acl")
- aclf=file(acl,"w")
- # Minimum set of privileges required for the HA user.
- aclf.write("""
-# HA user
-acl allow zag@QPID access queue
-acl allow zag@QPID create queue
-acl allow zag@QPID consume queue
-acl allow zag@QPID delete queue
-acl allow zag@QPID access exchange
-acl allow zag@QPID create exchange
-acl allow zag@QPID bind exchange
-acl allow zag@QPID publish exchange
-acl allow zag@QPID delete exchange
-acl allow zag@QPID access method
-acl allow zag@QPID create link
-acl allow zag@QPID access query
-# Normal user
-acl allow zig@QPID all all
-acl deny all all
- """)
- aclf.close()
- cluster = HaCluster(
- self, 2,
- args=["--auth", "yes", "--sasl-config", sasl_config,
- "--acl-file", acl,
- "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
- ],
- client_credentials=Credentials("zag", "zag", "PLAIN"))
- c = cluster[0].connect(username="zig", password="zig")
- s0 = c.session();
- a = cluster[0].agent
- a.addQueue("q")
- a.addExchange("fanout", "ex")
- a.bind("ex", "q", "")
- s0.sender("ex").send("foo");
-
- # Transactions should be done over the tx_protocol
- c = cluster[0].connect(protocol=self.tx_protocol, username="zig", password="zig")
- s1 = c.session(transactional=True)
- s1.sender("ex").send("foo-tx");
- cluster[1].assert_browse_backup("q", ["foo"])
- s1.commit()
- cluster[1].assert_browse_backup("q", ["foo", "foo-tx"])
-
- def test_alternate_exchange(self):
- """Verify that alternate-exchange on exchanges and queues is propagated
- to new members of a cluster. """
- cluster = HaCluster(self, 2)
- s = cluster[0].connect().session()
- # altex exchange: acts as alternate exchange
- a = cluster[0].agent
- a.addExchange("fanout", "altex")
- # altq queue bound to altex, collect re-routed messages.
- a.addQueue("altq")
- a.bind("altex", "altq", "")
- # ex exchange with alternate-exchange altex and no queues bound
- a.addExchange("direct", "ex", {"alternate-exchange":"altex"})
- # create queue q with alternate-exchange altex
- a.addQueue("q", {"alternate-exchange":"altex"})
- # create a bunch of exchanges to ensure we don't clean up prematurely if the
- # response comes in multiple fragments.
- for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
-
- def verify(broker):
- c = broker.connect()
- s = c.session()
- # Verify unmatched message goes to ex's alternate.
- s.sender("ex").send("foo")
- altq = s.receiver("altq")
- self.assertEqual("foo", altq.fetch(timeout=0).content)
- s.acknowledge()
- # Verify rejected message goes to q's alternate.
- s.sender("q").send("bar")
- msg = s.receiver("q").fetch(timeout=0)
- self.assertEqual("bar", msg.content)
- s.acknowledge(msg, qm.Disposition(qm.REJECTED)) # Reject the message
- self.assertEqual("bar", altq.fetch(timeout=0).content)
- s.acknowledge()
- s.sync() # Make sure backups are caught-up.
- c.close()
-
- # Sanity check: alternate exchanges on original broker
- verify(cluster[0])
- a = cluster[0].agent
- # Altex is in use as an alternate exchange, we should get an exception
- self.assertRaises(Exception, a.delExchange, "altex")
- # Check backup that was connected during setup.
- def wait(broker):
- broker.wait_status("ready")
- for a in ["q", "ex", "altq", "altex"]:
- broker.wait_backup(a)
- wait(cluster[1])
- cluster.bounce(0)
- verify(cluster[1])
-
- # Check a newly started backup.
- cluster.start()
- wait(cluster[2])
- cluster.bounce(1)
- verify(cluster[2])
-
- # Check that alt-exchange in-use count is replicated
- a = cluster[2].agent
- self.assertRaises(Exception, a.delExchange, "altex")
- a.delQueue("q")
- self.assertRaises(Exception, a.delExchange, "altex")
- a.delExchange("ex")
- a.delExchange("altex")
-
- def test_priority_reroute(self):
- """Regression test for QPID-4262, rerouting messages from a priority queue
- to itself causes a crash"""
- cluster = HaCluster(self, 2)
- primary = cluster[0]
- session = primary.connect().session()
- a = primary.agent
- a.addQueue("pq", {'qpid.priorities':10})
- a.bind("amq.fanout", "pq")
- s = session.sender("pq")
- for m in xrange(100): s.send(qm.Message(str(m), priority=m%10))
- pq = QmfAgent(primary.host_port()).getQueue("pq")
- pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout")
- # Verify that consuming is in priority order
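-        # str(m) was sent with priority m%10, so str(10*i+p) has priority p: highest level first, FIFO within a level.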
- expect = [str(10*i+p) for p in xrange(9,-1,-1) for i in xrange(0,10) ]
- actual = [m.content for m in primary.get_messages("pq", 100)]
- self.assertEqual(expect, actual)
-
- def test_delete_missing_response(self):
- """Check that a backup correctly deletes leftover queues and exchanges that are
-        missing from the initial response set."""
- # This test is a bit contrived, we set up the situation on backup brokers
- # and then promote one.
- cluster = HaCluster(self, 2, promote=False)
-
- # cluster[0] Will be the primary
- s = cluster[0].connect_admin().session()
- s.sender("q1;{create:always}")
- s.sender("e1;{create:always, node:{type:topic}}")
-
- # cluster[1] will be the backup, has extra queues/exchanges
- xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
- node = "node:{%s}"%(xdecl)
- s = cluster[1].connect_admin().session()
- s.sender("q1;{create:always, %s}"%(node))
- s.sender("q2;{create:always, %s}"%(node))
- s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
- s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
- for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
-
- cluster[0].promote()
- # Verify the backup deletes the surplus queue and exchange
- cluster[1].wait_status("ready")
- s = cluster[1].connect_admin().session()
- self.assertRaises(qm.NotFound, s.receiver, ("q2"));
- self.assertRaises(qm.NotFound, s.receiver, ("e2"));
-
-
- def test_delete_qpid_4285(self):
- """Regression test for QPID-4285: on deleting a queue it gets stuck in a
- partially deleted state and causes replication errors."""
- cluster = HaCluster(self,2)
- s = cluster[0].connect().session()
- s.receiver("q;{create:always}")
- cluster[1].wait_backup("q")
- cluster.kill(0) # Make the backup take over.
- s = cluster[1].connect().session()
- cluster[1].agent.delQueue("q") # Delete q on new primary
- self.assertRaises(qm.NotFound, s.receiver, "q")
- assert not cluster[1].agent.getQueue("q") # Should not be in QMF
-
- def test_auto_delete_failover(self):
- """Test auto-delete queues. Verify that:
- - queues auto-deleted on the primary are deleted on the backup.
-        - auto-delete queues, with or without a timeout, are deleted correctly after a failover
-        - auto-delete queues that were never subscribed to are not deleted
- - messages are correctly routed to the alternate exchange.
- """
- cluster = HaCluster(self, 3)
- s = cluster[0].connect().session()
- a = cluster[0].agent
-
- def setup(q, timeout=None):
- # Create alternate exchange, auto-delete queue and queue bound to alt. ex.
- a.addExchange("fanout", q+"-altex")
- args = {"auto-delete":True, "alternate-exchange":q+"-altex"}
- if timeout is not None: args['qpid.auto_delete_timeout'] = timeout
- a.addQueue(q, args)
- a.addQueue(q+"-altq")
- a.bind("%s-altex"%q, "%s-altq"%q)
-
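-        # q2 gets auto-delete timeout 0 and q3 timeout 1s; q1 and q4 have no timeout; q5 is never subscribed.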
- for args in [["q1"],["q2",0],["q3",1],["q4"],["q5"]]: setup(*args)
- receivers = []
- for i in xrange(1,5): # Don't use q5
- q = "q%s"%i
- receivers.append(s.receiver(q)) # Subscribe
- qs = s.sender(q); qs.send(q); qs.close() # Send q name as message
-
- receivers[3].close() # Trigger auto-delete for q4
- for b in cluster[1:3]: b.wait_no_queue("q4") # Verify deleted on backups
-
- cluster[0].kill(final=False) # Kill primary
- cluster[2].promote()
- cluster.restart(0)
- cluster[2].wait_queue("q3") # Not yet auto-deleted, 1 sec timeout.
- for b in cluster:
- for q in ["q%s"%i for i in xrange(1,5)]:
- b.wait_no_queue(q,timeout=2, msg=str(b)) # auto-deleted
- b.assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate
- cluster[2].wait_queue("q5") # Not auto-deleted, never subscribed
- cluster[2].connect().session().receiver("q5").close()
- cluster[2].wait_no_queue("q5")
-
- def test_auto_delete_close(self):
- """Verify auto-delete queues are deleted on backup if auto-deleted
- on primary"""
- cluster=HaCluster(self, 2)
-
- # Create altex to use as alternate exchange, with altq bound to it
- a = cluster[0].agent
- a.addExchange("fanout", "altex")
- a.addQueue("altq", {"auto-delete":True})
- a.bind("altex", "altq")
-
- p = cluster[0].connect().session()
- r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex'}}}")
- s = p.sender("adq1")
- for m in ["aa","bb","cc"]: s.send(m)
- s.close()
- cluster[1].wait_queue("adq1")
- r.close() # trigger auto-delete of adq1
- cluster[1].wait_no_queue("adq1")
- cluster[1].assert_browse_backup("altq", ["aa","bb","cc"])
-
- def test_expired(self):
- """Regression test for QPID-4379: HA does not properly handle expired messages"""
- # Race between messages expiring and HA replicating consumer.
- cluster = HaCluster(self, 2)
- s = cluster[0].connect().session().sender("q;{create:always}", capacity=2)
- def send_ttl_messages():
- for i in xrange(100): s.send(qm.Message(str(i), ttl=0.001))
- send_ttl_messages()
- cluster.start()
- send_ttl_messages()
-
- def test_missed_recreate(self):
- """If a queue or exchange is destroyed and one with the same name re-created
- while a backup is disconnected, the backup should also delete/recreate
- the object when it re-connects"""
- cluster = HaCluster(self, 3)
- sn = cluster[0].connect().session()
- # Create a queue with messages
- s = sn.sender("qq;{create:always}")
- msgs = [str(i) for i in xrange(3)]
- for m in msgs: s.send(m)
- cluster[1].assert_browse_backup("qq", msgs)
- cluster[2].assert_browse_backup("qq", msgs)
- # Set up an exchange with a binding.
- a = cluster[0].agent
- a.addExchange("fanout", "xx")
- a.addQueue("xxq")
- a.bind("xx", "xxq", "xxq")
- cluster[1].wait_address("xx")
- self.assertEqual(cluster[1].agent.getExchange("xx").values["bindingCount"], 1)
- cluster[2].wait_address("xx")
- self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 1)
-
- # Simulate the race by re-creating the objects before promoting the new primary
- cluster.kill(0, promote_next=False)
- xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
- node = "node:{%s}"%(xdecl)
- sn = cluster[1].connect_admin().session()
- a = cluster[1].agent
- a.delQueue("qq", if_empty=False)
- s = sn.sender("qq;{create:always, %s}"%(node))
- s.send("foo")
- a.delExchange("xx")
- sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
- cluster[1].promote()
- cluster[1].wait_status("active")
- # Verify we are not still using the old objects on cluster[2]
- cluster[2].assert_browse_backup("qq", ["foo"])
- cluster[2].wait_address("xx")
- self.assertEqual(cluster[2].agent.getExchange("xx").values["bindingCount"], 0)
-
- def test_resource_limit_bug(self):
- """QPID-5666 Regression test: Incorrect resource limit exception for queue creation."""
- cluster = HaCluster(self, 3)
- qs = ["q%s"%i for i in xrange(10)]
- a = cluster[0].agent
- a.addQueue("q")
- cluster[1].wait_backup("q")
- cluster.kill(0)
- cluster[1].promote()
- cluster[1].wait_status("active")
- a = cluster[1].agent
- a.delQueue("q")
- a.addQueue("q")
-
-def fairshare(msgs, limit, levels):
- """
- Generator to return prioritised messages in expected order for a given fairshare limit
- """
- count = 0
- last_priority = None
- postponed = []
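-    # Pop messages in order, counting consecutive messages at the same level; once the count exceeds that
-    # level's limit, postpone the rest of the run to the next pass (a limit of 0 means unlimited).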
- while msgs or postponed:
- if not msgs:
- msgs = postponed
- count = 0
- last_priority = None
- postponed = [ ]
- msg = msgs.pop(0)
- if last_priority and priority_level(msg.priority, levels) == last_priority:
- count += 1
- else:
- last_priority = priority_level(msg.priority, levels)
- count = 1
- l = limit(last_priority)
- if (l and count > l):
- postponed.append(msg)
- else:
- yield msg
- return
-
-def priority_level(value, levels):
- """
- Method to determine which of a distinct number of priority levels
- a given value falls into.
- """
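-    # e.g. levels=8 gives offset=1, so client priorities 0-9 collapse onto levels 0-7; levels=10 gives offset=0 (identity mapping for 0-9).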
- offset = 5-math.ceil(levels/2.0)
- return min(max(value - offset, 0), levels-1)
-
-class LongTests(HaBrokerTest):
- """Tests that can run for a long time if -DDURATION=<minutes> is set"""
-
- def duration(self):
- d = self.config.defines.get("DURATION")
- if d: return float(d)*60
- else: return 3 # Default is to be quick
-
- def test_failover_send_receive(self):
- """Test failover with continuous send-receive"""
- brokers = HaCluster(self, 3)
-
- # Start sender and receiver threads
- n = 10
- senders = [
- NumberedSender(
- brokers[0], url=brokers.url,max_depth=50,
- queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
-
- receivers = [
- NumberedReceiver(
- brokers[0], url=brokers.url, sender=senders[i],
- queue="test%s"%(i), args=["--capacity=10"]) for i in xrange(n)]
-
- for r in receivers: r.start()
- for s in senders: s.start()
-
- def wait_passed(r, n):
- """Wait for receiver r to pass n"""
- def check():
- r.check() # Verify no exceptions
- return r.received > n + 100
- assert retry(check), "Stalled %s waiting for %s, sent %s"%(
- r.queue, n, [s for s in senders if s.queue==r.queue][0].sent)
-
- for r in receivers: wait_passed(r, 0)
-
- # Kill and restart brokers in a cycle:
- endtime = time.time() + self.duration()
- i = 0
- primary = 0
- try:
- try:
- while time.time() < endtime or i < 3: # At least 3 iterations
- # Precondition: All 3 brokers running,
- # primary = index of promoted primary
- # one or two backups are running,
- for s in senders: s.sender.assert_running()
- for r in receivers: r.receiver.assert_running()
- checkpoint = [ r.received+10 for r in receivers ]
- victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
- if victim == primary:
- # Don't kill primary till it is active and the next
- # backup is ready, otherwise we can lose messages.
- brokers[victim].wait_status("active")
- next = (victim+1)%3
- brokers[next].wait_status("ready")
- brokers.bounce(victim) # Next one is promoted
- primary = next
- else:
- brokers.bounce(victim, promote_next=False)
-
- # Make sure we are not stalled
- map(wait_passed, receivers, checkpoint)
- # Run another checkpoint to ensure things work in this configuration
- checkpoint = [ r.received+10 for r in receivers ]
- map(wait_passed, receivers, checkpoint)
- i += 1
- except:
- traceback.print_exc()
- raise
- finally:
- for s in senders: s.stop()
- for r in receivers: r.stop()
- dead = filter(lambda b: not b.is_running(), brokers)
- if dead: raise Exception("Brokers not running: %s"%dead)
-
- def test_tx_send_receive(self):
- brokers = HaCluster(self, 3)
- sender = self.popen(
- ["qpid-send",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=1000",
- "--tx=10",
- "--connection-options={protocol:%s}" % self.tx_protocol
- ])
- receiver = self.popen(
- ["qpid-receive",
- "--broker", brokers[0].host_port(),
- "--address", "q;{create:always}",
- "--messages=990",
- "--timeout=10",
- "--tx=10",
- "--connection-options={protocol:%s}" % self.tx_protocol
- ])
- self.assertEqual(sender.wait(), 0)
- self.assertEqual(receiver.wait(), 0)
- expect = [long(i) for i in range(991, 1001)]
- sn = lambda m: m.properties["sn"]
- brokers[0].assert_browse("q", expect, transform=sn)
- brokers[1].assert_browse_backup("q", expect, transform=sn)
- brokers[2].assert_browse_backup("q", expect, transform=sn)
-
-
- def test_qmf_order(self):
- """QPID 4402: HA QMF events can be out of order.
- This test mimics the test described in the JIRA. Two threads repeatedly
- declare the same auto-delete queue and close their connection.
- """
- broker = Broker(self)
- class Receiver(Thread):
- def __init__(self, qname):
- Thread.__init__(self)
- self.qname = qname
- self.stopped = False
-
- def run(self):
- while not self.stopped:
- self.connection = broker.connect()
- try:
- self.connection.session().receiver(
- self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
- except qm.NotFound: pass # Can occur occasionally, not an error.
- try: self.connection.close()
- except: pass
-
- class QmfObject(object):
-            """Track existence of an object and validate QMF events"""
- def __init__(self, type_name, name_field, name):
- self.type_name, self.name_field, self.name = type_name, name_field, name
- self.exists = False
-
- def qmf_event(self, event):
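-                # QMF v2 events arrive as a list of maps, each with a '_schema_id' (class name) and '_values' (event properties).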
- content = event.content[0]
- event_type = content['_schema_id']['_class_name']
- values = content['_values']
- if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
- disp = values['disp']
- log.debug("Event %s: disp=%s exists=%s"%(
- event_type, values['disp'], self.exists))
- if self.exists: assert values['disp'] == 'existing'
- else: assert values['disp'] == 'created'
- self.exists = True
- elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
- log.debug("Event %s: exists=%s"%(event_type, self.exists))
- assert self.exists
- self.exists = False
-
- # Verify order of QMF events.
- helper = EventHelper()
- r = broker.connect().session().receiver(helper.eventAddress())
- threads = [Receiver("qq"), Receiver("qq")]
- for t in threads: t.start()
- queue = QmfObject("queue", "qName", "qq")
- finish = time.time() + self.duration()
- try:
- while time.time() < finish:
- queue.qmf_event(r.fetch())
- finally:
- for t in threads: t.stopped = True; t.join()
-
- def test_max_queues(self):
- """Verify that we behave properly if we try to exceed the max number
- of replicated queues - currently limited by the max number of channels
- in the replication link"""
- # This test is very slow (3 mins), skip it unless duration() > 1 minute.
- if self.duration() < 60: return
- # This test is written in C++ for speed, it takes a long time
- # to create 64k queues in python. See ha_test_max_queues.cpp.
- cluster = HaCluster(self, 2)
- test = self.popen(["ha_test_max_queues", cluster[0].host_port()])
- self.assertEqual(test.wait(), 0)
-
-class RecoveryTests(HaBrokerTest):
- """Tests for recovery after a failure."""
-
- def test_queue_hold(self):
- """Verify that the broker holds queues without sufficient backup,
- i.e. does not complete messages sent to those queues."""
-
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- # We don't want backups to time out for this test, set long timeout.
- cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
- # Wait for the primary to be ready
- cluster[0].wait_status("active")
- for b in cluster[1:4]: b.wait_status("ready")
- # Create a queue before the failure.
- # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False
- s1 = cluster.connect(0, native=True).session().sender("q1;{create:always}")
- for b in cluster: b.wait_backup("q1")
- for i in xrange(10): s1.send(str(i), timeout=0.1)
-
- # Kill primary and 2 backups
- cluster[3].wait_status("ready")
- for i in [0,1,2]: cluster.kill(i, promote_next=False, final=False)
- cluster[3].promote() # New primary, backups will be 1 and 2
- cluster[3].wait_status("recovering")
-
- def assertSyncTimeout(s):
- self.assertRaises(qpid.messaging.Timeout, s.sync, timeout=.01)
-
- # Create a queue after the failure
- # FIXME aconway 2014-02-20: SWIG client doesn't respect sync=False
- s2 = cluster.connect(3, native=True).session().sender("q2;{create:always}")
-
- # Verify that messages sent are not completed
- for i in xrange(10,20):
- s1.send(str(i), sync=False, timeout=0.1);
- s2.send(str(i), sync=False, timeout=0.1)
-
- assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 10)
- assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 10)
-
- # Verify we can receive even if sending is on hold:
- cluster[3].assert_browse("q1", [str(i) for i in range(10)])
-
- # Restart backups, verify queues are released only when both backups are up
- cluster.restart(1)
- assertSyncTimeout(s1)
- self.assertEqual(s1.unsettled(), 10)
- assertSyncTimeout(s2)
- self.assertEqual(s2.unsettled(), 10)
- cluster.restart(2)
- cluster.restart(0)
-
- # Verify everything is up to date and active
- def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0;
-            assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
-            assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
- cluster[1].assert_browse_backup("q1", [str(i) for i in range(10)+range(10,20)])
- cluster[1].assert_browse_backup("q2", [str(i) for i in range(10,20)])
-            cluster[3].wait_status("active")
- s1.session.connection.close()
- s2.session.connection.close()
- finally: l.restore()
-
- def test_expected_backup_timeout(self):
- """Verify that we time-out expected backups and release held queues
- after a configured interval. Verify backup is demoted to catch-up,
- but can still rejoin.
- """
- cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
- for i in [0,1]: cluster.kill(i, False)
- cluster[2].promote() # New primary, expected backup will be 1
- # Should not go active till the expected backup connects or times out.
- cluster[2].wait_status("recovering")
- # Messages should be held till expected backup times out
- ss = cluster[2].connect().session()
- s = ss.sender("q;{create:always}")
- s.send("foo", sync=False)
- self.assertEqual(s.unsettled(), 1) # Verify message not settled immediately.
- s.sync(timeout=1) # And settled after timeout.
- cluster[2].wait_status("active")
-
- def test_join_ready_cluster(self):
-        """If we join a cluster where the primary is dead, the new primary is
-        not yet promoted, and there are ready backups, then we should refuse
-        promotion so that one of the ready backups can be chosen."""
- cluster = HaCluster(self, 2)
- cluster[0].wait_status("active")
- cluster[1].wait_status("ready")
- cluster.bounce(0, promote_next=False)
- self.assertRaises(Exception, cluster[0].promote)
- os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive.
- cluster.bounce(0, promote_next=False)
- cluster[0].promote()
-
- def test_stalled_backup(self):
- """Make sure that a stalled backup broker does not stall the primary"""
- cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
- os.kill(cluster[1].pid, signal.SIGSTOP)
- s = cluster[0].connect().session()
- s.sender("q;{create:always}").send("x")
- self.assertEqual("x", s.receiver("q").fetch(0).content)
-
-class StoreTests(HaBrokerTest):
-    """Tests for HA with persistence."""
-
- def check_skip(self):
- if not BrokerTest.store_lib:
- print "WARNING: skipping HA+store tests, no store lib found."
- return not BrokerTest.store_lib
-
- def test_store_recovery(self):
- """Verify basic store and recover functionality"""
- if self.check_skip(): return
- cluster = HaCluster(self, 1)
- sn = cluster[0].connect().session()
- # Create queue qq, exchange exx and binding between them
- s = sn.sender("qq;{create:always,node:{durable:true}}")
- sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}")
- cluster[0].agent.bind("exx", "qq", "k")
- for m in ["foo", "bar", "baz"]: s.send(qm.Message(m, durable=True))
- r = cluster[0].connect().session().receiver("qq")
- self.assertEqual(r.fetch().content, "foo")
- r.session.acknowledge()
- # Sending this message is a hack to flush the dequeue operation on qq.
- s.send(qm.Message("flush", durable=True))
-
- def verify(broker, x_count):
- sn = broker.connect().session()
- assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
- sn.sender("exx/k").send(qm.Message("x", durable=True))
- assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
-
- verify(cluster[0], 0) # Sanity check
- cluster.bounce(0)
- cluster[0].wait_status("active")
- verify(cluster[0], 1) # Loaded from store
- cluster.start()
- cluster[1].wait_status("ready")
- cluster.kill(0)
- cluster[1].wait_status("active")
- verify(cluster[1], 2)
- cluster.bounce(1, promote_next=False)
- cluster[1].promote()
- cluster[1].wait_status("active")
- verify(cluster[1], 3)
-
- def test_catchup_store(self):
- """Verify that a backup erases queue data from store recovery before
- doing catch-up from the primary."""
- if self.check_skip(): return
- cluster = HaCluster(self, 2)
- sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
- s1 = sn.sender("q1;{create:always,node:{durable:true}}")
- for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
- s2 = sn.sender("q2;{create:always,node:{durable:true}}")
- sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}}}")
- cluster[0].agent.bind("ex", "q2", "k2")
- sk2.send(qm.Message("hello", durable=True))
- # Wait for backup to catch up.
- cluster[1].assert_browse_backup("q1", ["foo","bar"])
- cluster[1].assert_browse_backup("q2", ["hello"])
- # Make changes that the backup doesn't see
- cluster.kill(1, promote_next=False, final=False)
- r1 = cluster[0].connect(heartbeat=HaBroker.heartbeat).session().receiver("q1")
- for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
- r1.session.acknowledge()
- for m in ["x","y","z"]: s1.send(qm.Message(m, durable=True))
- cluster[0].agent.unbind("ex", "q2", "k2")
- cluster[0].agent.bind("ex", "q1", "k1")
- # Restart both brokers from store to get inconsistent sequence numbering.
- cluster.bounce(0, promote_next=False)
- cluster[0].promote()
- cluster[0].wait_status("active")
- cluster.restart(1)
- cluster[1].wait_status("ready")
-
- # Verify state
- cluster[0].assert_browse("q1", ["x","y","z"])
- cluster[1].assert_browse_backup("q1", ["x","y","z"])
-
- sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
- sn.sender("ex/k1").send("boo")
- cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
- cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
- sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped.
- sn.sender("q2").send("end") # mark the end of the queue for assert_browse
- cluster[0].assert_browse("q2", ["hello", "end"])
- cluster[1].assert_browse_backup("q2", ["hello", "end"])
-
-def open_read(name):
-    f = open(name)
-    try:
-        return f.read()
-    finally: f.close()
-
-class TransactionTests(HaBrokerTest):
-
- def tx_simple_setup(self, cluster, broker=0):
- """Start a transaction, remove messages from queue a, add messages to queue b"""
- c = cluster.connect(broker, protocol=self.tx_protocol)
- # Send messages to a, no transaction.
- sa = c.session().sender("a;{create:always,node:{durable:true}}")
- tx_msgs = ["x","y","z"]
- for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
- sa.close()
-
- # Receive messages from a, in transaction.
- tx = c.session(transactional=True)
- txr = tx.receiver("a")
- tx_msgs2 = [txr.fetch(1).content for i in xrange(3)]
- self.assertEqual(tx_msgs, tx_msgs2)
-
- # Send messages to b, transactional, mixed with non-transactional.
- sb = c.session().sender("b;{create:always,node:{durable:true}}")
- txs = tx.sender("b")
- msgs = [str(i) for i in xrange(3)]
- for tx_m,m in zip(tx_msgs2, msgs):
- txs.send(tx_m);
- sb.send(m)
- sb.close()
- return tx
-
- def test_tx_simple_commit(self):
- cluster = HaCluster(self, 2, test_store=True, wait=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx.acknowledge()
- # Pre transaction - messages are acquired on primary but not yet dequeued
- # so still there on backup.
- cluster[0].assert_browse_backup("a", [])
- cluster[1].assert_browse_backup("a", ['x', 'y', 'z'])
- for b in cluster:
- b.assert_browse_backup("b", ['0', '1', '2'])
- tx.commit()
- tx.sync()
- tx.close()
-
- # Post transaction: all synced.
- for b in cluster:
- b.assert_browse_backup("a", [])
- b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"])
-
- # Verify non-tx dequeue is replicated correctly
- c = cluster.connect(0, protocol=self.tx_protocol)
- r = c.session().receiver("b")
- ri = receiver_iter(r, timeout=1)
- self.assertEqual(['0', '1', '2', 'x', 'y', 'z'], [m.content for m in ri])
- r.session.acknowledge()
- for b in cluster: b.assert_browse_backup("b", [], msg=b)
- c.close()
- tx.connection.close()
-
- def test_tx_simple_rollback(self):
- cluster = HaCluster(self, 2, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx.acknowledge()
- tx.rollback()
-
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"])
- b.assert_browse_backup("b", ['0', '1', '2'])
-
- tx.close()
- tx.connection.close()
-
-
- def test_tx_simple_failure(self):
- """Verify we throw TransactionAborted if there is a fail-over during a transaction"""
- cluster = HaCluster(self, 3, test_store=True)
- tx = self.tx_simple_setup(cluster)
- tx.sync()
- tx_queues = cluster[0].agent.tx_queues()
- tx.acknowledge()
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- cluster.bounce(0) # Should cause roll-back
- tx.connection.session() # Wait for reconnect
- self.assertRaises(qm.TransactionAborted, tx.sync)
- self.assertRaises(qm.TransactionAborted, tx.commit)
- try: tx.connection.close()
- except qm.TransactionAborted: pass # Occasionally get exception on close.
- for b in cluster:
- b.assert_browse_backup("a", ["x","y","z"])
- b.assert_browse_backup("b", ['0', '1', '2'])
- finally: l.restore()
-
- def test_tx_join_leave(self):
- """Test cluster members joining/leaving cluster.
- Also check that tx-queues are cleaned up at end of transaction."""
-
- cluster = HaCluster(self, 3)
-
- # Leaving
- tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
- s = tx.sender("q;{create:always}")
- s.send("a", sync=True)
- cluster[1].kill(final=False)
- s.send("b")
- tx.commit()
- tx.connection.close()
- for b in [cluster[0],cluster[2]]:
- b.assert_browse_backup("q", ["a","b"], msg=b)
- # Joining
- tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
- s = tx.sender("q;{create:always}")
- s.send("foo")
- cluster.restart(1) # Not a part of the current transaction.
- tx.commit()
- tx.connection.close()
-        # The new member is not in the tx but receives the results via normal replication.
- for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
-
- def test_tx_block_threads(self):
- """Verify that TXs blocked in commit don't deadlock."""
- cluster = HaCluster(self, 2, args=["--worker-threads=2"], test_store=True)
- n = 10 # Number of concurrent transactions
- sessions = [cluster.connect(0, protocol=self.tx_protocol).session(transactional=True) for i in xrange(n)]
- # Have the store delay the response for 10s
- for s in sessions:
- sn = s.sender("qq;{create:always,node:{durable:true}}")
- sn.send(qm.Message("foo", durable=True))
- threads = [ Thread(target=s.commit) for s in sessions]
- for t in threads: t.start()
- cluster[0].ready(timeout=1) # Check for deadlock
- for b in cluster: b.assert_browse_backup('qq', ['foo']*n)
- for t in threads: t.join()
- for s in sessions: s.connection.close()
-
- def test_other_tx_tests(self):
- try:
- import qpid_tests.broker_0_10
- except ImportError:
- raise Skipped("Tests not found")
- cluster = HaCluster(self, 3)
- if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
- self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok()
- print
- self.popen(["qpid-python-test",
- "-m", "qpid_tests.broker_0_10",
- "-m", "qpid_tests.broker_1_0",
- "-b", "localhost:%s"%(cluster[0].port()),
- "*.tx.*"], stdout=None, stderr=None).assert_exit_ok()
-
-if __name__ == "__main__":
- outdir = "ha_tests.tmp"
- shutil.rmtree(outdir, True)
- os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir]
- + sys.argv[1:])
-