summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/assertsql.py14
-rw-r--r--lib/sqlalchemy/testing/engines.py2
-rw-r--r--lib/sqlalchemy/testing/exclusions.py114
-rw-r--r--lib/sqlalchemy/testing/plugin/config.py33
-rw-r--r--lib/sqlalchemy/testing/profiling.py13
-rw-r--r--lib/sqlalchemy/testing/requirements.py81
-rw-r--r--lib/sqlalchemy/testing/schema.py5
-rw-r--r--lib/sqlalchemy/testing/suite/__init__.py4
-rw-r--r--lib/sqlalchemy/testing/suite/requirements.py30
-rw-r--r--lib/sqlalchemy/testing/suite/test_ddl.py3
-rw-r--r--lib/sqlalchemy/testing/suite/test_insert.py111
-rw-r--r--lib/sqlalchemy/testing/suite/test_reflection.py424
-rw-r--r--lib/sqlalchemy/testing/suite/test_sequencing.py36
-rw-r--r--lib/sqlalchemy/testing/suite/test_update_delete.py64
-rw-r--r--lib/sqlalchemy/testing/util.py4
-rw-r--r--lib/sqlalchemy/testing/warnings.py4
16 files changed, 773 insertions, 169 deletions
diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py
index 897f4b3b1..08ee55d57 100644
--- a/lib/sqlalchemy/testing/assertsql.py
+++ b/lib/sqlalchemy/testing/assertsql.py
@@ -1,8 +1,6 @@
-from sqlalchemy.interfaces import ConnectionProxy
-from sqlalchemy.engine.default import DefaultDialect
-from sqlalchemy.engine.base import Connection
-from sqlalchemy import util
+from ..engine.default import DefaultDialect
+from .. import util
import re
class AssertRule(object):
@@ -262,16 +260,16 @@ def _process_assertion_statement(query, context):
paramstyle = context.dialect.paramstyle
if paramstyle == 'named':
pass
- elif paramstyle =='pyformat':
+ elif paramstyle == 'pyformat':
query = re.sub(r':([\w_]+)', r"%(\1)s", query)
else:
# positional params
repl = None
- if paramstyle=='qmark':
+ if paramstyle == 'qmark':
repl = "?"
- elif paramstyle=='format':
+ elif paramstyle == 'format':
repl = r"%s"
- elif paramstyle=='numeric':
+ elif paramstyle == 'numeric':
repl = None
query = re.sub(r':([\w_]+)', repl, query)
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index f7401550e..74e22adf1 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -5,7 +5,7 @@ import weakref
from collections import deque
from . import config
from .util import decorator
-from sqlalchemy import event, pool
+from .. import event, pool
import re
import warnings
diff --git a/lib/sqlalchemy/testing/exclusions.py b/lib/sqlalchemy/testing/exclusions.py
index ba2eebe4f..96dd0d693 100644
--- a/lib/sqlalchemy/testing/exclusions.py
+++ b/lib/sqlalchemy/testing/exclusions.py
@@ -1,49 +1,63 @@
import operator
from nose import SkipTest
-from sqlalchemy.util import decorator
+from ..util import decorator
from . import config
-from sqlalchemy import util
+from .. import util
-def fails_if(predicate, reason=None):
- predicate = _as_predicate(predicate)
-
- @decorator
- def decorate(fn, *args, **kw):
- if not predicate():
- return fn(*args, **kw)
- else:
- try:
- fn(*args, **kw)
- except Exception, ex:
- print ("'%s' failed as expected (%s): %s " % (
- fn.__name__, predicate, str(ex)))
- return True
- else:
- raise AssertionError(
- "Unexpected success for '%s' (%s)" %
- (fn.__name__, predicate))
- return decorate
+class fails_if(object):
+ def __init__(self, predicate, reason=None):
+ self.predicate = _as_predicate(predicate)
+ self.reason = reason
-def skip_if(predicate, reason=None):
- predicate = _as_predicate(predicate)
+ @property
+ def enabled(self):
+ return not self.predicate()
- @decorator
- def decorate(fn, *args, **kw):
- if predicate():
- if reason:
- msg = "'%s' : %s" % (
- fn.__name__,
- reason
- )
+ def __call__(self, fn):
+ @decorator
+ def decorate(fn, *args, **kw):
+ if not self.predicate():
+ return fn(*args, **kw)
else:
- msg = "'%s': %s" % (
- fn.__name__, predicate
- )
- raise SkipTest(msg)
- else:
- return fn(*args, **kw)
- return decorate
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected (%s): %s " % (
+ fn.__name__, self.predicate, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' (%s)" %
+ (fn.__name__, self.predicate))
+ return decorate(fn)
+
+class skip_if(object):
+ def __init__(self, predicate, reason=None):
+ self.predicate = _as_predicate(predicate)
+ self.reason = reason
+
+ @property
+ def enabled(self):
+ return not self.predicate()
+
+ def __call__(self, fn):
+ @decorator
+ def decorate(fn, *args, **kw):
+ if self.predicate():
+ if self.reason:
+ msg = "'%s' : %s" % (
+ fn.__name__,
+ self.reason
+ )
+ else:
+ msg = "'%s': %s" % (
+ fn.__name__, self.predicate
+ )
+ raise SkipTest(msg)
+ else:
+ return fn(*args, **kw)
+ return decorate(fn)
def only_if(predicate, reason=None):
predicate = _as_predicate(predicate)
@@ -69,6 +83,23 @@ class Predicate(object):
else:
assert False, "unknown predicate type: %s" % predicate
+class BooleanPredicate(Predicate):
+ def __init__(self, value, description=None):
+ self.value = value
+ self.description = description
+
+ def __call__(self):
+ return self.value
+
+ def _as_string(self, negate=False):
+ if negate:
+ return "not " + self.description
+ else:
+ return self.description
+
+ def __str__(self):
+ return self._as_string()
+
class SpecPredicate(Predicate):
def __init__(self, db, op=None, spec=None, description=None):
self.db = db
@@ -232,8 +263,11 @@ def db_spec(*dbs):
Predicate.as_predicate(db) for db in dbs
)
-def open(fn):
- return fn
+def open():
+ return skip_if(BooleanPredicate(False))
+
+def closed():
+ return skip_if(BooleanPredicate(True))
@decorator
def future(fn, *args, **kw):
diff --git a/lib/sqlalchemy/testing/plugin/config.py b/lib/sqlalchemy/testing/plugin/config.py
index 946a856ad..6c9292864 100644
--- a/lib/sqlalchemy/testing/plugin/config.py
+++ b/lib/sqlalchemy/testing/plugin/config.py
@@ -44,19 +44,26 @@ def _engine_strategy(options, opt_str, value, parser):
pre_configure = []
post_configure = []
-
+def pre(fn):
+ pre_configure.append(fn)
+ return fn
+def post(fn):
+ post_configure.append(fn)
+ return fn
+
+@pre
def _setup_options(opt, file_config):
global options
options = opt
-pre_configure.append(_setup_options)
+@pre
def _monkeypatch_cdecimal(options, file_config):
if options.cdecimal:
import sys
import cdecimal
sys.modules['decimal'] = cdecimal
-pre_configure.append(_monkeypatch_cdecimal)
+@post
def _engine_uri(options, file_config):
global db_label, db_url
@@ -73,8 +80,8 @@ def _engine_uri(options, file_config):
"Unknown URI specifier '%s'. Specify --dbs for known uris."
% db_label)
db_url = file_config.get('db', db_label)
-post_configure.append(_engine_uri)
+@post
def _require(options, file_config):
if not(options.require or
(file_config.has_section('require') and
@@ -99,14 +106,14 @@ def _require(options, file_config):
if seen:
continue
pkg_resources.require(requirement)
-post_configure.append(_require)
+@post
def _engine_pool(options, file_config):
if options.mockpool:
from sqlalchemy import pool
db_opts['poolclass'] = pool.AssertionPool
-post_configure.append(_engine_pool)
+@post
def _create_testing_engine(options, file_config):
from sqlalchemy.testing import engines, config
from sqlalchemy import testing
@@ -115,8 +122,8 @@ def _create_testing_engine(options, file_config):
config.db_opts = db_opts
config.db_url = db_url
-post_configure.append(_create_testing_engine)
+@post
def _prep_testing_database(options, file_config):
from sqlalchemy.testing import engines
from sqlalchemy import schema
@@ -137,8 +144,8 @@ def _prep_testing_database(options, file_config):
md.drop_all()
e.dispose()
-post_configure.append(_prep_testing_database)
+@post
def _set_table_options(options, file_config):
from sqlalchemy.testing import schema
@@ -149,8 +156,8 @@ def _set_table_options(options, file_config):
if options.mysql_engine:
table_options['mysql_engine'] = options.mysql_engine
-post_configure.append(_set_table_options)
+@post
def _reverse_topological(options, file_config):
if options.reversetop:
from sqlalchemy.orm import unitofwork, session, mapper, dependency
@@ -158,8 +165,8 @@ def _reverse_topological(options, file_config):
from sqlalchemy.testing.util import RandomSet
topological.set = unitofwork.set = session.set = mapper.set = \
dependency.set = RandomSet
-post_configure.append(_reverse_topological)
+@post
def _requirements(options, file_config):
from sqlalchemy.testing import config
from sqlalchemy import testing
@@ -175,17 +182,15 @@ def _requirements(options, file_config):
req_cls = getattr(mod, clsname)
config.requirements = testing.requires = req_cls(db, config)
-post_configure.append(_requirements)
+@post
def _post_setup_options(opt, file_config):
from sqlalchemy.testing import config
config.options = options
-post_configure.append(_post_setup_options)
+@post
def _setup_profiling(options, file_config):
from sqlalchemy.testing import profiling
profiling._profile_stats = profiling.ProfileStatsFile(
file_config.get('sqla_testing', 'profile_file'))
-post_configure.append(_setup_profiling)
-
diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py
index be32b1d1d..a22e83cbc 100644
--- a/lib/sqlalchemy/testing/profiling.py
+++ b/lib/sqlalchemy/testing/profiling.py
@@ -13,25 +13,24 @@ from nose import SkipTest
import pstats
import time
import collections
-from sqlalchemy import util
+from .. import util
try:
import cProfile
except ImportError:
cProfile = None
-from sqlalchemy.util.compat import jython, pypy, win32
+from ..util.compat import jython, pypy, win32
_current_test = None
def profiled(target=None, **target_opts):
"""Function profiling.
- @profiled('label')
+ @profiled()
or
- @profiled('label', report=True, sort=('calls',), limit=20)
+ @profiled(report=True, sort=('calls',), limit=20)
+
+ Outputs profiling info for a decorated function.
- Enables profiling for a function when 'label' is targetted for
- profiling. Report options can be supplied, and override the global
- configuration and command-line options.
"""
profile_config = {'targets': set(),
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index eca883d4e..90385c391 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -3,32 +3,12 @@
Provides decorators to mark tests requiring specific feature support from the
target database.
-"""
-
-from .exclusions import \
- skip, \
- skip_if,\
- only_if,\
- only_on,\
- fails_on,\
- fails_on_everything_except,\
- fails_if,\
- SpecPredicate,\
- against
-
-def no_support(db, reason):
- return SpecPredicate(db, description=reason)
-
-def exclude(db, op, spec, description=None):
- return SpecPredicate(db, op, spec, description=description)
+External dialect test suites should subclass SuiteRequirements
+to provide specific inclusion/exlusions.
+"""
-def _chain_decorators_on(*decorators):
- def decorate(fn):
- for decorator in reversed(decorators):
- fn = decorator(fn)
- return fn
- return decorate
+from . import exclusions
class Requirements(object):
def __init__(self, db, config):
@@ -36,3 +16,56 @@ class Requirements(object):
self.config = config
+class SuiteRequirements(Requirements):
+
+ @property
+ def create_table(self):
+ """target platform can emit basic CreateTable DDL."""
+
+ return exclusions.open()
+
+ @property
+ def drop_table(self):
+ """target platform can emit basic DropTable DDL."""
+
+ return exclusions.open()
+
+ @property
+ def autoincrement_insert(self):
+ """target platform generates new surrogate integer primary key values
+ when insert() is executed, excluding the pk column."""
+
+ return exclusions.open()
+
+ @property
+ def returning(self):
+ """target platform supports RETURNING."""
+
+ return exclusions.closed()
+
+ @property
+ def dbapi_lastrowid(self):
+ """"target platform includes a 'lastrowid' accessor on the DBAPI
+ cursor object.
+
+ """
+ return exclusions.closed()
+
+ @property
+ def views(self):
+ """Target database must support VIEWs."""
+
+ return exclusions.closed()
+
+ @property
+ def schemas(self):
+ """Target database must support external schemas, and have one
+ named 'test_schema'."""
+
+ return exclusions.closed()
+
+ @property
+ def sequences(self):
+ """Target database must support SEQUENCEs."""
+
+ return self.config.db.dialect.supports_sequences
diff --git a/lib/sqlalchemy/testing/schema.py b/lib/sqlalchemy/testing/schema.py
index 03da78c64..805c8e567 100644
--- a/lib/sqlalchemy/testing/schema.py
+++ b/lib/sqlalchemy/testing/schema.py
@@ -1,9 +1,6 @@
-"""Enhanced versions of schema.Table and schema.Column which establish
-desired state for different backends.
-"""
from . import exclusions
-from sqlalchemy import schema, event
+from .. import schema, event
from . import config
__all__ = 'Table', 'Column',
diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py
index e69de29bb..a92ecb469 100644
--- a/lib/sqlalchemy/testing/suite/__init__.py
+++ b/lib/sqlalchemy/testing/suite/__init__.py
@@ -0,0 +1,4 @@
+from .test_ddl import *
+from .test_insert import *
+from .test_update_delete import *
+from .test_reflection import *
diff --git a/lib/sqlalchemy/testing/suite/requirements.py b/lib/sqlalchemy/testing/suite/requirements.py
deleted file mode 100644
index 3ea72adcd..000000000
--- a/lib/sqlalchemy/testing/suite/requirements.py
+++ /dev/null
@@ -1,30 +0,0 @@
-"""Requirement definitions used by the generic dialect suite.
-
-External dialect test suites should subclass SuiteRequirements
-to provide specific inclusion/exlusions.
-
-"""
-from ..requirements import Requirements
-from .. import exclusions
-
-
-class SuiteRequirements(Requirements):
-
- @property
- def create_table(self):
- """target platform can emit basic CreateTable DDL."""
-
- return exclusions.open
-
- @property
- def drop_table(self):
- """target platform can emit basic DropTable DDL."""
-
- return exclusions.open
-
- @property
- def autoincrement_insert(self):
- """target platform generates new surrogate integer primary key values
- when insert() is executed, excluding the pk column."""
-
- return exclusions.open
diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py
index 1285c4196..c9637cd70 100644
--- a/lib/sqlalchemy/testing/suite/test_ddl.py
+++ b/lib/sqlalchemy/testing/suite/test_ddl.py
@@ -45,4 +45,5 @@ class TableDDLTest(fixtures.TestBase):
config.db, checkfirst=False
)
-__all__ = ('TableDDLTest',) \ No newline at end of file
+
+__all__ = ('TableDDLTest', ) \ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_insert.py b/lib/sqlalchemy/testing/suite/test_insert.py
new file mode 100644
index 000000000..53a70e0c6
--- /dev/null
+++ b/lib/sqlalchemy/testing/suite/test_insert.py
@@ -0,0 +1,111 @@
+from .. import fixtures, config
+from ..config import requirements
+from .. import exclusions
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select, util
+
+from ..schema import Table, Column
+
+class InsertSequencingTest(fixtures.TablesTest):
+ run_deletes = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('autoinc_pk', metadata,
+ Column('id', Integer, primary_key=True,
+ test_needs_autoincrement=True),
+ Column('data', String(50))
+ )
+
+ Table('manual_pk', metadata,
+ Column('id', Integer, primary_key=True, autoincrement=False),
+ Column('data', String(50))
+ )
+
+ def _assert_round_trip(self, table):
+ row = config.db.execute(table.select()).first()
+ eq_(
+ row,
+ (1, "some data")
+ )
+
+ @requirements.autoincrement_insert
+ def test_autoincrement_on_insert(self):
+
+ config.db.execute(
+ self.tables.autoinc_pk.insert(),
+ data="some data"
+ )
+ self._assert_round_trip(self.tables.autoinc_pk)
+
+ @requirements.autoincrement_insert
+ def test_last_inserted_id(self):
+
+ r = config.db.execute(
+ self.tables.autoinc_pk.insert(),
+ data="some data"
+ )
+ pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ eq_(
+ r.inserted_primary_key,
+ [pk]
+ )
+
+ @exclusions.fails_if(lambda: util.pypy, "lastrowid not maintained after "
+ "connection close")
+ @requirements.dbapi_lastrowid
+ def test_native_lastrowid_autoinc(self):
+ r = config.db.execute(
+ self.tables.autoinc_pk.insert(),
+ data="some data"
+ )
+ lastrowid = r.lastrowid
+ pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+ eq_(
+ lastrowid, pk
+ )
+
+
+class InsertBehaviorTest(fixtures.TablesTest):
+ run_deletes = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('autoinc_pk', metadata,
+ Column('id', Integer, primary_key=True, \
+ test_needs_autoincrement=True),
+ Column('data', String(50))
+ )
+
+ def test_autoclose_on_insert(self):
+ if requirements.returning.enabled:
+ engine = engines.testing_engine(
+ options={'implicit_returning': False})
+ else:
+ engine = config.db
+
+
+ r = engine.execute(
+ self.tables.autoinc_pk.insert(),
+ data="some data"
+ )
+ assert r.closed
+ assert r.is_insert
+ assert not r.returns_rows
+
+ @requirements.returning
+ def test_autoclose_on_insert_implicit_returning(self):
+ r = config.db.execute(
+ self.tables.autoinc_pk.insert(),
+ data="some data"
+ )
+ assert r.closed
+ assert r.is_insert
+ assert r.returns_rows
+
+
+__all__ = ('InsertSequencingTest', 'InsertBehaviorTest')
+
+
diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py
index e69de29bb..f816895a4 100644
--- a/lib/sqlalchemy/testing/suite/test_reflection.py
+++ b/lib/sqlalchemy/testing/suite/test_reflection.py
@@ -0,0 +1,424 @@
+import sqlalchemy as sa
+from sqlalchemy import exc as sa_exc
+from sqlalchemy import types as sql_types
+from sqlalchemy import schema
+from sqlalchemy import inspect
+from sqlalchemy import MetaData, Integer, String
+from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.schema import Table, Column
+from sqlalchemy.testing import eq_, assert_raises_message
+from sqlalchemy import testing
+from .. import config
+
+metadata, users = None, None
+
+
+class HasTableTest(fixtures.TablesTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('test_table', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50))
+ )
+
+ def test_has_table(self):
+ with config.db.begin() as conn:
+ assert config.db.dialect.has_table(conn, "test_table")
+ assert not config.db.dialect.has_table(conn, "nonexistent_table")
+
+class HasSequenceTest(fixtures.TestBase):
+ __requires__ = 'sequences',
+
+ def test_has_sequence(self):
+ metadata = MetaData()
+ Table('users', metadata, Column('user_id', sa.Integer,
+ sa.Sequence('user_id_seq'), primary_key=True),
+ Column('user_name', sa.String(40)))
+ metadata.create_all(bind=testing.db)
+ try:
+ eq_(testing.db.dialect.has_sequence(testing.db,
+ 'user_id_seq'), True)
+ finally:
+ metadata.drop_all(bind=testing.db)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ False)
+
+ @testing.requires.schemas
+ def test_has_sequence_schema(self):
+ test_schema = 'test_schema'
+ s1 = sa.Sequence('user_id_seq', schema=test_schema)
+ s2 = sa.Sequence('user_id_seq')
+ testing.db.execute(schema.CreateSequence(s1))
+ testing.db.execute(schema.CreateSequence(s2))
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), True)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ True)
+ testing.db.execute(schema.DropSequence(s1))
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ True)
+ testing.db.execute(schema.DropSequence(s2))
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
+ schema=test_schema), False)
+ eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
+ False)
+
+
+def createTables(meta, schema=None):
+ if schema:
+ schema_prefix = schema + "."
+ else:
+ schema_prefix = ""
+
+ users = Table('users', meta,
+ Column('user_id', sa.INT, primary_key=True),
+ Column('user_name', sa.VARCHAR(20), nullable=False),
+ Column('test1', sa.CHAR(5), nullable=False),
+ Column('test2', sa.Float(5), nullable=False),
+ Column('test3', sa.Text),
+ Column('test4', sa.Numeric(10, 2), nullable=False),
+ Column('test5', sa.Date),
+ Column('test5_1', sa.TIMESTAMP),
+ Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('%susers.user_id' % schema_prefix)),
+ Column('test6', sa.Date, nullable=False),
+ Column('test7', sa.Text),
+ Column('test8', sa.LargeBinary),
+ Column('test_passivedefault2', sa.Integer, server_default='5'),
+ Column('test9', sa.LargeBinary(100)),
+ Column('test10', sa.Numeric(10, 2)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ dingalings = Table("dingalings", meta,
+ Column('dingaling_id', sa.Integer, primary_key=True),
+ Column('address_id', sa.Integer,
+ sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+ Column('data', sa.String(30)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ addresses = Table('email_addresses', meta,
+ Column('address_id', sa.Integer),
+ Column('remote_user_id', sa.Integer,
+ sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(20)),
+ sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
+ schema=schema,
+ test_needs_fk=True,
+ )
+
+ return (users, addresses, dingalings)
+
+def createIndexes(con, schema=None):
+ fullname = 'users'
+ if schema:
+ fullname = "%s.%s" % (schema, 'users')
+ query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
+ con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _create_views(con, schema=None):
+ for table_name in ('users', 'email_addresses'):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + '_v'
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+ fullname)
+ con.execute(sa.sql.text(query))
+
+@testing.requires.views
+def _drop_views(con, schema=None):
+ for table_name in ('email_addresses', 'users'):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + '_v'
+ query = "DROP VIEW %s" % view_name
+ con.execute(sa.sql.text(query))
+
+class ComponentReflectionTest(fixtures.TestBase):
+
+ @testing.requires.schemas
+ def test_get_schema_names(self):
+ insp = inspect(testing.db)
+
+ self.assert_('test_schema' in insp.get_schema_names())
+
+ def test_dialect_initialize(self):
+ engine = engines.testing_engine()
+ assert not hasattr(engine.dialect, 'default_schema_name')
+ inspect(engine)
+ assert hasattr(engine.dialect, 'default_schema_name')
+
+ def test_get_default_schema_name(self):
+ insp = inspect(testing.db)
+ eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
+
+ @testing.provide_metadata
+ def _test_get_table_names(self, schema=None, table_type='table',
+ order_by=None):
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ _create_views(meta.bind, schema)
+ try:
+ insp = inspect(meta.bind)
+ if table_type == 'view':
+ table_names = insp.get_view_names(schema)
+ table_names.sort()
+ answer = ['email_addresses_v', 'users_v']
+ else:
+ table_names = insp.get_table_names(schema,
+ order_by=order_by)
+ if order_by == 'foreign_key':
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(table_names, answer)
+ else:
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(sorted(table_names), answer)
+ finally:
+ _drop_views(meta.bind, schema)
+
+ def test_get_table_names(self):
+ self._test_get_table_names()
+
+ def test_get_table_names_fks(self):
+ self._test_get_table_names(order_by='foreign_key')
+
+ @testing.requires.schemas
+ def test_get_table_names_with_schema(self):
+ self._test_get_table_names('test_schema')
+
+ @testing.requires.views
+ def test_get_view_names(self):
+ self._test_get_table_names(table_type='view')
+
+ @testing.requires.schemas
+ def test_get_view_names_with_schema(self):
+ self._test_get_table_names('test_schema', table_type='view')
+
+ def _test_get_columns(self, schema=None, table_type='table'):
+ meta = MetaData(testing.db)
+ users, addresses, dingalings = createTables(meta, schema)
+ table_names = ['users', 'email_addresses']
+ meta.create_all()
+ if table_type == 'view':
+ _create_views(meta.bind, schema)
+ table_names = ['users_v', 'email_addresses_v']
+ try:
+ insp = inspect(meta.bind)
+ for table_name, table in zip(table_names, (users,
+ addresses)):
+ schema_name = schema
+ cols = insp.get_columns(table_name, schema=schema_name)
+ self.assert_(len(cols) > 0, len(cols))
+
+ # should be in order
+
+ for i, col in enumerate(table.columns):
+ eq_(col.name, cols[i]['name'])
+ ctype = cols[i]['type'].__class__
+ ctype_def = col.type
+ if isinstance(ctype_def, sa.types.TypeEngine):
+ ctype_def = ctype_def.__class__
+
+ # Oracle returns Date for DateTime.
+
+ if testing.against('oracle') and ctype_def \
+ in (sql_types.Date, sql_types.DateTime):
+ ctype_def = sql_types.Date
+
+ # assert that the desired type and return type share
+ # a base within one of the generic types.
+
+ self.assert_(len(set(ctype.__mro__).
+ intersection(ctype_def.__mro__).intersection([
+ sql_types.Integer,
+ sql_types.Numeric,
+ sql_types.DateTime,
+ sql_types.Date,
+ sql_types.Time,
+ sql_types.String,
+ sql_types._Binary,
+ ])) > 0, '%s(%s), %s(%s)' % (col.name,
+ col.type, cols[i]['name'], ctype))
+ finally:
+ if table_type == 'view':
+ _drop_views(meta.bind, schema)
+ meta.drop_all()
+
+ def test_get_columns(self):
+ self._test_get_columns()
+
+ @testing.requires.schemas
+ def test_get_columns_with_schema(self):
+ self._test_get_columns(schema='test_schema')
+
+ @testing.requires.views
+ def test_get_view_columns(self):
+ self._test_get_columns(table_type='view')
+
+ @testing.requires.views
+ @testing.requires.schemas
+ def test_get_view_columns_with_schema(self):
+ self._test_get_columns(schema='test_schema', table_type='view')
+
+ @testing.provide_metadata
+ def _test_get_pk_constraint(self, schema=None):
+ meta = self.metadata
+ users, addresses, _ = createTables(meta, schema)
+ meta.create_all()
+ insp = inspect(meta.bind)
+
+ users_cons = insp.get_pk_constraint(users.name, schema=schema)
+ users_pkeys = users_cons['constrained_columns']
+ eq_(users_pkeys, ['user_id'])
+
+ addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
+ addr_pkeys = addr_cons['constrained_columns']
+ eq_(addr_pkeys, ['address_id'])
+
+ @testing.requires.reflects_pk_names
+ def go():
+ eq_(addr_cons['name'], 'email_ad_pk')
+ go()
+
+ def test_get_pk_constraint(self):
+ self._test_get_pk_constraint()
+
+ @testing.fails_on('sqlite', 'no schemas')
+ def test_get_pk_constraint_with_schema(self):
+ self._test_get_pk_constraint(schema='test_schema')
+
+ @testing.provide_metadata
+ def test_deprecated_get_primary_keys(self):
+ meta = self.metadata
+ users, _, _ = createTables(meta, schema=None)
+ meta.create_all()
+ insp = Inspector(meta.bind)
+ assert_raises_message(
+ sa_exc.SADeprecationWarning,
+ "Call to deprecated method get_primary_keys."
+ " Use get_pk_constraint instead.",
+ insp.get_primary_keys, users.name
+ )
+
+ @testing.provide_metadata
+ def _test_get_foreign_keys(self, schema=None):
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ insp = inspect(meta.bind)
+ expected_schema = schema
+ # users
+ users_fkeys = insp.get_foreign_keys(users.name,
+ schema=schema)
+ fkey1 = users_fkeys[0]
+
+ @testing.fails_on('sqlite', 'no support for constraint names')
+ def go():
+ self.assert_(fkey1['name'] is not None)
+ go()
+
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['parent_user_id'])
+ #addresses
+ addr_fkeys = insp.get_foreign_keys(addresses.name,
+ schema=schema)
+ fkey1 = addr_fkeys[0]
+ @testing.fails_on('sqlite', 'no support for constraint names')
+ def go():
+ self.assert_(fkey1['name'] is not None)
+ go()
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['remote_user_id'])
+
+ def test_get_foreign_keys(self):
+ self._test_get_foreign_keys()
+
+ @testing.requires.schemas
+ def test_get_foreign_keys_with_schema(self):
+ self._test_get_foreign_keys(schema='test_schema')
+
+ @testing.provide_metadata
+ def _test_get_indexes(self, schema=None):
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ createIndexes(meta.bind, schema)
+ # The database may decide to create indexes for foreign keys, etc.
+ # so there may be more indexes than expected.
+ insp = inspect(meta.bind)
+ indexes = insp.get_indexes('users', schema=schema)
+ expected_indexes = [
+ {'unique': False,
+ 'column_names': ['test1', 'test2'],
+ 'name': 'users_t_idx'}]
+ index_names = [d['name'] for d in indexes]
+ for e_index in expected_indexes:
+ assert e_index['name'] in index_names
+ index = indexes[index_names.index(e_index['name'])]
+ for key in e_index:
+ eq_(e_index[key], index[key])
+
+ def test_get_indexes(self):
+ self._test_get_indexes()
+
+ @testing.requires.schemas
+ def test_get_indexes_with_schema(self):
+ self._test_get_indexes(schema='test_schema')
+
+ @testing.provide_metadata
+ def _test_get_view_definition(self, schema=None):
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ _create_views(meta.bind, schema)
+ view_name1 = 'users_v'
+ view_name2 = 'email_addresses_v'
+ try:
+ insp = inspect(meta.bind)
+ v1 = insp.get_view_definition(view_name1, schema=schema)
+ self.assert_(v1)
+ v2 = insp.get_view_definition(view_name2, schema=schema)
+ self.assert_(v2)
+ finally:
+ _drop_views(meta.bind, schema)
+
+ @testing.requires.views
+ def test_get_view_definition(self):
+ self._test_get_view_definition()
+
+ @testing.requires.views
+ @testing.requires.schemas
+ def test_get_view_definition_with_schema(self):
+ self._test_get_view_definition(schema='test_schema')
+
+ @testing.only_on("postgresql", "PG specific feature")
+ @testing.provide_metadata
+ def _test_get_table_oid(self, table_name, schema=None):
+ meta = self.metadata
+ users, addresses, dingalings = createTables(meta, schema)
+ meta.create_all()
+ insp = inspect(meta.bind)
+ oid = insp.get_table_oid(table_name, schema)
+ self.assert_(isinstance(oid, (int, long)))
+
+ def test_get_table_oid(self):
+ self._test_get_table_oid('users')
+
+ @testing.requires.schemas
+ def test_get_table_oid_with_schema(self):
+ self._test_get_table_oid('users', schema='test_schema')
+
+
+__all__ = ('ComponentReflectionTest', 'HasSequenceTest', 'HasTableTest') \ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_sequencing.py b/lib/sqlalchemy/testing/suite/test_sequencing.py
deleted file mode 100644
index 7b09ecb76..000000000
--- a/lib/sqlalchemy/testing/suite/test_sequencing.py
+++ /dev/null
@@ -1,36 +0,0 @@
-from .. import fixtures, config, util
-from ..config import requirements
-from ..assertions import eq_
-
-from sqlalchemy import Table, Column, Integer, String
-
-
-class InsertSequencingTest(fixtures.TablesTest):
- run_deletes = 'each'
-
- @classmethod
- def define_tables(cls, metadata):
- Table('plain_pk', metadata,
- Column('id', Integer, primary_key=True),
- Column('data', String(50))
- )
-
- def _assert_round_trip(self, table):
- row = config.db.execute(table.select()).first()
- eq_(
- row,
- (1, "some data")
- )
-
- @requirements.autoincrement_insert
- def test_autoincrement_on_insert(self):
-
- config.db.execute(
- self.tables.plain_pk.insert(),
- data="some data"
- )
- self._assert_round_trip(self.tables.plain_pk)
-
-
-
-__all__ = ('InsertSequencingTest',) \ No newline at end of file
diff --git a/lib/sqlalchemy/testing/suite/test_update_delete.py b/lib/sqlalchemy/testing/suite/test_update_delete.py
new file mode 100644
index 000000000..e73b05485
--- /dev/null
+++ b/lib/sqlalchemy/testing/suite/test_update_delete.py
@@ -0,0 +1,64 @@
+from .. import fixtures, config
+from ..config import requirements
+from ..assertions import eq_
+from .. import engines
+
+from sqlalchemy import Integer, String, select
+from ..schema import Table, Column
+
+
+class SimpleUpdateDeleteTest(fixtures.TablesTest):
+ run_deletes = 'each'
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table('plain_pk', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50))
+ )
+
+ @classmethod
+ def insert_data(cls):
+ config.db.execute(
+ cls.tables.plain_pk.insert(),
+ [
+ {"id":1, "data":"d1"},
+ {"id":2, "data":"d2"},
+ {"id":3, "data":"d3"},
+ ]
+ )
+
+ def test_update(self):
+ t = self.tables.plain_pk
+ r = config.db.execute(
+ t.update().where(t.c.id == 2),
+ data="d2_new"
+ )
+ assert not r.is_insert
+ assert not r.returns_rows
+
+ eq_(
+ config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+ [
+ (1, "d1"),
+ (2, "d2_new"),
+ (3, "d3")
+ ]
+ )
+
+ def test_delete(self):
+ t = self.tables.plain_pk
+ r = config.db.execute(
+ t.delete().where(t.c.id == 2)
+ )
+ assert not r.is_insert
+ assert not r.returns_rows
+ eq_(
+ config.db.execute(t.select().order_by(t.c.id)).fetchall(),
+ [
+ (1, "d1"),
+ (3, "d3")
+ ]
+ )
+
+__all__ = ('SimpleUpdateDeleteTest', ) \ No newline at end of file
diff --git a/lib/sqlalchemy/testing/util.py b/lib/sqlalchemy/testing/util.py
index 625b9e6a5..a02053dfb 100644
--- a/lib/sqlalchemy/testing/util.py
+++ b/lib/sqlalchemy/testing/util.py
@@ -1,5 +1,5 @@
-from sqlalchemy.util import jython, pypy, defaultdict, decorator
-from sqlalchemy.util.compat import decimal
+from ..util import jython, pypy, defaultdict, decorator
+from ..util.compat import decimal
import gc
import time
diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py
index 799fca128..7afcc63c5 100644
--- a/lib/sqlalchemy/testing/warnings.py
+++ b/lib/sqlalchemy/testing/warnings.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import
import warnings
-from sqlalchemy import exc as sa_exc
-from sqlalchemy import util
+from .. import exc as sa_exc
+from .. import util
def testing_warn(msg, stacklevel=3):
"""Replaces sqlalchemy.util.warn during tests."""