summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/provision.py364
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py12
2 files changed, 59 insertions, 317 deletions
diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py
index 70ace0511..a412f0943 100644
--- a/lib/sqlalchemy/testing/provision.py
+++ b/lib/sqlalchemy/testing/provision.py
@@ -1,13 +1,8 @@
import collections
import logging
-import os
-import time
from . import config
from . import engines
-from .. import create_engine
-from .. import exc
-from .. import text
from ..engine import url as sa_url
from ..util import compat
@@ -49,33 +44,32 @@ class register(object):
def create_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
log.info("CREATE database %s, URI %r", follower_ident, cfg.db.url)
- _create_db(cfg, cfg.db, follower_ident)
-
-
-def configure_follower(follower_ident):
- for cfg in config.Config.all_configs():
- _configure_follower(cfg, follower_ident)
+ create_db(cfg, cfg.db, follower_ident)
def setup_config(db_url, options, file_config, follower_ident):
+ # load the dialect, which should also have it set up its provision
+ # hooks
+
+ sa_url.make_url(db_url).get_dialect()
if follower_ident:
- db_url = _follower_url_from_main(db_url, follower_ident)
+ db_url = follower_url_from_main(db_url, follower_ident)
db_opts = {}
- _update_db_opts(db_url, db_opts)
+ update_db_opts(db_url, db_opts)
eng = engines.testing_engine(db_url, db_opts)
- _post_configure_engine(db_url, eng, follower_ident)
+ post_configure_engine(db_url, eng, follower_ident)
eng.connect().close()
cfg = config.Config.register(eng, db_opts, options, file_config)
if follower_ident:
- _configure_follower(cfg, follower_ident)
+ configure_follower(cfg, follower_ident)
return cfg
def drop_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
log.info("DROP database %s, URI %r", follower_ident, cfg.db.url)
- _drop_db(cfg, cfg.db, follower_ident)
+ drop_db(cfg, cfg.db, follower_ident)
def _configs_for_db_operation():
@@ -98,211 +92,66 @@ def _configs_for_db_operation():
@register.init
-def _create_db(cfg, eng, ident):
+def create_db(cfg, eng, ident):
+ """Dynamically create a database for testing.
+
+ Used when a test run will employ multiple processes, e.g., when run
+ via `tox` or `py.test -n4`.
+ """
raise NotImplementedError("no DB creation routine for cfg: %s" % eng.url)
@register.init
-def _drop_db(cfg, eng, ident):
+def drop_db(cfg, eng, ident):
+ """Drop a database that we dynamically created for testing."""
raise NotImplementedError("no DB drop routine for cfg: %s" % eng.url)
@register.init
-def _update_db_opts(db_url, db_opts):
+def update_db_opts(db_url, db_opts):
+ """Set database options (db_opts) for a test database that we created.
+ """
pass
@register.init
-def _configure_follower(cfg, ident):
- pass
-
+def post_configure_engine(url, engine, follower_ident):
+ """Perform extra steps after configuring an engine for testing.
-@register.init
-def _post_configure_engine(url, engine, follower_ident):
+ (For the internal dialects, currently only used by sqlite.)
+ """
pass
@register.init
-def _follower_url_from_main(url, ident):
+def follower_url_from_main(url, ident):
+ """Create a connection URL for a dynamically-created test database.
+
+ :param url: the connection URL specified when the test run was invoked
+ :param ident: the pytest-xdist "worker identifier" to be used as the
+ database name
+ """
url = sa_url.make_url(url)
url.database = ident
return url
-@_update_db_opts.for_db("mssql")
-def _mssql_update_db_opts(db_url, db_opts):
- db_opts["legacy_schema_aliasing"] = False
-
-
-@_follower_url_from_main.for_db("sqlite")
-def _sqlite_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- if not url.database or url.database == ":memory:":
- return url
- else:
- return sa_url.make_url("sqlite:///%s.db" % ident)
-
-
-@_post_configure_engine.for_db("sqlite")
-def _sqlite_post_configure_engine(url, engine, follower_ident):
- from sqlalchemy import event
-
- @event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- # use file DBs in all cases, memory acts kind of strangely
- # as an attached
- if not follower_ident:
- dbapi_connection.execute(
- 'ATTACH DATABASE "test_schema.db" AS test_schema'
- )
- else:
- dbapi_connection.execute(
- 'ATTACH DATABASE "%s_test_schema.db" AS test_schema'
- % follower_ident
- )
-
-
-@_create_db.for_db("postgresql")
-def _pg_create_db(cfg, eng, ident):
- template_db = cfg.options.postgresql_templatedb
-
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- try:
- _pg_drop_db(cfg, conn, ident)
- except Exception:
- pass
- if not template_db:
- template_db = conn.scalar("select current_database()")
-
- attempt = 0
- while True:
- try:
- conn.execute(
- "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
- )
- except exc.OperationalError as err:
- attempt += 1
- if attempt >= 3:
- raise
- if "accessed by other users" in str(err):
- log.info(
- "Waiting to create %s, URI %r, "
- "template DB %s is in use sleeping for .5",
- ident,
- eng.url,
- template_db,
- )
- time.sleep(0.5)
- except:
- raise
- else:
- break
-
-
-@_create_db.for_db("mysql")
-def _mysql_create_db(cfg, eng, ident):
- with eng.connect() as conn:
- try:
- _mysql_drop_db(cfg, conn, ident)
- except Exception:
- pass
-
- conn.execute("CREATE DATABASE %s CHARACTER SET utf8mb4" % ident)
- conn.execute(
- "CREATE DATABASE %s_test_schema CHARACTER SET utf8mb4" % ident
- )
- conn.execute(
- "CREATE DATABASE %s_test_schema_2 CHARACTER SET utf8mb4" % ident
- )
-
-
-@_configure_follower.for_db("mysql")
-def _mysql_configure_follower(config, ident):
- config.test_schema = "%s_test_schema" % ident
- config.test_schema_2 = "%s_test_schema_2" % ident
-
-
-@_create_db.for_db("sqlite")
-def _sqlite_create_db(cfg, eng, ident):
+@register.init
+def configure_follower(cfg, ident):
+ """Create dialect-specific config settings for a follower database."""
pass
-@_drop_db.for_db("postgresql")
-def _pg_drop_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- conn.execute(
- text(
- "select pg_terminate_backend(pid) from pg_stat_activity "
- "where usename=current_user and pid != pg_backend_pid() "
- "and datname=:dname"
- ),
- dname=ident,
- )
- conn.execute("DROP DATABASE %s" % ident)
-
-
-@_drop_db.for_db("sqlite")
-def _sqlite_drop_db(cfg, eng, ident):
- if ident:
- os.remove("%s_test_schema.db" % ident)
- else:
- os.remove("%s.db" % ident)
-
-
-@_drop_db.for_db("mysql")
-def _mysql_drop_db(cfg, eng, ident):
- with eng.connect() as conn:
- conn.execute("DROP DATABASE %s_test_schema" % ident)
- conn.execute("DROP DATABASE %s_test_schema_2" % ident)
- conn.execute("DROP DATABASE %s" % ident)
-
-
-@_create_db.for_db("oracle")
-def _oracle_create_db(cfg, eng, ident):
- # NOTE: make sure you've run "ALTER DATABASE default tablespace users" or
- # similar, so that the default tablespace is not "system"; reflection will
- # fail otherwise
- with eng.connect() as conn:
- conn.execute("create user %s identified by xe" % ident)
- conn.execute("create user %s_ts1 identified by xe" % ident)
- conn.execute("create user %s_ts2 identified by xe" % ident)
- conn.execute("grant dba to %s" % (ident,))
- conn.execute("grant unlimited tablespace to %s" % ident)
- conn.execute("grant unlimited tablespace to %s_ts1" % ident)
- conn.execute("grant unlimited tablespace to %s_ts2" % ident)
-
-
-@_configure_follower.for_db("oracle")
-def _oracle_configure_follower(config, ident):
- config.test_schema = "%s_ts1" % ident
- config.test_schema_2 = "%s_ts2" % ident
-
-
-def _ora_drop_ignore(conn, dbname):
- try:
- conn.execute("drop user %s cascade" % dbname)
- log.info("Reaped db: %s", dbname)
- return True
- except exc.DatabaseError as err:
- log.warning("couldn't drop db: %s", err)
- return False
-
-
-@_drop_db.for_db("oracle")
-def _oracle_drop_db(cfg, eng, ident):
- with eng.connect() as conn:
- # cx_Oracle seems to occasionally leak open connections when a large
- # suite it run, even if we confirm we have zero references to
- # connection objects.
- # while there is a "kill session" command in Oracle,
- # it unfortunately does not release the connection sufficiently.
- _ora_drop_ignore(conn, ident)
- _ora_drop_ignore(conn, "%s_ts1" % ident)
- _ora_drop_ignore(conn, "%s_ts2" % ident)
-
-
-@_update_db_opts.for_db("oracle")
-def _oracle_update_db_opts(db_url, db_opts):
+@register.init
+def run_reap_dbs(url, ident):
+ """Remove databases that were created during the test process, after the
+ process has ended.
+
+ This is an optional step that is invoked for certain backends that do not
+ reliably release locks on the database as long as a process is still in
+ use. For the internal dialects, this is currently only necessary for
+ mssql and oracle.
+ """
pass
@@ -322,118 +171,19 @@ def reap_dbs(idents_file):
idents[url_key].add(db_name)
for url_key in urls:
- backend = url_key[0]
url = list(urls[url_key])[0]
ident = idents[url_key]
- if backend == "oracle":
- _reap_oracle_dbs(url, ident)
- elif backend == "mssql":
- _reap_mssql_dbs(url, ident)
-
-
-def _reap_oracle_dbs(url, idents):
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.connect() as conn:
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select u.username from all_users u where username "
- "like 'TEST_%' and not exists (select username "
- "from v$session where username=u.username)"
- )
- all_names = {username.lower() for (username,) in to_reap}
- to_drop = set()
- for name in all_names:
- if name.endswith("_ts1") or name.endswith("_ts2"):
- continue
- elif name in idents:
- to_drop.add(name)
- if "%s_ts1" % name in all_names:
- to_drop.add("%s_ts1" % name)
- if "%s_ts2" % name in all_names:
- to_drop.add("%s_ts2" % name)
-
- dropped = total = 0
- for total, username in enumerate(to_drop, 1):
- if _ora_drop_ignore(conn, username):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total
- )
-
-
-@_follower_url_from_main.for_db("oracle")
-def _oracle_follower_url_from_main(url, ident):
- url = sa_url.make_url(url)
- url.username = ident
- url.password = "xe"
- return url
+ run_reap_dbs(url, ident)
-@_create_db.for_db("mssql")
-def _mssql_create_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- conn.execute("create database %s" % ident)
- conn.execute(
- "ALTER DATABASE %s SET ALLOW_SNAPSHOT_ISOLATION ON" % ident
- )
- conn.execute(
- "ALTER DATABASE %s SET READ_COMMITTED_SNAPSHOT ON" % ident
- )
- conn.execute("use %s" % ident)
- conn.execute("create schema test_schema")
- conn.execute("create schema test_schema_2")
-
-
-@_drop_db.for_db("mssql")
-def _mssql_drop_db(cfg, eng, ident):
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
- _mssql_drop_ignore(conn, ident)
-
-
-def _mssql_drop_ignore(conn, ident):
- try:
- # typically when this happens, we can't KILL the session anyway,
- # so let the cleanup process drop the DBs
- # for row in conn.execute(
- # "select session_id from sys.dm_exec_sessions "
- # "where database_id=db_id('%s')" % ident):
- # log.info("killing SQL server sesssion %s", row['session_id'])
- # conn.execute("kill %s" % row['session_id'])
-
- conn.execute("drop database %s" % ident)
- log.info("Reaped db: %s", ident)
- return True
- except exc.DatabaseError as err:
- log.warning("couldn't drop db: %s", err)
- return False
-
-
-def _reap_mssql_dbs(url, idents):
- log.info("db reaper connecting to %r", url)
- eng = create_engine(url)
- with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
-
- log.info("identifiers in file: %s", ", ".join(idents))
-
- to_reap = conn.execute(
- "select d.name from sys.databases as d where name "
- "like 'TEST_%' and not exists (select session_id "
- "from sys.dm_exec_sessions "
- "where database_id=d.database_id)"
- )
- all_names = {dbname.lower() for (dbname,) in to_reap}
- to_drop = set()
- for name in all_names:
- if name in idents:
- to_drop.add(name)
-
- dropped = total = 0
- for total, dbname in enumerate(to_drop, 1):
- if _mssql_drop_ignore(conn, dbname):
- dropped += 1
- log.info(
- "Dropped %d out of %d stale databases detected", dropped, total
- )
+@register.init
+def temp_table_keyword_args(cfg, eng):
+ """Specify keyword arguments for creating a temporary Table.
+
+ Dialect-specific implementations of this method will return the
+ kwargs that are passed to the Table method when creating a temporary
+ table for testing, e.g., in the define_temp_tables method of the
+ ComponentReflectionTest class in suite/test_reflection.py
+ """
+ raise NotImplementedError(
+ "no temp table keyword args routine for cfg: %s" % eng.url)
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index 85e193d8e..f9ff46492 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -9,6 +9,7 @@ from .. import eq_
from .. import expect_warnings
from .. import fixtures
from .. import is_
+from ..provision import temp_table_keyword_args
from ..schema import Column
from ..schema import Table
from ... import event
@@ -304,16 +305,7 @@ class ComponentReflectionTest(fixtures.TablesTest):
@classmethod
def define_temp_tables(cls, metadata):
- # cheat a bit, we should fix this with some dialect-level
- # temp table fixture
- if testing.against("oracle"):
- kw = {
- "prefixes": ["GLOBAL TEMPORARY"],
- "oracle_on_commit": "PRESERVE ROWS",
- }
- else:
- kw = {"prefixes": ["TEMPORARY"]}
-
+ kw = temp_table_keyword_args(config, config.db)
user_tmp = Table(
"user_tmp",
metadata,