# plugin/plugin_base.py
# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
"""Testing extensions.

this module is designed to work as a testing-framework-agnostic library,
created so that multiple test frameworks can be supported at once
(mostly so that we can migrate to new ones). The current target
is pytest.

"""

from __future__ import absolute_import

import abc
import re
import sys

py3k = sys.version_info >= (3, 0)

if py3k:
    import configparser

    ABC = abc.ABC
else:
    import ConfigParser as configparser
    import collections as collections_abc  # noqa

    class ABC(object):
        __metaclass__ = abc.ABCMeta


# late imports; these names are rebound to real modules by post_begin()
# (and, for ``logging``, by the _log() option callback).
fixtures = None
engines = None
exclusions = None
warnings = None
profiling = None
assertions = None
requirements = None
config = None
testing = None
util = None
file_config = None

logging = None
include_tags = set()
exclude_tags = set()
options = None


def setup_options(make_option):
    """Register every command line option of the test plugin.

    :param make_option: callable invoked as ``make_option(name, **kw)``
     once per option; the keyword arguments use optparse-style names
     (``action``, ``type``, ``dest``, ``callback``, ``help`` ...) plus
     ``zeroarg_callback`` for ``--dbs``.

    """
    make_option(
        "--log-info",
        action="callback",
        type="string",
        callback=_log,
        help="turn on info logging for <LOG> (multiple OK)",
    )
    make_option(
        "--log-debug",
        action="callback",
        type="string",
        callback=_log,
        help="turn on debug logging for <LOG> (multiple OK)",
    )
    make_option(
        "--db",
        action="append",
        type="string",
        dest="db",
        help="Use prefab database uri. Multiple OK, "
        "first one is run by default.",
    )
    make_option(
        "--dbs",
        action="callback",
        zeroarg_callback=_list_dbs,
        help="List available prefab dbs",
    )
    make_option(
        "--dburi",
        action="append",
        type="string",
        dest="dburi",
        help="Database uri. Multiple OK, " "first one is run by default.",
    )
    make_option(
        "--dropfirst",
        action="store_true",
        dest="dropfirst",
        help="Drop all tables in the target database first",
    )
    make_option(
        "--backend-only",
        action="store_true",
        dest="backend_only",
        help="Run only tests marked with __backend__ or __sparse_backend__",
    )
    make_option(
        "--nomemory",
        action="store_true",
        dest="nomemory",
        help="Don't run memory profiling tests",
    )
    make_option(
        "--profile-sort",
        type="string",
        default="cumulative",
        dest="profilesort",
        help="Type of sort for profiling standard output",
    )
    make_option(
        "--profile-dump",
        type="string",
        dest="profiledump",
        help="Filename where a single profile run will be dumped",
    )
    make_option(
        "--postgresql-templatedb",
        type="string",
        help="name of template database to use for PostgreSQL "
        "CREATE DATABASE (defaults to current database)",
    )
    make_option(
        "--low-connections",
        action="store_true",
        dest="low_connections",
        help="Use a low number of distinct connections - "
        "i.e. for Oracle TNS",
    )
    make_option(
        "--write-idents",
        type="string",
        dest="write_idents",
        help="write out generated follower idents to <file>, "
        "when -n is used",
    )
    make_option(
        "--reversetop",
        action="store_true",
        dest="reversetop",
        default=False,
        help="Use a random-ordering set implementation in the ORM "
        "(helps reveal dependency issues)",
    )
    make_option(
        "--requirements",
        action="callback",
        type="string",
        callback=_requirements_opt,
        help="requirements class for testing, overrides setup.cfg",
    )
    make_option(
        "--with-cdecimal",
        action="store_true",
        dest="cdecimal",
        default=False,
        help="Monkeypatch the cdecimal library into Python 'decimal' "
        "for all tests",
    )
    make_option(
        "--include-tag",
        action="callback",
        callback=_include_tag,
        type="string",
        help="Include tests with tag <tag>",
    )
    make_option(
        "--exclude-tag",
        action="callback",
        callback=_exclude_tag,
        type="string",
        help="Exclude tests with tag <tag>",
    )
    make_option(
        "--write-profiles",
        action="store_true",
        dest="write_profiles",
        default=False,
        help="Write/update failing profiling data.",
    )
    make_option(
        "--force-write-profiles",
        action="store_true",
        dest="force_write_profiles",
        default=False,
        help="Unconditionally write/update profiling data.",
    )
    make_option(
        "--dump-pyannotate",
        type=str,
        dest="dump_pyannotate",
        help="Run pyannotate and dump json info to given file",
    )


