Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--  lib/sqlalchemy/testing/assertsql.py                 |  14
-rw-r--r--  lib/sqlalchemy/testing/engines.py                   |   2
-rw-r--r--  lib/sqlalchemy/testing/exclusions.py                | 114
-rw-r--r--  lib/sqlalchemy/testing/plugin/config.py             |  33
-rw-r--r--  lib/sqlalchemy/testing/profiling.py                 |  13
-rw-r--r--  lib/sqlalchemy/testing/requirements.py              |  81
-rw-r--r--  lib/sqlalchemy/testing/schema.py                    |   5
-rw-r--r--  lib/sqlalchemy/testing/suite/__init__.py            |   4
-rw-r--r--  lib/sqlalchemy/testing/suite/requirements.py        |  30
-rw-r--r--  lib/sqlalchemy/testing/suite/test_ddl.py            |   3
-rw-r--r--  lib/sqlalchemy/testing/suite/test_insert.py         | 111
-rw-r--r--  lib/sqlalchemy/testing/suite/test_reflection.py     | 424
-rw-r--r--  lib/sqlalchemy/testing/suite/test_sequencing.py     |  36
-rw-r--r--  lib/sqlalchemy/testing/suite/test_update_delete.py  |  64
-rw-r--r--  lib/sqlalchemy/testing/util.py                      |   4
-rw-r--r--  lib/sqlalchemy/testing/warnings.py                  |   4
16 files changed, 773 insertions, 169 deletions
diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py
index 897f4b3b1..08ee55d57 100644
--- a/lib/sqlalchemy/testing/assertsql.py
+++ b/lib/sqlalchemy/testing/assertsql.py
@@ -1,8 +1,6 @@
-from sqlalchemy.interfaces import ConnectionProxy
-from sqlalchemy.engine.default import DefaultDialect
-from sqlalchemy.engine.base import Connection
-from sqlalchemy import util
+from ..engine.default import DefaultDialect
+from .. import util
 import re


 class AssertRule(object):
@@ -262,16 +260,16 @@ def _process_assertion_statement(query, context):
     paramstyle = context.dialect.paramstyle
     if paramstyle == 'named':
         pass
-    elif paramstyle =='pyformat':
+    elif paramstyle == 'pyformat':
         query = re.sub(r':([\w_]+)', r"%(\1)s", query)
     else:
         # positional params
         repl = None
-        if paramstyle=='qmark':
+        if paramstyle == 'qmark':
             repl = "?"
-        elif paramstyle=='format':
+        elif paramstyle == 'format':
             repl = r"%s"
-        elif paramstyle=='numeric':
+        elif paramstyle == 'numeric':
             repl = None
         query = re.sub(r':([\w_]+)', repl, query)
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index f7401550e..74e22adf1 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -5,7 +5,7 @@ import weakref
 from collections import deque
 from . import config
 from .util import decorator
-from sqlalchemy import event, pool
+from .. import event, pool
 import re
 import warnings

diff --git a/lib/sqlalchemy/testing/exclusions.py b/lib/sqlalchemy/testing/exclusions.py
index ba2eebe4f..96dd0d693 100644
--- a/lib/sqlalchemy/testing/exclusions.py
+++ b/lib/sqlalchemy/testing/exclusions.py
@@ -1,49 +1,63 @@
 import operator
 from nose import SkipTest
-from sqlalchemy.util import decorator
+from ..util import decorator
 from . import config
-from sqlalchemy import util
+from .. import util

-def fails_if(predicate, reason=None):
-    predicate = _as_predicate(predicate)
-
-    @decorator
-    def decorate(fn, *args, **kw):
-        if not predicate():
-            return fn(*args, **kw)
-        else:
-            try:
-                fn(*args, **kw)
-            except Exception, ex:
-                print ("'%s' failed as expected (%s): %s " % (
-                    fn.__name__, predicate, str(ex)))
-                return True
-            else:
-                raise AssertionError(
-                    "Unexpected success for '%s' (%s)" %
-                    (fn.__name__, predicate))
-    return decorate
+class fails_if(object):
+    def __init__(self, predicate, reason=None):
+        self.predicate = _as_predicate(predicate)
+        self.reason = reason

-def skip_if(predicate, reason=None):
-    predicate = _as_predicate(predicate)
+    @property
+    def enabled(self):
+        return not self.predicate()

-    @decorator
-    def decorate(fn, *args, **kw):
-        if predicate():
-            if reason:
-                msg = "'%s' : %s" % (
-                    fn.__name__,
-                    reason
-                )
+    def __call__(self, fn):
+        @decorator
+        def decorate(fn, *args, **kw):
+            if not self.predicate():
+                return fn(*args, **kw)
             else:
-                msg = "'%s': %s" % (
-                    fn.__name__, predicate
-                )
-            raise SkipTest(msg)
-        else:
-            return fn(*args, **kw)
-    return decorate
+                try:
+                    fn(*args, **kw)
+                except Exception, ex:
+                    print ("'%s' failed as expected (%s): %s " % (
+                        fn.__name__, self.predicate, str(ex)))
+                    return True
+                else:
+                    raise AssertionError(
+                        "Unexpected success for '%s' (%s)" %
+                        (fn.__name__, self.predicate))
+        return decorate(fn)
+
+class skip_if(object):
+    def __init__(self, predicate, reason=None):
+        self.predicate = _as_predicate(predicate)
+        self.reason = reason
+
+    @property
+    def enabled(self):
+        return not self.predicate()
+
+    def __call__(self, fn):
+        @decorator
+        def decorate(fn, *args, **kw):
+            if self.predicate():
+                if self.reason:
+                    msg = "'%s' : %s" % (
+                        fn.__name__,
+                        self.reason
+                    )
+                else:
+                    msg = "'%s': %s" % (
+                        fn.__name__, self.predicate
+                    )
+                raise SkipTest(msg)
+            else:
+                return fn(*args, **kw)
+        return decorate(fn)

 def only_if(predicate, reason=None):
     predicate = _as_predicate(predicate)
@@ -69,6 +83,23 @@ class Predicate(object):
         else:
             assert False, "unknown predicate type: %s" % predicate

+class BooleanPredicate(Predicate):
+    def __init__(self, value, description=None):
+        self.value = value
+        self.description = description
+
+    def __call__(self):
+        return self.value
+
+    def _as_string(self, negate=False):
+        if negate:
+            return "not " + self.description
+        else:
+            return self.description
+
+    def __str__(self):
+        return self._as_string()
+
 class SpecPredicate(Predicate):
     def __init__(self, db, op=None, spec=None, description=None):
         self.db = db
@@ -232,8 +263,11 @@ def db_spec(*dbs):
         Predicate.as_predicate(db) for db in dbs
     )

-def open(fn):
-    return fn
+def open():
+    return skip_if(BooleanPredicate(False))
+
+def closed():
+    return skip_if(BooleanPredicate(True))

 @decorator
 def future(fn, *args, **kw):
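
Illustration only (not part of the patch): because skip_if and fails_if are now objects rather than plain decorator functions, an exclusion can be applied as a decorator or queried through its .enabled property, and open()/closed() simply return always-run / always-skip skip_if instances. A minimal sketch, assuming nose and the post-patch sqlalchemy.testing layout; the exclusion name below is made up:

    from sqlalchemy.testing import exclusions

    # an always-true predicate with a description, same construction
    # pattern as exclusions.closed()
    no_returning_support = exclusions.skip_if(
        exclusions.BooleanPredicate(True, "backend lacks RETURNING"))

    @no_returning_support          # decorator form: raises nose's SkipTest when run
    def test_returning_roundtrip():
        pass

    # queryable form; False here because the predicate fired
    assert not no_returning_support.enabled
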
diff --git a/lib/sqlalchemy/testing/plugin/config.py b/lib/sqlalchemy/testing/plugin/config.py
index 946a856ad..6c9292864 100644
--- a/lib/sqlalchemy/testing/plugin/config.py
+++ b/lib/sqlalchemy/testing/plugin/config.py
@@ -44,19 +44,26 @@ def _engine_strategy(options, opt_str, value, parser):
 pre_configure = []
 post_configure = []

-
+def pre(fn):
+    pre_configure.append(fn)
+    return fn
+def post(fn):
+    post_configure.append(fn)
+    return fn
+
+@pre
 def _setup_options(opt, file_config):
     global options
     options = opt
-pre_configure.append(_setup_options)

+@pre
 def _monkeypatch_cdecimal(options, file_config):
     if options.cdecimal:
         import sys
         import cdecimal
         sys.modules['decimal'] = cdecimal
-pre_configure.append(_monkeypatch_cdecimal)

+@post
 def _engine_uri(options, file_config):
     global db_label, db_url
@@ -73,8 +80,8 @@ def _engine_uri(options, file_config):
             "Unknown URI specifier '%s'. Specify --dbs for known uris." % db_label)
     db_url = file_config.get('db', db_label)
-post_configure.append(_engine_uri)

+@post
 def _require(options, file_config):
     if not(options.require or
            (file_config.has_section('require') and
@@ -99,14 +106,14 @@ def _require(options, file_config):
         if seen:
             continue
         pkg_resources.require(requirement)
-post_configure.append(_require)

+@post
 def _engine_pool(options, file_config):
     if options.mockpool:
         from sqlalchemy import pool
         db_opts['poolclass'] = pool.AssertionPool
-post_configure.append(_engine_pool)

+@post
 def _create_testing_engine(options, file_config):
     from sqlalchemy.testing import engines, config
     from sqlalchemy import testing
@@ -115,8 +122,8 @@ def _create_testing_engine(options, file_config):
     config.db_opts = db_opts
     config.db_url = db_url
-post_configure.append(_create_testing_engine)

+@post
 def _prep_testing_database(options, file_config):
     from sqlalchemy.testing import engines
     from sqlalchemy import schema
@@ -137,8 +144,8 @@ def _prep_testing_database(options, file_config):
             md.drop_all()
         e.dispose()
-post_configure.append(_prep_testing_database)

+@post
 def _set_table_options(options, file_config):
     from sqlalchemy.testing import schema
@@ -149,8 +156,8 @@ def _set_table_options(options, file_config):
     if options.mysql_engine:
         table_options['mysql_engine'] = options.mysql_engine
-post_configure.append(_set_table_options)

+@post
 def _reverse_topological(options, file_config):
     if options.reversetop:
         from sqlalchemy.orm import unitofwork, session, mapper, dependency
@@ -158,8 +165,8 @@ def _reverse_topological(options, file_config):
         from sqlalchemy.testing.util import RandomSet
         topological.set = unitofwork.set = session.set = mapper.set = \
             dependency.set = RandomSet
-post_configure.append(_reverse_topological)

+@post
 def _requirements(options, file_config):
     from sqlalchemy.testing import config
     from sqlalchemy import testing
@@ -175,17 +182,15 @@ def _requirements(options, file_config):
     req_cls = getattr(mod, clsname)

     config.requirements = testing.requires = req_cls(db, config)
-post_configure.append(_requirements)

+@post
 def _post_setup_options(opt, file_config):
     from sqlalchemy.testing import config
     config.options = options
-post_configure.append(_post_setup_options)

+@post
 def _setup_profiling(options, file_config):
     from sqlalchemy.testing import profiling
     profiling._profile_stats = profiling.ProfileStatsFile(
         file_config.get('sqla_testing', 'profile_file'))
-post_configure.append(_setup_profiling)
-
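
Illustration only: the @pre/@post decorators above are just registration helpers; they do the same work as the pre_configure.append(...) / post_configure.append(...) calls they replace, while leaving the hook function importable under its own name. A standalone sketch of the pattern (the hook name is hypothetical; the real hooks receive the nose options object and the parsed setup.cfg):

    post_configure = []

    def post(fn):
        post_configure.append(fn)   # remember the hook...
        return fn                   # ...and hand the function back unchanged

    @post
    def _example_hook(options, file_config):
        print("configuring with %r" % (options,))

    for hook in post_configure:     # the plugin later runs hooks in order
        hook(None, None)
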
diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py
index be32b1d1d..a22e83cbc 100644
--- a/lib/sqlalchemy/testing/profiling.py
+++ b/lib/sqlalchemy/testing/profiling.py
@@ -13,25 +13,24 @@ from nose import SkipTest
 import pstats
 import time
 import collections
-from sqlalchemy import util
+from .. import util

 try:
     import cProfile
 except ImportError:
     cProfile = None
-from sqlalchemy.util.compat import jython, pypy, win32
+from ..util.compat import jython, pypy, win32

 _current_test = None

 def profiled(target=None, **target_opts):
     """Function profiling.

-    @profiled('label')
+    @profiled()
     or
-    @profiled('label', report=True, sort=('calls',), limit=20)
+    @profiled(report=True, sort=('calls',), limit=20)
+
+    Outputs profiling info for a decorated function.

-    Enables profiling for a function when 'label' is targetted for
-    profiling.  Report options can be supplied, and override the global
-    configuration and command-line options.
     """

     profile_config = {'targets': set(),
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index eca883d4e..90385c391 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -3,32 +3,12 @@
 Provides decorators to mark tests requiring specific feature support from the
 target database.

-"""
-
-from .exclusions import \
-     skip, \
-     skip_if,\
-     only_if,\
-     only_on,\
-     fails_on,\
-     fails_on_everything_except,\
-     fails_if,\
-     SpecPredicate,\
-     against
-
-def no_support(db, reason):
-    return SpecPredicate(db, description=reason)
-
-def exclude(db, op, spec, description=None):
-    return SpecPredicate(db, op, spec, description=description)
+External dialect test suites should subclass SuiteRequirements
+to provide specific inclusion/exlusions.
+"""

-def _chain_decorators_on(*decorators):
-    def decorate(fn):
-        for decorator in reversed(decorators):
-            fn = decorator(fn)
-        return fn
-    return decorate
+from . import exclusions

 class Requirements(object):
     def __init__(self, db, config):
@@ -36,3 +16,56 @@ class Requirements(object):
         self.config = config


+class SuiteRequirements(Requirements):
+
+    @property
+    def create_table(self):
+        """target platform can emit basic CreateTable DDL."""
+
+        return exclusions.open()
+
+    @property
+    def drop_table(self):
+        """target platform can emit basic DropTable DDL."""
+
+        return exclusions.open()
+
+    @property
+    def autoincrement_insert(self):
+        """target platform generates new surrogate integer primary key values
+        when insert() is executed, excluding the pk column."""
+
+        return exclusions.open()
+
+    @property
+    def returning(self):
+        """target platform supports RETURNING."""
+
+        return exclusions.closed()
+
+    @property
+    def dbapi_lastrowid(self):
+        """"target platform includes a 'lastrowid' accessor on the DBAPI
+        cursor object.
+
+        """
+        return exclusions.closed()
+
+    @property
+    def views(self):
+        """Target database must support VIEWs."""
+
+        return exclusions.closed()
+
+    @property
+    def schemas(self):
+        """Target database must support external schemas, and have one
+        named 'test_schema'."""
+
+        return exclusions.closed()
+
+    @property
+    def sequences(self):
+        """Target database must support SEQUENCEs."""
+
+        return self.config.db.dialect.supports_sequences
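
Illustration only: as the new docstring says, an external dialect's test suite subclasses SuiteRequirements and opens up whatever its backend supports; anything not overridden keeps the conservative defaults above (e.g. dbapi_lastrowid stays closed). A hypothetical requirements class for a third-party dialect (class and module names are illustrative):

    from sqlalchemy.testing.requirements import SuiteRequirements
    from sqlalchemy.testing import exclusions

    class Requirements(SuiteRequirements):
        @property
        def returning(self):
            # this backend implements RETURNING, so enable those tests
            return exclusions.open()

        @property
        def views(self):
            return exclusions.open()

The plugin's _requirements() hook above resolves such a class from the test configuration and installs an instance as config.requirements / testing.requires.
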
diff --git a/lib/sqlalchemy/testing/schema.py b/lib/sqlalchemy/testing/schema.py
index 03da78c64..805c8e567 100644
--- a/lib/sqlalchemy/testing/schema.py
+++ b/lib/sqlalchemy/testing/schema.py
@@ -1,9 +1,6 @@
-"""Enhanced versions of schema.Table and schema.Column which establish
-desired state for different backends.
-"""
 from . import exclusions
-from sqlalchemy import schema, event
+from .. import schema, event
 from . import config

 __all__ = 'Table', 'Column',
diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py
index e69de29bb..a92ecb469 100644
--- a/lib/sqlalchemy/testing/suite/__init__.py
+++ b/lib/sqlalchemy/testing/suite/__init__.py
@@ -0,0 +1,4 @@
+from .test_ddl import *
+from .test_insert import *
+from .test_update_delete import *
+from .test_reflection import *
diff --git a/lib/sqlalchemy/testing/suite/requirements.py b/lib/sqlalchemy/testing/suite/requirements.py
deleted file mode 100644
index 3ea72adcd..000000000
--- a/lib/sqlalchemy/testing/suite/requirements.py
+++ /dev/null
@@ -1,30 +0,0 @@
-"""Requirement definitions used by the generic dialect suite.
-
-External dialect test suites should subclass SuiteRequirements
-to provide specific inclusion/exlusions.
-
-"""
-from ..requirements import Requirements
-from .. import exclusions
-
-
-class SuiteRequirements(Requirements):
-
-    @property
-    def create_table(self):
-        """target platform can emit basic CreateTable DDL."""
-
-        return exclusions.open
-
-    @property
-    def drop_table(self):
-        """target platform can emit basic DropTable DDL."""
-
-        return exclusions.open
-
-    @property
-    def autoincrement_insert(self):
-        """target platform generates new surrogate integer primary key values
-        when insert() is executed, excluding the pk column."""
-
-        return exclusions.open
diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py
index 1285c4196..c9637cd70 100644
--- a/lib/sqlalchemy/testing/suite/test_ddl.py
+++ b/lib/sqlalchemy/testing/suite/test_ddl.py
@@ -45,4 +45,5 @@ class TableDDLTest(fixtures.TestBase):
             config.db,
             checkfirst=False
         )
-__all__ = ('TableDDLTest',)
\ No newline at end of file
+
+__all__ = ('TableDDLTest', )
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
new file mode 100644
index 000000000..53a70e0c6
--- /dev/null
+++ b/lib/sqlalchemy/testing/suite/test_insert.py
@@ -0,0 +1,111 @@
+from .. import fixtures, config
+from ..config import requirements
+from .. import exclusions
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select, util
+
+from ..schema import Table, Column
+
+class InsertSequencingTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('autoinc_pk', metadata,
+                Column('id', Integer, primary_key=True,
+                            test_needs_autoincrement=True),
+                Column('data', String(50))
+            )
+
+        Table('manual_pk', metadata,
+                Column('id', Integer, primary_key=True, autoincrement=False),
+                Column('data', String(50))
+            )
+
+    def _assert_round_trip(self, table):
+        row = config.db.execute(table.select()).first()
+        eq_(
+            row,
+            (1, "some data")
+        )
+
+    @requirements.autoincrement_insert
+    def test_autoincrement_on_insert(self):
+
+        config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        self._assert_round_trip(self.tables.autoinc_pk)
+
+    @requirements.autoincrement_insert
+    def test_last_inserted_id(self):
+
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        eq_(
+            r.inserted_primary_key,
+            [pk]
+        )
+
+    @exclusions.fails_if(lambda: util.pypy, "lastrowid not maintained after "
+                            "connection close")
+    @requirements.dbapi_lastrowid
+    def test_native_lastrowid_autoinc(self):
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        lastrowid = r.lastrowid
+        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        eq_(
+            lastrowid, pk
+        )
+
+
+class InsertBehaviorTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('autoinc_pk', metadata,
+                Column('id', Integer, primary_key=True, \
+                            test_needs_autoincrement=True),
+                Column('data', String(50))
+            )
+
+    def test_autoclose_on_insert(self):
+        if requirements.returning.enabled:
+            engine = engines.testing_engine(
+                        options={'implicit_returning': False})
+        else:
+            engine = config.db
+
+
+        r = engine.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        assert r.closed
+        assert r.is_insert
+        assert not r.returns_rows
+
+    @requirements.returning
+    def test_autoclose_on_insert_implicit_returning(self):
+        r = config.db.execute(
+            self.tables.autoinc_pk.insert(),
+            data="some data"
+        )
+        assert r.closed
+        assert r.is_insert
+        assert r.returns_rows
+
+
+__all__ = ('InsertSequencingTest', 'InsertBehaviorTest')
+
+
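
Illustration only: in the suite modules, requirements is the SuiteRequirements (or dialect-specific) instance installed into config by the plugin; its properties return exclusion objects, so they work both as decorators and as in-line switches, exactly as test_autoclose_on_insert does above with requirements.returning.enabled. A hypothetical test sketch, assuming the plugin has already installed a requirements object before this module is imported (the suite modules rely on the same ordering):

    from sqlalchemy.testing.config import requirements

    class MyDialectInsertTest(object):      # illustrative only
        @requirements.returning             # skipped unless the dialect opens this up
        def test_needs_returning(self):
            pass

        def test_either_path(self):
            if requirements.returning.enabled:
                pass    # exercise the RETURNING path
            else:
                pass    # fall back, mirroring test_autoclose_on_insert
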
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index e69de29bb..f816895a4 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -0,0 +1,424 @@
+import sqlalchemy as sa
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import types as sql_types
+from sqlalchemy import schema
+from sqlalchemy import inspect
+from sqlalchemy import MetaData, Integer, String
+from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.schema import Table, Column
+from sqlalchemy.testing import eq_, assert_raises_message
+from sqlalchemy import testing
+from .. import config
+
+metadata, users = None, None
+
+
+class HasTableTest(fixtures.TablesTest):
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('test_table', metadata,
+                Column('id', Integer, primary_key=True),
+                Column('data', String(50))
+            )
+
+    def test_has_table(self):
+        with config.db.begin() as conn:
+            assert config.db.dialect.has_table(conn, "test_table")
+            assert not config.db.dialect.has_table(conn, "nonexistent_table")
+
+class HasSequenceTest(fixtures.TestBase):
+    __requires__ = 'sequences',
+
+    def test_has_sequence(self):
+        metadata = MetaData()
+        Table('users', metadata, Column('user_id', sa.Integer,
+            sa.Sequence('user_id_seq'), primary_key=True),
+            Column('user_name', sa.String(40)))
+        metadata.create_all(bind=testing.db)
+        try:
+            eq_(testing.db.dialect.has_sequence(testing.db,
+                'user_id_seq'), True)
+        finally:
+            metadata.drop_all(bind=testing.db)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            False)
+
+    @testing.requires.schemas
+    def test_has_sequence_schema(self):
+        test_schema = 'test_schema'
+        s1 = sa.Sequence('user_id_seq', schema=test_schema)
+        s2 = sa.Sequence('user_id_seq')
+        testing.db.execute(schema.CreateSequence(s1))
+        testing.db.execute(schema.CreateSequence(s2))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), True)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            True)
+        testing.db.execute(schema.DropSequence(s1))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), False)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            True)
+        testing.db.execute(schema.DropSequence(s2))
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+            schema=test_schema), False)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+            False)
+
+
+def createTables(meta, schema=None):
+    if schema:
+        schema_prefix = schema + "."
+    else:
+        schema_prefix = ""
+
+    users = Table('users', meta,
+        Column('user_id', sa.INT, primary_key=True),
+        Column('user_name', sa.VARCHAR(20), nullable=False),
+        Column('test1', sa.CHAR(5), nullable=False),
+        Column('test2', sa.Float(5), nullable=False),
+        Column('test3', sa.Text),
+        Column('test4', sa.Numeric(10, 2), nullable=False),
+        Column('test5', sa.Date),
+        Column('test5_1', sa.TIMESTAMP),
+        Column('parent_user_id', sa.Integer,
+            sa.ForeignKey('%susers.user_id' % schema_prefix)),
+        Column('test6', sa.Date, nullable=False),
+        Column('test7', sa.Text),
+        Column('test8', sa.LargeBinary),
+        Column('test_passivedefault2', sa.Integer, server_default='5'),
+        Column('test9', sa.LargeBinary(100)),
+        Column('test10', sa.Numeric(10, 2)),
+        schema=schema,
+        test_needs_fk=True,
+    )
+    dingalings = Table("dingalings", meta,
+        Column('dingaling_id', sa.Integer, primary_key=True),
+        Column('address_id', sa.Integer,
+            sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+        Column('data', sa.String(30)),
+        schema=schema,
+        test_needs_fk=True,
+    )
+    addresses = Table('email_addresses', meta,
+        Column('address_id', sa.Integer),
+        Column('remote_user_id', sa.Integer,
+            sa.ForeignKey(users.c.user_id)),
+        Column('email_address', sa.String(20)),
+        sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
+        schema=schema,
+        test_needs_fk=True,
+    )
+
+    return (users, addresses, dingalings)
+
+def createIndexes(con, schema=None):
+    fullname = 'users'
+    if schema:
+        fullname = "%s.%s" % (schema, 'users')
+    query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
+    con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _create_views(con, schema=None):
+    for table_name in ('users', 'email_addresses'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+            fullname)
+        con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _drop_views(con, schema=None):
+    for table_name in ('email_addresses', 'users'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "DROP VIEW %s" % view_name
+        con.execute(sa.sql.text(query))
+
+class ComponentReflectionTest(fixtures.TestBase):
+
+    @testing.requires.schemas
+    def test_get_schema_names(self):
+        insp = inspect(testing.db)
+
+        self.assert_('test_schema' in insp.get_schema_names())
+
+    def test_dialect_initialize(self):
+        engine = engines.testing_engine()
+        assert not hasattr(engine.dialect, 'default_schema_name')
+        inspect(engine)
+        assert hasattr(engine.dialect, 'default_schema_name')
+
+    def test_get_default_schema_name(self):
+        insp = inspect(testing.db)
+        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
+
+    @testing.provide_metadata
+    def _test_get_table_names(self, schema=None, table_type='table',
+                                order_by=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        _create_views(meta.bind, schema)
+        try:
+            insp = inspect(meta.bind)
+            if table_type == 'view':
+                table_names = insp.get_view_names(schema)
+                table_names.sort()
+                answer = ['email_addresses_v', 'users_v']
+            else:
+                table_names = insp.get_table_names(schema,
+                                                    order_by=order_by)
+                if order_by == 'foreign_key':
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(table_names, answer)
+                else:
+                    answer = ['dingalings', 'email_addresses', 'users']
+                    eq_(sorted(table_names), answer)
+        finally:
+            _drop_views(meta.bind, schema)
+
+    def test_get_table_names(self):
+        self._test_get_table_names()
+
+    def test_get_table_names_fks(self):
+        self._test_get_table_names(order_by='foreign_key')
+
+    @testing.requires.schemas
+    def test_get_table_names_with_schema(self):
+        self._test_get_table_names('test_schema')
+
+    @testing.requires.views
+    def test_get_view_names(self):
+        self._test_get_table_names(table_type='view')
+
+    @testing.requires.schemas
+    def test_get_view_names_with_schema(self):
+        self._test_get_table_names('test_schema', table_type='view')
+
+    def _test_get_columns(self, schema=None, table_type='table'):
+        meta = MetaData(testing.db)
+        users, addresses, dingalings = createTables(meta, schema)
+        table_names = ['users', 'email_addresses']
+        meta.create_all()
+        if table_type == 'view':
+            _create_views(meta.bind, schema)
+            table_names = ['users_v', 'email_addresses_v']
+        try:
+            insp = inspect(meta.bind)
+            for table_name, table in zip(table_names, (users,
+                    addresses)):
+                schema_name = schema
+                cols = insp.get_columns(table_name, schema=schema_name)
+                self.assert_(len(cols) > 0, len(cols))
+
+                # should be in order
+
+                for i, col in enumerate(table.columns):
+                    eq_(col.name, cols[i]['name'])
+                    ctype = cols[i]['type'].__class__
+                    ctype_def = col.type
+                    if isinstance(ctype_def, sa.types.TypeEngine):
+                        ctype_def = ctype_def.__class__
+
+                    # Oracle returns Date for DateTime.
+
+                    if testing.against('oracle') and ctype_def \
+                            in (sql_types.Date, sql_types.DateTime):
+                        ctype_def = sql_types.Date
+
+                    # assert that the desired type and return type share
+                    # a base within one of the generic types.
+
+                    self.assert_(len(set(ctype.__mro__).
+                        intersection(ctype_def.__mro__).intersection([
+                            sql_types.Integer,
+                            sql_types.Numeric,
+                            sql_types.DateTime,
+                            sql_types.Date,
+                            sql_types.Time,
+                            sql_types.String,
+                            sql_types._Binary,
+                        ])) > 0, '%s(%s), %s(%s)' % (col.name,
+                            col.type, cols[i]['name'], ctype))
+        finally:
+            if table_type == 'view':
+                _drop_views(meta.bind, schema)
+            meta.drop_all()
+
+    def test_get_columns(self):
+        self._test_get_columns()
+
+    @testing.requires.schemas
+    def test_get_columns_with_schema(self):
+        self._test_get_columns(schema='test_schema')
+
+    @testing.requires.views
+    def test_get_view_columns(self):
+        self._test_get_columns(table_type='view')
+
+    @testing.requires.views
+    @testing.requires.schemas
+    def test_get_view_columns_with_schema(self):
+        self._test_get_columns(schema='test_schema', table_type='view')
+
+    @testing.provide_metadata
+    def _test_get_pk_constraint(self, schema=None):
+        meta = self.metadata
+        users, addresses, _ = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+
+        users_cons = insp.get_pk_constraint(users.name, schema=schema)
+        users_pkeys = users_cons['constrained_columns']
+        eq_(users_pkeys, ['user_id'])
+
+        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
+        addr_pkeys = addr_cons['constrained_columns']
+        eq_(addr_pkeys, ['address_id'])
+
+        @testing.requires.reflects_pk_names
+        def go():
+            eq_(addr_cons['name'], 'email_ad_pk')
+        go()
+
+    def test_get_pk_constraint(self):
+        self._test_get_pk_constraint()
+
+    @testing.fails_on('sqlite', 'no schemas')
+    def test_get_pk_constraint_with_schema(self):
+        self._test_get_pk_constraint(schema='test_schema')
+
+    @testing.provide_metadata
+    def test_deprecated_get_primary_keys(self):
+        meta = self.metadata
+        users, _, _ = createTables(meta, schema=None)
+        meta.create_all()
+        insp = Inspector(meta.bind)
+        assert_raises_message(
+            sa_exc.SADeprecationWarning,
+            "Call to deprecated method get_primary_keys."
+            " Use get_pk_constraint instead.",
+            insp.get_primary_keys, users.name
+        )
+
+    @testing.provide_metadata
+    def _test_get_foreign_keys(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+        expected_schema = schema
+        # users
+        users_fkeys = insp.get_foreign_keys(users.name,
+                                            schema=schema)
+        fkey1 = users_fkeys[0]
+
+        @testing.fails_on('sqlite', 'no support for constraint names')
+        def go():
+            self.assert_(fkey1['name'] is not None)
+        go()
+
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['parent_user_id'])
+        #addresses
+        addr_fkeys = insp.get_foreign_keys(addresses.name,
+                                            schema=schema)
+        fkey1 = addr_fkeys[0]
+        @testing.fails_on('sqlite', 'no support for constraint names')
+        def go():
+            self.assert_(fkey1['name'] is not None)
+        go()
+        eq_(fkey1['referred_schema'], expected_schema)
+        eq_(fkey1['referred_table'], users.name)
+        eq_(fkey1['referred_columns'], ['user_id', ])
+        eq_(fkey1['constrained_columns'], ['remote_user_id'])
+
+    def test_get_foreign_keys(self):
+        self._test_get_foreign_keys()
+
+    @testing.requires.schemas
+    def test_get_foreign_keys_with_schema(self):
+        self._test_get_foreign_keys(schema='test_schema')
+
+    @testing.provide_metadata
+    def _test_get_indexes(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        createIndexes(meta.bind, schema)
+        # The database may decide to create indexes for foreign keys, etc.
+        # so there may be more indexes than expected.
+        insp = inspect(meta.bind)
+        indexes = insp.get_indexes('users', schema=schema)
+        expected_indexes = [
+            {'unique': False,
+             'column_names': ['test1', 'test2'],
+             'name': 'users_t_idx'}]
+        index_names = [d['name'] for d in indexes]
+        for e_index in expected_indexes:
+            assert e_index['name'] in index_names
+            index = indexes[index_names.index(e_index['name'])]
+            for key in e_index:
+                eq_(e_index[key], index[key])
+
+    def test_get_indexes(self):
+        self._test_get_indexes()
+
+    @testing.requires.schemas
+    def test_get_indexes_with_schema(self):
+        self._test_get_indexes(schema='test_schema')
+
+    @testing.provide_metadata
+    def _test_get_view_definition(self, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        _create_views(meta.bind, schema)
+        view_name1 = 'users_v'
+        view_name2 = 'email_addresses_v'
+        try:
+            insp = inspect(meta.bind)
+            v1 = insp.get_view_definition(view_name1, schema=schema)
+            self.assert_(v1)
+            v2 = insp.get_view_definition(view_name2, schema=schema)
+            self.assert_(v2)
+        finally:
+            _drop_views(meta.bind, schema)
+
+    @testing.requires.views
+    def test_get_view_definition(self):
+        self._test_get_view_definition()
+
+    @testing.requires.views
+    @testing.requires.schemas
+    def test_get_view_definition_with_schema(self):
+        self._test_get_view_definition(schema='test_schema')
+
+    @testing.only_on("postgresql", "PG specific feature")
+    @testing.provide_metadata
+    def _test_get_table_oid(self, table_name, schema=None):
+        meta = self.metadata
+        users, addresses, dingalings = createTables(meta, schema)
+        meta.create_all()
+        insp = inspect(meta.bind)
+        oid = insp.get_table_oid(table_name, schema)
+        self.assert_(isinstance(oid, (int, long)))
+
+    def test_get_table_oid(self):
+        self._test_get_table_oid('users')
+
+    @testing.requires.schemas
+    def test_get_table_oid_with_schema(self):
+        self._test_get_table_oid('users', schema='test_schema')
+
+
+__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest')
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_sequencing.py b/lib/sqlalchemy/testing/suite/test_sequencing.py
deleted file mode 100644
index 7b09ecb76..000000000
--- a/lib/sqlalchemy/testing/suite/test_sequencing.py
+++ /dev/null
@@ -1,36 +0,0 @@
-from .. import fixtures, config, util
-from ..config import requirements
-from ..assertions import eq_
-
-from sqlalchemy import Table, Column, Integer, String
-
-
-class InsertSequencingTest(fixtures.TablesTest):
-    run_deletes = 'each'
-
-    @classmethod
-    def define_tables(cls, metadata):
-        Table('plain_pk', metadata,
-            Column('id', Integer, primary_key=True),
-            Column('data', String(50))
-        )
-
-    def _assert_round_trip(self, table):
-        row = config.db.execute(table.select()).first()
-        eq_(
-            row,
-            (1, "some data")
-        )
-
-    @requirements.autoincrement_insert
-    def test_autoincrement_on_insert(self):
-
-        config.db.execute(
-            self.tables.plain_pk.insert(),
-            data="some data"
-        )
-        self._assert_round_trip(self.tables.plain_pk)
-
-
-
-__all__ = ('InsertSequencingTest',)
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_update_delete.py b/lib/sqlalchemy/testing/suite/test_update_delete.py
new file mode 100644
index 000000000..e73b05485
--- /dev/null
+++ b/lib/sqlalchemy/testing/suite/test_update_delete.py
@@ -0,0 +1,64 @@
+from .. import fixtures, config
+from ..config import requirements
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select
+from ..schema import Table, Column
+
+
+class SimpleUpdateDeleteTest(fixtures.TablesTest):
+    run_deletes = 'each'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table('plain_pk', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(50))
+        )
+
+    @classmethod
+    def insert_data(cls):
+        config.db.execute(
+            cls.tables.plain_pk.insert(),
+            [
+                {"id":1, "data":"d1"},
+                {"id":2, "data":"d2"},
+                {"id":3, "data":"d3"},
+            ]
+        )
+
+    def test_update(self):
+        t = self.tables.plain_pk
+        r = config.db.execute(
+            t.update().where(t.c.id == 2),
+            data="d2_new"
+        )
+        assert not r.is_insert
+        assert not r.returns_rows
+
+        eq_(
+            config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+            [
+                (1, "d1"),
+                (2, "d2_new"),
+                (3, "d3")
+            ]
+        )
+
+    def test_delete(self):
+        t = self.tables.plain_pk
+        r = config.db.execute(
+            t.delete().where(t.c.id == 2)
+        )
+        assert not r.is_insert
+        assert not r.returns_rows
+        eq_(
+            config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+            [
+                (1, "d1"),
+                (3, "d3")
+            ]
+        )
+
+__all__ = ('SimpleUpdateDeleteTest', )
\ No newline at end of file
diff --git a/lib/sqlalchemy/testing/util.py b/lib/sqlalchemy/testing/util.py
index 625b9e6a5..a02053dfb 100644
--- a/lib/sqlalchemy/testing/util.py
+++ b/lib/sqlalchemy/testing/util.py
@@ -1,5 +1,5 @@
-from sqlalchemy.util import jython, pypy, defaultdict, decorator
-from sqlalchemy.util.compat import decimal
+from ..util import jython, pypy, defaultdict, decorator
+from ..util.compat import decimal

 import gc
 import time
diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py
index 799fca128..7afcc63c5 100644
--- a/lib/sqlalchemy/testing/warnings.py
+++ b/lib/sqlalchemy/testing/warnings.py
@@ -1,8 +1,8 @@
 from __future__ import absolute_import

 import warnings
-from sqlalchemy import exc as sa_exc
-from sqlalchemy import util
+from .. import exc as sa_exc
+from .. import util

 def testing_warn(msg, stacklevel=3):
     """Replaces sqlalchemy.util.warn during tests."""
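
Illustration only: taken together, these changes let a third-party dialect run the generic suite by star-importing it into one of its own test modules and pointing the plugin's _requirements() hook at a SuiteRequirements subclass such as the one sketched earlier. A hypothetical layout (path and module names are illustrative, not mandated by this patch):

    # mydialect/tests/test_suite.py  (hypothetical)
    #
    # The star-import pulls in whatever each suite module lists in __all__
    # (TableDDLTest, InsertSequencingTest, InsertBehaviorTest,
    # SimpleUpdateDeleteTest, ComponentReflectionTest, HasTableTest,
    # HasSequenceTest), so nose collects and runs them against the
    # configured database, gated by the dialect's requirements class.
    from sqlalchemy.testing.suite import *
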