diff options
Diffstat (limited to 'qpid/cpp/src/tests/ha_test.py')
-rwxr-xr-x | qpid/cpp/src/tests/ha_test.py | 401 |
1 files changed, 0 insertions, 401 deletions
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py deleted file mode 100755 index 3659185140..0000000000 --- a/qpid/cpp/src/tests/ha_test.py +++ /dev/null @@ -1,401 +0,0 @@ -#!/usr/bin/env python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random -import traceback -from brokertest import * -from threading import Thread, Lock, Condition -from logging import getLogger, WARN, ERROR, DEBUG, INFO -from qpidtoollibs import BrokerAgent -from qpid.harness import Skipped - -log = getLogger(__name__) - -class LogLevel: - """ - Temporarily change the log settings on the root logger. - Used to suppress expected WARN messages from the python client. - """ - def __init__(self, level): - self.save_level = getLogger().getEffectiveLevel() - getLogger().setLevel(level) - - def restore(self): - getLogger().setLevel(self.save_level) - -class QmfAgent(object): - """Access to a QMF broker agent.""" - def __init__(self, address, **kwargs): - self._connection = qm.Connection.establish( - address, client_properties={"qpid.ha-admin":1}, **kwargs) - self._agent = BrokerAgent(self._connection) - - def queues(self): - return [q.values['name'] for q in self._agent.getAllQueues()] - - def repsub_queue(self, sub): - """If QMF subscription sub is a replicating subscription return - the name of the replicated queue, else return None""" - session = self.getSession(sub.sessionRef) - if not session: return None - m = re.search("qpid.ha-q:(.*)\.", session.name) - return m and m.group(1) - - def repsub_queues(self): - """Return queue names for all replicating subscriptions""" - return filter(None, [self.repsub_queue(s) for s in self.getAllSubscriptions()]) - - def tx_queues(self): - """Return names of all tx-queues""" - return [q for q in self.queues() if q.startswith("qpid.ha-tx")] - - def __getattr__(self, name): - a = getattr(self._agent, name) - return a - -class Credentials(object): - """SASL credentials: username, password, and mechanism""" - def __init__(self, username, password, mechanism): - (self.username, self.password, self.mechanism) = (username, password, mechanism) - - def __str__(self): return "Credentials%s"%(self.tuple(),) - - def tuple(self): return (self.username, self.password, self.mechanism) - - def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) - -class HaPort: - """Many HA tests need to allocate a broker port dynamically and then kill - and restart a broker on that same port multiple times. qpidd --port=0 only - ensures the port for the initial broker process, subsequent brokers re-using - the same port may fail with "address already in use". - - HaPort binds and listens to the port and returns a file descriptor to pass - to qpidd --socket-fd. It holds on to the port untill the end of the test so - the broker can restart multiple times. - """ - - def __init__(self, test, port=0): - """Bind and listen to port. port=0 allocates a port dynamically. - self.port is the allocated port, self.fileno is the file descriptor for - qpid --socket-fd.""" - - self.test = test - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.bind(("", port)) - self.socket.listen(5) - self.port = self.socket.getsockname()[1] - self.fileno = self.socket.fileno() - self.stopped = False - test.teardown_add(self) # Stop during test.tearDown - - def teardown(self): # Called in tearDown - if not self.stopped: - self.stopped = True - self.socket.shutdown(socket.SHUT_RDWR) - self.socket.close() - - def __str__(self): return "HaPort<port:%s, fileno:%s>"%(self.port, self.fileno) - - -class HaBroker(Broker): - """Start a broker with HA enabled - @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. - """ - - heartbeat=5 - - def __init__(self, test, ha_port=None, args=[], brokers_url=None, ha_cluster=True, - ha_replicate="all", client_credentials=None, **kwargs): - assert BrokerTest.ha_lib, "Cannot locate HA plug-in" - ha_port = ha_port or HaPort(test) - args = copy(args) - args += ["--load-module", BrokerTest.ha_lib, - # Non-standard settings for faster tests. - "--link-maintenance-interval=0.1", - "--ha-cluster=%s"%ha_cluster] - # Add default --log-enable arguments unless args already has --log arguments. - if not env_has_log_config() and not [l for l in args if l.startswith("--log")]: - args += ["--log-enable=info+", "--log-enable=debug+:ha::"] - if not [h for h in args if h.startswith("--link-heartbeat-interval")]: - args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)] - - if ha_replicate is not None: - args += [ "--ha-replicate=%s"%ha_replicate ] - if brokers_url: args += [ "--ha-brokers-url", brokers_url ] - # Set up default ACL - acl=os.path.join(os.getcwd(), "unrestricted.acl") - if not os.path.exists(acl): - aclf=file(acl,"w") - aclf.write(""" -acl allow all all - """) - aclf.close() - if not "--acl-file" in args: - args += [ "--acl-file", acl, ] - args += ["--socket-fd=%s"%ha_port.fileno, "--listen-disable=tcp"] - self._agent = None - self.client_credentials = client_credentials - self.ha_port = ha_port - Broker.__init__(self, test, args, port=ha_port.port, **kwargs) - - # Do some static setup to locate the qpid-config and qpid-ha tools. - @property - def qpid_ha_script(self): - if not hasattr(self, "_qpid_ha_script"): - qpid_ha_exec = os.path.join(os.getenv("SOURCE_DIR"), "management", - "python", "bin", "qpid-ha") - self._qpid_ha_script = import_script(qpid_ha_exec) - return self._qpid_ha_script - - def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port()) - - def qpid_ha(self, args): - if not self.qpid_ha_script: - raise Skipped("qpid-ha not available") - try: - cred = self.client_credentials - url = self.host_port() - if cred: - url =cred.add_user(url) - args = args + ["--sasl-mechanism", cred.mechanism] - self.qpid_ha_script.main_except(["", "-b", url]+args) - except Exception, e: - raise Exception("Error in qpid_ha -b %s %s: %s"%(url, args,e)) - - def promote(self): self.ready(); self.qpid_ha(["promote", "--cluster-manager"]) - def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) - @property - def agent(self): - if not self._agent: - cred = self.client_credentials - if cred: - self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) - else: - self._agent = QmfAgent(self.host_port()) - return self._agent - - def qmf(self): - hb = self.agent.getHaBroker() - hb.update() - return hb - - def ha_status(self): return self.qmf().status - - def wait_status(self, status, timeout=10): - - def try_get_status(): - self._status = "<unknown>" - try: - self._status = self.ha_status() - except qm.ConnectionError, e: - # Record the error but don't raise, the broker may not be up yet. - self._status = "%s: %s" % (type(e).__name__, e) - return self._status == status; - assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%( - self, status, self._status) - - def wait_queue(self, queue, timeout=10, msg="wait_queue"): - """ Wait for queue to be visible via QMF""" - agent = self.agent - assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \ - "%s queue %s not present" % (msg, queue) - - def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"): - """ Wait for queue to be invisible via QMF""" - agent = self.agent - assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue) - - def qpid_config(self, args): - assert subprocess.call( - ["qpid-config", "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT - ) == 0, "qpid-config failed" - - def config_replicate(self, from_broker, queue): - self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) - - def config_declare(self, queue, replication): - self.qpid_config(["add", "queue", queue, "--replicate", replication]) - - def connect_admin(self, **kwargs): - cred = self.client_credentials - if cred: - return Broker.connect( - self, client_properties={"qpid.ha-admin":1}, - username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, - **kwargs) - else: - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) - - def wait_address(self, address): - """Wait for address to become valid on the broker.""" - c = self.connect_admin() - try: wait_address(c, address) - finally: c.close() - - wait_backup = wait_address - - def browse(self, queue, timeout=0, transform=lambda m: m.content): - c = self.connect_admin() - try: - return browse(c.session(), queue, timeout, transform) - finally: c.close() - - def assert_browse_backup(self, queue, expected, **kwargs): - """Combines wait_backup and assert_browse_retry.""" - c = self.connect_admin() - try: - wait_address(c, queue) - if not "msg" in kwargs: - kwargs["msg"]=str(self) - assert_browse_retry(c.session(), queue, expected, **kwargs) - finally: c.close() - - assert_browse = assert_browse_backup - - def assert_connect_fail(self): - try: - self.connect() - self.test.fail("Expected qm.ConnectionError") - except qm.ConnectionError: pass - - def try_connect(self): - try: return self.connect() - except qm.ConnectionError: return None - - def ready(self, *args, **kwargs): - if not 'client_properties' in kwargs: kwargs['client_properties'] = {} - kwargs['client_properties']['qpid.ha-admin'] = True - return Broker.ready(self, *args, **kwargs) - - def kill(self, final=True): - if final: self.ha_port.teardown() - self._agent = None - return Broker.kill(self) - - -class HaCluster(object): - _cluster_count = 0 - - def __init__(self, test, n, promote=True, wait=True, args=[], s_args=[], **kwargs): - """Start a cluster of n brokers. - - @test: The test being run - @n: start n brokers - @promote: promote self[0] to primary - @wait: wait for primary active and backups ready. Ignored if promote=False - @args: args for all brokers in the cluster. - @s_args: args for specific brokers: s_args[i] for broker i. - """ - self.test = test - self.args = copy(args) - self.s_args = copy(s_args) - self.kwargs = kwargs - self._ports = [HaPort(test) for i in xrange(n)] - self._set_url() - self._brokers = [] - self.id = HaCluster._cluster_count - self.broker_id = 0 - HaCluster._cluster_count += 1 - for i in xrange(n): self.start() - if promote: - self[0].promote() - if wait: - self[0].wait_status("active") - for b in self[1:]: b.wait_status("ready") - - def next_name(self): - name="cluster%s-%s"%(self.id, self.broker_id) - self.broker_id += 1 - return name - - def _ha_broker(self, i, name): - args = self.args - if i < len(self.s_args): args += self.s_args[i] - ha_port = self._ports[i] - b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name, - args=args, **self.kwargs) - b.ready(timeout=10) - return b - - def start(self): - """Start a new broker in the cluster""" - i = len(self) - assert i <= len(self._ports) - if i == len(self._ports): # Adding new broker after cluster init - self._ports.append(HaPort(self.test)) - self._set_url() - b = self._ha_broker(i, self.next_name()) - self._brokers.append(b) - return b - - def _set_url(self): - self.url = ",".join("127.0.0.1:%s"%(p.port) for p in self._ports) - - def connect(self, i, **kwargs): - """Connect with reconnect_urls""" - c = self[i].connect(reconnect=True, reconnect_urls=self.url.split(","), **kwargs) - self.test.teardown_add(c) # Clean up - return c - - def kill(self, i, promote_next=True, final=True): - """Kill broker i, promote broker i+1""" - self[i].kill(final=final) - if promote_next: self[(i+1) % len(self)].promote() - - def restart(self, i): - """Start a broker with the same port, name and data directory. It will get - a separate log file: foo.n.log""" - if self._ports[i].stopped: raise Exception("Restart after final kill: %s"%(self)) - b = self._brokers[i] - self._brokers[i] = self._ha_broker(i, b.name) - self._brokers[i].ready() - - def bounce(self, i, promote_next=True): - """Stop and restart a broker in a cluster.""" - if (len(self) == 1): - self.kill(i, promote_next=False, final=False) - self.restart(i) - self[i].ready() - if promote_next: self[i].promote() - else: - self.kill(i, promote_next, final=False) - self.restart(i) - - # Behave like a list of brokers. - def __len__(self): return len(self._brokers) - def __getitem__(self,index): return self._brokers[index] - def __iter__(self): return self._brokers.__iter__() - - -def wait_address(connection, address): - """Wait for an address to become valid.""" - assert retry(lambda: valid_address(connection, address)), "Timed out waiting for address %s"%(address) - -def valid_address(connection, address): - """Test if an address is valid""" - try: - s = connection.session().receiver(address) - s.session.close() - return True - except qm.NotFound: - return False - - |