def configure_follower(follower_ident):
    """Configure required state for a follower.

    This invokes in the parent process and typically includes
    database creation.

    """
    from sqlalchemy.testing import provision

    provision.FOLLOWER_IDENT = follower_ident


def memoize_important_follower_config(dict_):
    """Store important configuration we will need to send to a follower.

    This invokes in the parent process after normal config is set up.

    This is necessary as pytest seems to not be using forking, so we
    start with nothing in memory, *but* it isn't running our argparse
    callables, so we have to just copy all of that over.

    """
    dict_["memoized_config"] = {
        "include_tags": include_tags,
        "exclude_tags": exclude_tags,
    }


def restore_important_follower_config(dict_):
    """Restore important configuration needed by a follower.

    This invokes in the follower process.

    """
    # the module-level sets are mutated in place, so no ``global``
    # declaration is required here.
    memoized = dict_["memoized_config"]
    include_tags.update(memoized["include_tags"])
    exclude_tags.update(memoized["exclude_tags"])


def read_config():
    """Load ``setup.cfg`` and ``test.cfg`` into the module-level
    ``file_config`` parser."""
    global file_config
    file_config = configparser.ConfigParser()
    file_config.read(["setup.cfg", "test.cfg"])


def pre_begin(opt):
    """things to set up early, before coverage might be setup."""
    global options
    options = opt
    for fn in pre_configure:
        fn(options, file_config)


def set_coverage_flag(value):
    """Record on the options object whether coverage is in use."""
    options.has_coverage = value


def post_begin():
    """things to set up later, once we know coverage is running."""
    # Lazy setup of other options (post coverage)
    for fn in post_configure:
        fn(options, file_config)

    # late imports, has to happen after config.
    global util, fixtures, engines, exclusions, assertions
    global warnings, profiling, config, testing
    from sqlalchemy import testing  # noqa
    from sqlalchemy.testing import fixtures, engines, exclusions  # noqa
    from sqlalchemy.testing import assertions, warnings, profiling  # noqa
    from sqlalchemy.testing import config  # noqa
    from sqlalchemy import util  # noqa

    warnings.setup_filters()


def _log(opt_str, value, parser):
    """Option callback for ``--log-info`` / ``--log-debug``.

    Imports and configures ``logging`` on first use, then sets the level
    of the logger named ``value`` according to the option suffix.

    """
    global logging
    if not logging:
        import logging

        logging.basicConfig()

    if opt_str.endswith("-info"):
        logging.getLogger(value).setLevel(logging.INFO)
    elif opt_str.endswith("-debug"):
        logging.getLogger(value).setLevel(logging.DEBUG)


def _list_dbs(*args):
    """Option callback for ``--dbs``: print the ``[db]`` section of the
    config file and exit the process."""
    print("Available --db options (use --dburi to override)")
    for macro in sorted(file_config.options("db")):
        print("%20s\t%s" % (macro, file_config.get("db", macro)))
    sys.exit(0)


def _requirements_opt(opt_str, value, parser):
    """Option callback for ``--requirements``."""
    _setup_requirements(value)


def _exclude_tag(opt_str, value, parser):
    """Option callback for ``--exclude-tag``; dashes become underscores."""
    exclude_tags.add(value.replace("-", "_"))


def _include_tag(opt_str, value, parser):
    """Option callback for ``--include-tag``; dashes become underscores."""
    include_tags.add(value.replace("-", "_"))


pre_configure = []
post_configure = []


def pre(fn):
    """Decorator: register ``fn(options, file_config)`` to run in
    :func:`pre_begin`."""
    pre_configure.append(fn)
    return fn


def post(fn):
    """Decorator: register ``fn(options, file_config)`` to run in
    :func:`post_begin`."""
    post_configure.append(fn)
    return fn


@pre
def _setup_options(opt, file_config):
    global options
    options = opt


@pre
def _set_nomemory(opt, file_config):
    if opt.nomemory:
        exclude_tags.add("memory_intensive")


@pre
def _monkeypatch_cdecimal(options, file_config):
    if options.cdecimal:
        import cdecimal

        sys.modules["decimal"] = cdecimal


@post
def _init_symbols(options, file_config):
    from sqlalchemy.testing import config

    config._fixture_functions = _fixture_fn_class()


