diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:34 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:34 +0000 |
| commit | 55305747e6e7f931756bfa21460c37e350f5ea0f (patch) | |
| tree | c6b826cac45092f95c428671a13024aad62b3400 /qpid/cpp/src/tests | |
| parent | 80a0832a2fe775ff217e6353f003226eb3f18d89 (diff) | |
| download | qpid-python-55305747e6e7f931756bfa21460c37e350f5ea0f.tar.gz | |
QPID-3603: HA broker backup/primary ready checks.
- Introduce HA broker state machien
- Inform backup queues when ready.
- Incomplete implementation of backup ready check.
- does not count correctly after a failover, see countUnready.
- Existing replicator bridges updated out of sync with BrokerReplicator initialize.
- Does not handle multi-messages responses.
- Newly promoted HA primary waits for backups to be ready before accepting clients.
- Uniform log prefixes for HA messages.
- qpid-ha tests, call qpid-ha python code directly.
- Move excluder from Backup to HaBroker, it is also used in PROMOTING.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
| -rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 18 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 164 |
2 files changed, 125 insertions, 57 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 8255fbe9ac..257ac68b74 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -77,17 +77,19 @@ def error_line(filename, n=1): return ":\n" + "".join(result) def retry(function, timeout=10, delay=.01): - """Call function until it returns True or timeout expires. - Double the delay for each retry. Return True if function - returns true, False if timeout expires.""" + """Call function until it returns a true value or timeout expires. + Double the delay for each retry. Returns what function returns if + true, None if timeout expires.""" deadline = time.time() + timeout - while not function(): + ret = None + while not ret: + ret = function() remaining = deadline - time.time() if remaining <= 0: return False delay = min(delay, remaining) time.sleep(delay) delay *= 2 - return True + return ret class AtomicCounter: def __init__(self): @@ -298,9 +300,9 @@ class Broker(Popen): # Read port from broker process stdout if not already read. if (self._port == 0): try: self._port = int(self.stdout.readline()) - except ValueError: - raise Exception("Can't get port for broker %s (%s)%s" % - (self.name, self.pname, error_line(self.log,5))) + except ValueError, e: + raise Exception("Can't get port for broker %s (%s)%s: %s" % + (self.name, self.pname, error_line(self.log,5), e)) return self._port def unexpected(self,msg): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 827cb7dca9..15137a0c5f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -28,13 +28,23 @@ from qpidtoollibs import BrokerAgent log = getLogger(__name__) +class QmfHaBroker(object): + def __init__(self, address): + self.connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}) + self.qmf = BrokerAgent(self.connection) + self.ha_broker = self.qmf.getHaBroker() + if not self.ha_broker: + raise Exception("HA module is not loaded on broker at %s"%address) + class HaBroker(Broker): def __init__(self, test, args=[], broker_url=None, ha_cluster=True, ha_replicate="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", + "--log-enable=info+", + "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -42,32 +52,31 @@ class HaBroker(Broker): args += [ "--ha-replicate=%s"%ha_replicate ] if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) - self.commands=os.getenv("PYTHON_COMMANDS") - assert os.path.isdir(self.commands) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + self.qpid_ha_script=import_script(self.qpid_ha_path) - def promote(self): - assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 - - def set_client_url(self, url): - assert os.system( - "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 + def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args) - def set_broker_url(self, url): - assert os.system( - "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 + def promote(self): self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-brokers", url]) + def set_broker_url(self, url): self.qpid_ha(["set", "--brokers", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status - def replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 def config_replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) def config_declare(self, queue, replication): - assert os.system( - "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) @@ -86,17 +95,47 @@ class HaBroker(Broker): assert_browse_retry(bs, queue, expected, **kwargs) finally: bs.connection.close() + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def connect_retry(self): + def try_connect(): + try: return self.connect() + except ConnectionError: return None + c = retry(try_connect) + if c: return c + else: self.test.fail("Failed to connect") + class HaCluster(object): _cluster_count = 0 def __init__(self, test, n, **kwargs): """Start a cluster of n brokers""" self.test = test - self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count HaCluster._cluster_count += 1 + for i in xrange(n): self.start(False) + self.update_urls() + self[0].promote() + + def start(self, update_urls=True): + """Start a new broker in the cluster""" + b = HaBroker( + self.test, + name="broker%s-%s"%(self.id, len(self._brokers)), + **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): self.url = ",".join([b.host_port() for b in self]) for b in self: b.set_broker_url(self.url) - self[0].promote() def connect(self, i): """Connect with reconnect_urls""" @@ -108,11 +147,15 @@ class HaCluster(object): self[i].expect = EXPECT_EXIT_FAIL self[(i+1) % len(self)].promote() + def restart(self, i): + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs) + def bounce(self, i): """Stop and restart a broker in a cluster.""" self.kill(i) - b = self[i] - self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + self.restart(i) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -344,6 +387,7 @@ class ReplicationTests(BrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") @@ -559,6 +603,26 @@ class ReplicationTests(BrokerTest): test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + def test_promoting(self): + """Verify that the primary broker does not go active until expected + backups have connected or timeout expires.""" + cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"]) + c = cluster[0].connect() + for i in xrange(10): + s = c.session().sender("q%s;{create:always}"%i) + for j in xrange(100): s.send(str(j)) + cluster.kill(0) # Fail over to 1 + cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients. + cluster.restart(0) + c = cluster[1].connect_retry() + cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]); + + # Verify in logs that all queue catch-up happened before the transition to active. + log = open(cluster[1].log).read() + i = log.find("Status change: promoting -> active") + self.failIf(i < 0) + self.assertEqual(log.find("caught up", i), -1) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -602,14 +666,15 @@ class LongTests(BrokerTest): else: return 3 # Default is to be quick - def disable_test_failover(self): + def disable_test_failover_send_receive(self): """Test failover with continuous send-receive""" # FIXME aconway 2012-02-03: fails due to dropped messages, # known issue: sending messages to new primary before # backups are ready. Enable when fixed. # Start a cluster, all members will be killed during the test. - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) + brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL, + args=["--ha-expected-backups=2"]) for name in ["ha0","ha1","ha2"] ] url = ",".join([b.host_port() for b in brokers]) for b in brokers: b.set_broker_url(url) @@ -620,30 +685,31 @@ class LongTests(BrokerTest): receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) receiver.start() sender.start() - # Wait for sender & receiver to get up and running - assert retry(lambda: receiver.received > 100) - # Kill and restart brokers in a cycle: - endtime = time.time() + self.duration() - i = 0 - while time.time() < endtime or i < 3: # At least 3 iterations - sender.sender.assert_running() - receiver.receiver.assert_running() - port = brokers[i].port() - brokers[i].kill() - brokers.append( - HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, - expect=EXPECT_EXIT_FAIL)) - i += 1 - brokers[i].promote() - n = receiver.received # Verify we're still running - def enough(): - receiver.check() # Verify no exceptions - return receiver.received > n + 100 - assert retry(enough, timeout=5) - - sender.stop() - receiver.stop() - for b in brokers[i:]: b.kill() + try: + # Wait for sender & receiver to get up and running + assert retry(lambda: receiver.received > 100) + # Kill and restart brokers in a cycle: + endtime = time.time() + self.duration() + i = 0 + while time.time() < endtime or i < 3: # At least 3 iterations + sender.sender.assert_running() + receiver.receiver.assert_running() + port = brokers[i].port() + brokers[i].kill() + brokers.append( + HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, + expect=EXPECT_EXIT_FAIL)) + i += 1 + brokers[i].promote() + n = receiver.received # Verify we're still running + def enough(): + receiver.check() # Verify no exceptions + return receiver.received > n + 100 + assert retry(enough, timeout=5) + finally: + sender.stop() + receiver.stop() + for b in brokers[i:]: b.kill() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