@post
def _engine_uri(options, file_config):
    """Resolve ``--dburi`` / ``--db`` into database URLs and set up a
    config for each one, making the first the current config."""
    from sqlalchemy.testing import config
    from sqlalchemy import testing
    from sqlalchemy.testing import provision

    if options.dburi:
        db_urls = list(options.dburi)
    else:
        db_urls = []

    if options.db:
        # read the known names once rather than once per token; this is
        # still only consulted when --db was actually given.
        known_dbs = file_config.options("db")
        for db_token in options.db:
            for db in re.split(r"[,\s]+", db_token):
                if db not in known_dbs:
                    raise RuntimeError(
                        "Unknown URI specifier '%s'.  "
                        "Specify --dbs for known uris." % db
                    )
                db_urls.append(file_config.get("db", db))

    if not db_urls:
        db_urls.append(file_config.get("db", "default"))

    config._current = None
    for db_url in db_urls:

        if options.write_idents and provision.FOLLOWER_IDENT:  # != 'master':
            with open(options.write_idents, "a") as file_:
                file_.write(provision.FOLLOWER_IDENT + " " + db_url + "\n")

        cfg = provision.setup_config(
            db_url, options, file_config, provision.FOLLOWER_IDENT
        )

        # the first config set up becomes the current one
        if not config._current:
            cfg.set_as_current(cfg, testing)


@post
def _requirements(options, file_config):

    requirement_cls = file_config.get("sqla_testing", "requirement_cls")
    _setup_requirements(requirement_cls)


def _setup_requirements(argument):
    """Instantiate the requirements class named by ``argument``.

    :param argument: string of the form ``"module.path:ClassName"``.

    Does nothing if ``config.requirements`` is already set, which is how
    ``--requirements`` takes precedence over the config file entry.

    """
    from sqlalchemy.testing import config
    from sqlalchemy import testing

    if config.requirements is not None:
        return

    modname, clsname = argument.split(":")

    # importlib.import_module() only introduced in 2.7, a little
    # late
    mod = __import__(modname)
    for component in modname.split(".")[1:]:
        mod = getattr(mod, component)
    req_cls = getattr(mod, clsname)

    config.requirements = testing.requires = req_cls()


@post
def _prep_testing_database(options, file_config):
    """When ``--dropfirst`` is given, drop views, tables and (on
    PostgreSQL) enum types from every configured database."""
    from sqlalchemy.testing import config, util
    from sqlalchemy.testing.exclusions import against
    from sqlalchemy import schema, inspect

    if options.dropfirst:
        for cfg in config.Config.all_configs():
            e = cfg.db

            # TODO: this has to be part of provision.py in postgresql
            if against(cfg, "postgresql"):
                with e.connect().execution_options(
                    isolation_level="AUTOCOMMIT"
                ) as conn:
                    for xid in conn.execute(
                        "select gid from pg_prepared_xacts"
                    ).scalars():
                        conn.execute("ROLLBACK PREPARED '%s'" % xid)

            inspector = inspect(e)
            try:
                view_names = inspector.get_view_names()
            except NotImplementedError:
                pass
            else:
                for vname in view_names:
                    e.execute(
                        schema._DropView(
                            schema.Table(vname, schema.MetaData())
                        )
                    )

            if config.requirements.schemas.enabled_for_config(cfg):
                # use the configured schema name, matching the
                # drop_all_tables() call below, rather than a literal.
                try:
                    view_names = inspector.get_view_names(
                        schema=cfg.test_schema
                    )
                except NotImplementedError:
                    pass
                else:
                    for vname in view_names:
                        e.execute(
                            schema._DropView(
                                schema.Table(
                                    vname,
                                    schema.MetaData(),
                                    schema=cfg.test_schema,
                                )
                            )
                        )

            util.drop_all_tables(e, inspector)

            if config.requirements.schemas.enabled_for_config(cfg):
                util.drop_all_tables(e, inspector, schema=cfg.test_schema)

            # TODO: this has to be part of provision.py in postgresql
            if against(cfg, "postgresql"):
                from sqlalchemy.dialects import postgresql

                for enum in inspector.get_enums("*"):
                    e.execute(
                        postgresql.DropEnumType(
                            postgresql.ENUM(
                                name=enum["name"], schema=enum["schema"]
                            )
                        )
                    )

            # TODO: need to do a get_sequences and drop them also after tables


@post
def _reverse_topological(options, file_config):
    if options.reversetop:
        from sqlalchemy.orm.util import randomize_unitofwork

        randomize_unitofwork()


@post
def _post_setup_options(opt, file_config):
    from sqlalchemy.testing import config

    # use the object we were handed; post_begin() passes the module-level
    # ``options`` so this is the same object in the normal flow.
    config.options = opt
    config.file_config = file_config


@post
def _setup_profiling(options, file_config):
    from sqlalchemy.testing import profiling

    profiling._profile_stats = profiling.ProfileStatsFile(
        file_config.get("sqla_testing", "profile_file"),
        sort=options.profilesort,
        dump=options.profiledump,
    )


def want_class(name, cls):
    """Return True if ``cls`` should be collected as a test class.

    The class must derive from ``fixtures.TestBase``, ``name`` must not
    start with an underscore, and under ``--backend-only`` the class must
    set ``__backend__`` or ``__sparse_backend__``.

    """
    if not issubclass(cls, fixtures.TestBase):
        return False
    elif name.startswith("_"):
        return False
    elif (
        config.options.backend_only
        and not getattr(cls, "__backend__", False)
        and not getattr(cls, "__sparse_backend__", False)
    ):
        return False
    else:
        return True


def want_method(cls, fn):
    """Return True if ``fn`` should be collected as a test method of
    ``cls``, taking the include / exclude tag sets into account."""
    if not fn.__name__.startswith("test_"):
        return False
    elif fn.__module__ is None:
        return False
    elif include_tags:
        # with include tags present, a test is only wanted if the class
        # tags or the function's own exclusion rules accept it.
        return (
            hasattr(cls, "__tags__")
            and exclusions.tags(cls.__tags__).include_test(
                include_tags, exclude_tags
            )
        ) or (
            hasattr(fn, "_sa_exclusion_extend")
            and fn._sa_exclusion_extend.include_test(
                include_tags, exclude_tags
            )
        )
    elif exclude_tags and hasattr(cls, "__tags__"):
        return exclusions.tags(cls.__tags__).include_test(
            include_tags, exclude_tags
        )
    elif exclude_tags and hasattr(fn, "_sa_exclusion_extend"):
        return fn._sa_exclusion_extend.include_test(include_tags, exclude_tags)
    else:
        return True


def generate_sub_tests(cls, module):
    """Yield the classes to actually run for ``cls``.

    For a ``__backend__`` / ``__sparse_backend__`` class, one subclass per
    applicable config is created, attached to ``module`` and yielded;
    otherwise ``cls`` itself is yielded.

    """
    if getattr(cls, "__backend__", False) or getattr(
        cls, "__sparse_backend__", False
    ):
        sparse = getattr(cls, "__sparse_backend__", False)
        for cfg in _possible_configs_for_cls(cls, sparse=sparse):
            orig_name = cls.__name__

            # we can have special chars in these names except for the
            # pytest junit plugin, which is tripped up by the brackets
            # and periods, so sanitize

            alpha_name = re.sub(r"[_\[\]\.]+", "_", cfg.name)
            alpha_name = re.sub(r"_+$", "", alpha_name)
            name = "%s_%s" % (cls.__name__, alpha_name)
            subcls = type(
                name,
                (cls,),
                {"_sa_orig_cls_name": orig_name, "__only_on_config__": cfg},
            )
            setattr(module, name, subcls)
            yield subcls
    else:
        yield cls


def start_test_class(cls):
    """Apply class-level skips, then push any class-specific engine."""
    _do_skips(cls)
    _setup_engine(cls)


def stop_test_class(cls):
    """Tear down per-class state and restore the current config."""
    # from sqlalchemy import inspect
    # assert not inspect(testing.db).get_table_names()
    engines.testing_reaper._stop_test_ctx()
    try:
        if not options.low_connections:
            assertions.global_cleanup_assertions()
    finally:
        # always restore, even if the cleanup assertions fail
        _restore_engine()


def _restore_engine():
    config._current.reset(testing)


def final_process_cleanup():
    """Run the end-of-process cleanup steps."""
    engines.testing_reaper._stop_test_ctx_aggressive()
    assertions.global_cleanup_assertions()
    _restore_engine()


def _setup_engine(cls):
    if getattr(cls, "__engine_options__", None):
        eng = engines.testing_engine(options=cls.__engine_options__)
        config._current.push_engine(eng, testing)


def before_test(test, test_module_name, test_class, test_name):
    """Notify the profiling module of the test that is about to run."""

    # format looks like:
    # "test.aaa_profiling.test_compiler.CompileTest.test_update_whereclause"

    name = getattr(test_class, "_sa_orig_cls_name", test_class.__name__)

    id_ = "%s.%s.%s" % (test_module_name, name, test_name)

    profiling._start_current_test(id_)


def after_test(test):
    """Run the per-test cleanup hook of the testing reaper."""
    engines.testing_reaper._after_test_ctx()


def _possible_configs_for_cls(cls, reasons=None, sparse=False):
    """Return the configs on which ``cls`` is able to run.

    :param cls: test class; the ``__unsupported_on__``, ``__only_on__``,
     ``__only_on_config__``, ``__requires__`` and ``__prefer_requires__``
     attributes are consulted when present.
    :param reasons: optional list, extended in place with the skip reasons
     of each config removed by a ``__requires__`` check.
    :param sparse: when True, keep only one config per ``cfg.db.name``.
    :return: a set of configs, or a dict values view when ``sparse``.

    """
    all_configs = set(config.Config.all_configs())

    # getattr() so that a class lacking the attribute is treated like one
    # with an empty value, consistent with the checks that follow.
    if getattr(cls, "__unsupported_on__", None):
        spec = exclusions.db_spec(*cls.__unsupported_on__)
        for config_obj in list(all_configs):
            if spec(config_obj):
                all_configs.remove(config_obj)

    if getattr(cls, "__only_on__", None):
        spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
        for config_obj in list(all_configs):
            if not spec(config_obj):
                all_configs.remove(config_obj)

    if getattr(cls, "__only_on_config__", None):
        all_configs.intersection_update([cls.__only_on_config__])

    if hasattr(cls, "__requires__"):
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__requires__:
                check = getattr(requirements, requirement)

                skip_reasons = check.matching_config_reasons(config_obj)
                if skip_reasons:
                    all_configs.remove(config_obj)
                    if reasons is not None:
                        reasons.extend(skip_reasons)
                    # config is already removed; skip its remaining checks
                    break

    if hasattr(cls, "__prefer_requires__"):
        non_preferred = set()
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__prefer_requires__:
                check = getattr(requirements, requirement)

                if not check.enabled_for_config(config_obj):
                    non_preferred.add(config_obj)
        # a preference only: never narrow down to an empty set
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    if sparse:
        # pick only one config from each base dialect
        # sorted so we get the same backend each time selecting the highest
        # server version info.
        per_dialect = {}
        for cfg in reversed(
            sorted(
                all_configs,
                key=lambda cfg: (
                    cfg.db.name,
                    cfg.db.dialect.server_version_info,
                ),
            )
        ):
            db = cfg.db.name
            if db not in per_dialect:
                per_dialect[db] = cfg
        return per_dialect.values()

    return all_configs


def _do_skips(cls):
    """Skip ``cls`` if it cannot run, else make a usable config current."""
    reasons = []
    all_configs = _possible_configs_for_cls(cls, reasons)

    if getattr(cls, "__skip_if__", False):
        for c in getattr(cls, "__skip_if__"):
            if c():
                config.skip_test(
                    "'%s' skipped by %s" % (cls.__name__, c.__name__)
                )

    if not all_configs:
        msg = "'%s' unsupported on any DB implementation %s%s" % (
            cls.__name__,
            ", ".join(
                "'%s(%s)+%s'"
                % (
                    config_obj.db.name,
                    ".".join(
                        str(dig)
                        for dig in exclusions._server_version(config_obj.db)
                    ),
                    config_obj.db.driver,
                )
                for config_obj in config.Config.all_configs()
            ),
            ", ".join(reasons),
        )
        config.skip_test(msg)
    elif hasattr(cls, "__prefer_backends__"):
        non_preferred = set()
        spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
        for config_obj in all_configs:
            if not spec(config_obj):
                non_preferred.add(config_obj)
        # a preference only: never narrow down to an empty set
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    if config._current not in all_configs:
        _setup_config(all_configs.pop(), cls)


def _setup_config(config_obj, ctx):
    config._current.push(config_obj, testing)


class FixtureFunctions(ABC):
    """Interface supplied by the hosting test framework.

    The class registered through :func:`set_fixture_functions` is
    instantiated by the ``_init_symbols`` post-configure hook.

    """

    @abc.abstractmethod
    def skip_test_exception(self, *arg, **kw):
        raise NotImplementedError()

    @abc.abstractmethod
    def combinations(self, *args, **kw):
        raise NotImplementedError()

    @abc.abstractmethod
    def param_ident(self, *args, **kw):
        raise NotImplementedError()

    @abc.abstractmethod
    def fixture(self, *arg, **kw):
        raise NotImplementedError()

    def get_current_test_name(self):
        raise NotImplementedError()


_fixture_fn_class = None


def set_fixture_functions(fixture_fn_class):
    """Register the :class:`FixtureFunctions` implementation to use."""
    global _fixture_fn_class
    _fixture_fn_class = fixture_fn_class