diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-02-16 18:14:10 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-02-16 18:14:10 -0500 |
commit | 00c6ebb4408964583943826c831064e1d7472169 (patch) | |
tree | 92f6c24c1828b6394d2a942ec908e442cfcd71f7 | |
parent | 5bc3b7fe6b88f12cb39516aa78408c683fc63d0c (diff) | |
download | sqlalchemy-00c6ebb4408964583943826c831064e1d7472169.tar.gz |
- rewrite SQLite reflection tests into one consistent fixture, which tests
both _resolve_type_affinity() directly as well as round trip tests fully.
-rw-r--r-- | lib/sqlalchemy/dialects/sqlite/base.py | 56 | ||||
-rw-r--r-- | lib/sqlalchemy/testing/warnings.py | 9 | ||||
-rw-r--r-- | test/dialect/test_sqlite.py | 303 |
3 files changed, 181 insertions, 187 deletions
diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index a1bd05d38..70caef1e4 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -857,24 +857,7 @@ class SQLiteDialect(default.DefaultDialect): return columns def _get_column_info(self, name, type_, nullable, default, primary_key): - match = re.match(r'([\w ]+)(\(.*?\))?', type_) - if match: - coltype = match.group(1) - args = match.group(2) - else: - coltype = '' - args = '' - coltype = self._resolve_type_affinity(coltype) - if args is not None: - args = re.findall(r'(\d+)', args) - try: - coltype = coltype(*[int(a) for a in args]) - except TypeError: - util.warn( - "Could not instantiate type %s with " - "reflected arguments %s; using no arguments." % - (coltype, args)) - coltype = coltype() + coltype = self._resolve_type_affinity(type_) if default is not None: default = util.text_type(default) @@ -888,7 +871,7 @@ class SQLiteDialect(default.DefaultDialect): 'primary_key': primary_key, } - def _resolve_type_affinity(self, coltype): + def _resolve_type_affinity(self, type_): """Return a data type from a reflected column, using affinity tules. SQLite's goal for universal compatability introduces some complexity @@ -905,18 +888,41 @@ class SQLiteDialect(default.DefaultDialect): DATE and DOUBLE). """ + match = re.match(r'([\w ]+)(\(.*?\))?', type_) + if match: + coltype = match.group(1) + args = match.group(2) + else: + coltype = '' + args = '' + if coltype in self.ischema_names: - return self.ischema_names[coltype] + coltype = self.ischema_names[coltype] elif 'INT' in coltype: - return sqltypes.INTEGER + coltype = sqltypes.INTEGER elif 'CHAR' in coltype or 'CLOB' in coltype or 'TEXT' in coltype: - return sqltypes.TEXT + coltype = sqltypes.TEXT elif 'BLOB' in coltype or not coltype: - return sqltypes.NullType + coltype = sqltypes.NullType elif 'REAL' in coltype or 'FLOA' in coltype or 'DOUB' in coltype: - return sqltypes.REAL + coltype = sqltypes.REAL else: - return sqltypes.NUMERIC + coltype = sqltypes.NUMERIC + + if args is not None: + args = re.findall(r'(\d+)', args) + try: + coltype = coltype(*[int(a) for a in args]) + except TypeError: + util.warn( + "Could not instantiate type %s with " + "reflected arguments %s; using no arguments." % + (coltype, args)) + coltype = coltype() + else: + coltype = coltype() + + return coltype @reflection.cache def get_pk_constraint(self, connection, table_name, schema=None, **kw): diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py index 74a8933a6..849b1b5b4 100644 --- a/lib/sqlalchemy/testing/warnings.py +++ b/lib/sqlalchemy/testing/warnings.py @@ -9,7 +9,7 @@ from __future__ import absolute_import import warnings from .. import exc as sa_exc from .. import util - +import re def testing_warn(msg, stacklevel=3): """Replaces sqlalchemy.util.warn during tests.""" @@ -33,7 +33,7 @@ def resetwarnings(): warnings.filterwarnings('error', category=sa_exc.SAWarning) -def assert_warnings(fn, warnings): +def assert_warnings(fn, warnings, regex=False): """Assert that each of the given warnings are emitted by fn.""" from .assertions import eq_, emits_warning @@ -45,7 +45,10 @@ def assert_warnings(fn, warnings): orig_warn(*args, **kw) popwarn = warnings.pop(0) canary.append(popwarn) - eq_(args[0], popwarn) + if regex: + assert re.match(popwarn, args[0]) + else: + eq_(args[0], popwarn) util.warn = util.langhelpers.warn = capture_warnings result = emits_warning()(fn)() diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index 38bb78304..e7fcce859 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -6,16 +6,14 @@ from collections import Counter import datetime from sqlalchemy.testing import eq_, assert_raises, \ - assert_raises_message -from sqlalchemy import Table, String, select, Text, CHAR, bindparam, Column,\ - Unicode, Date, MetaData, UnicodeText, Time, Integer, TIMESTAMP, \ - Boolean, func, NUMERIC, DateTime, extract, ForeignKey, text, Numeric,\ - DefaultClause, and_, DECIMAL, TypeDecorator, create_engine, Float, \ - INTEGER, UniqueConstraint, DATETIME, DATE, TIME, BOOLEAN, BIGINT, \ - VARCHAR -from sqlalchemy.types import UserDefinedType + assert_raises_message, is_ +from sqlalchemy import Table, select, bindparam, Column,\ + MetaData, func, extract, ForeignKey, text, DefaultClause, and_, create_engine,\ + UniqueConstraint +from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time +from sqlalchemy import types as sqltypes from sqlalchemy.util import u, ue -from sqlalchemy import exc, sql, schema, pool, types as sqltypes, util +from sqlalchemy import exc, sql, schema, pool, util from sqlalchemy.dialects.sqlite import base as sqlite, \ pysqlite as pysqlite_dialect from sqlalchemy.engine.url import make_url @@ -81,9 +79,9 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): | dbapi.PARSE_COLNAMES} engine = engines.testing_engine(options={'connect_args' : connect_args, 'native_datetime': True}) - t = Table('datetest', MetaData(), Column('id', Integer, - primary_key=True), Column('d1', Date), Column('d2', - TIMESTAMP)) + t = Table('datetest', MetaData(), + Column('id', Integer, primary_key=True), + Column('d1', Date), Column('d2', sqltypes.TIMESTAMP)) t.create(engine) try: engine.execute(t.insert(), {'d1': datetime.date(2010, 5, @@ -151,98 +149,18 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): dialect = sqlite.dialect() for t in ( String(convert_unicode=True), - CHAR(convert_unicode=True), - Unicode(), - UnicodeText(), + sqltypes.CHAR(convert_unicode=True), + sqltypes.Unicode(), + sqltypes.UnicodeText(), String(convert_unicode=True), - CHAR(convert_unicode=True), - Unicode(), - UnicodeText(), + sqltypes.CHAR(convert_unicode=True), + sqltypes.Unicode(), + sqltypes.UnicodeText(), ): bindproc = t.dialect_impl(dialect).bind_processor(dialect) assert not bindproc or \ isinstance(bindproc(util.u('some string')), util.text_type) - @testing.emits_warning("Could not instantiate") - @testing.provide_metadata - def test_type_reflection(self): - metadata = self.metadata - - # (ask_for, roundtripped_as_if_different) - - class AnyType(UserDefinedType): - def __init__(self, spec): - self.spec = spec - - def get_col_spec(self): - return self.spec - - specs = [ - (String(), String()), - (String(1), String(1)), - (String(3), String(3)), - (Text(), Text()), - (Unicode(), String()), - (Unicode(1), String(1)), - (Unicode(3), String(3)), - (UnicodeText(), Text()), - (CHAR(1), ), - (CHAR(3), CHAR(3)), - (NUMERIC, NUMERIC()), - (NUMERIC(10, 2), NUMERIC(10, 2)), - (Numeric, NUMERIC()), - (Numeric(10, 2), NUMERIC(10, 2)), - (DECIMAL, DECIMAL()), - (DECIMAL(10, 2), DECIMAL(10, 2)), - (INTEGER, INTEGER()), - (BIGINT, BIGINT()), - (Float, Float()), - (NUMERIC(), ), - (TIMESTAMP, TIMESTAMP()), - (DATETIME, DATETIME()), - (DateTime, DateTime()), - (DateTime(), ), - (DATE, DATE()), - (Date, Date()), - (TIME, TIME()), - (Time, Time()), - (BOOLEAN, BOOLEAN()), - (Boolean, Boolean()), - # types with unsupported arguments - (AnyType("INTEGER(5)"), INTEGER()), - (AnyType("DATETIME(6, 12)"), DATETIME()), - ] - columns = [Column('c%i' % (i + 1), t[0]) for (i, t) in - enumerate(specs)] - db = testing.db - Table('types', metadata, *columns) - metadata.create_all() - m2 = MetaData(db) - rt = Table('types', m2, autoload=True) - try: - db.execute('CREATE VIEW types_v AS SELECT * from types') - rv = Table('types_v', m2, autoload=True) - expected = [len(c) > 1 and c[1] or c[0] for c in specs] - for table in rt, rv: - for i, reflected in enumerate(table.c): - assert isinstance(reflected.type, - type(expected[i])), '%d: %r' % (i, - type(expected[i])) - finally: - db.execute('DROP VIEW types_v') - - @testing.provide_metadata - def test_unknown_reflection(self): - metadata = self.metadata - t = Table('t', metadata, - Column('x', sqltypes.BINARY(16)), - Column('y', sqltypes.BINARY()) - ) - t.create() - t2 = Table('t', MetaData(), autoload=True, autoload_with=testing.db) - assert isinstance(t2.c.x.type, sqltypes.Numeric) - assert isinstance(t2.c.y.type, sqltypes.Numeric) - class DateTimeTest(fixtures.TestBase, AssertsCompiledSQL): @@ -349,14 +267,14 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): # (ask_for, roundtripped_as_if_different) - specs = [(String(3), '"foo"'), (NUMERIC(10, 2), '100.50'), + specs = [(String(3), '"foo"'), (sqltypes.NUMERIC(10, 2), '100.50'), (Integer, '5'), (Boolean, 'False')] columns = [Column('c%i' % (i + 1), t[0], server_default=text(t[1])) for (i, t) in enumerate(specs)] db = testing.db m = MetaData(db) - t_table = Table('t_defaults', m, *columns) + Table('t_defaults', m, *columns) try: m.create_all() m2 = MetaData(db) @@ -429,13 +347,8 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): """test non-quoted integer value on older sqlite pragma""" dialect = sqlite.dialect() - eq_( - dialect._get_column_info("foo", "INTEGER", False, 3, False), - {'primary_key': False, 'nullable': False, - 'default': '3', 'autoincrement': False, - 'type': INTEGER, 'name': 'foo'} - ) - + info = dialect._get_column_info("foo", "INTEGER", False, 3, False) + eq_(info['default'], '3') @@ -957,7 +870,7 @@ class AutoIncrementTest(fixtures.TestBase, AssertsCompiledSQL): dialect=sqlite.dialect()) def test_sqlite_autoincrement_int_affinity(self): - class MyInteger(TypeDecorator): + class MyInteger(sqltypes.TypeDecorator): impl = Integer table = Table( 'autoinctable', @@ -1037,58 +950,130 @@ class ReflectFKConstraintTest(fixtures.TestBase): ) -class ColumnTypeAffinityReflectionTest(fixtures.TestBase): - """Tests on data type affinities for SQLite during relection. +class TypeReflectionTest(fixtures.TestBase): - See http://www.sqlite.org/datatype3.html - section 2. - """ __only_on__ = 'sqlite' - def setup(self): - testing.db.execute(""" - CREATE TABLE a ( - "id" INTEGER PRIMARY KEY, - "foo" DOUBLE, - "bar" DECIMAL(19,4), - "baz" VARCHAR(200), - "boff", - "biff" LONGTEXT - )""") - # These example names come from section 2.2 of the datatype docs, - # after pruning out types which we convert to more convenient types - self.example_typenames_integer = ["TINYINT", "MEDIUMINT", "INT2", - "UNSIGNED BIG INT", "INT8"] - self.example_typenames_text = ["CHARACTER(20)", "CLOB", - "VARYING CHARACTER(70)", "NATIVE CHARACTER(70)"] - self.example_typenames_none = [""] - self.example_typenames_real = ["DOUBLE PRECISION"] - cols = ["i%d %s" % (n, t) for n, t in enumerate( - self.example_typenames_integer)] - cols += ["t%d %s" % (n, t) for n, t in enumerate( - self.example_typenames_text)] - cols += ["o%d %s" % (n, t) for n, t in enumerate( - self.example_typenames_none)] - cols += ["n%d %s" % (n, t) for n, t in enumerate( - self.example_typenames_real)] - cols = ','.join(cols) - testing.db.execute("CREATE TABLE b (%s)" % (cols,)) - - def teardown(self): - testing.db.execute("drop table a") - testing.db.execute("drop table b") - - def test_can_reflect_with_affinity(self): - "Test that 'affinity-types' don't break reflection outright." - meta = MetaData() - a = Table('a', meta, autoload=True, autoload_with=testing.db) - eq_(len(a.columns), 6) + def _fixed_lookup_fixture(self): + return [ + (sqltypes.String(), sqltypes.VARCHAR()), + (sqltypes.String(1), sqltypes.VARCHAR(1)), + (sqltypes.String(3), sqltypes.VARCHAR(3)), + (sqltypes.Text(), sqltypes.TEXT()), + (sqltypes.Unicode(), sqltypes.VARCHAR()), + (sqltypes.Unicode(1), sqltypes.VARCHAR(1)), + (sqltypes.UnicodeText(), sqltypes.TEXT()), + (sqltypes.CHAR(3), sqltypes.CHAR(3)), + (sqltypes.NUMERIC, sqltypes.NUMERIC()), + (sqltypes.NUMERIC(10, 2), sqltypes.NUMERIC(10, 2)), + (sqltypes.Numeric, sqltypes.NUMERIC()), + (sqltypes.Numeric(10, 2), sqltypes.NUMERIC(10, 2)), + (sqltypes.DECIMAL, sqltypes.DECIMAL()), + (sqltypes.DECIMAL(10, 2), sqltypes.DECIMAL(10, 2)), + (sqltypes.INTEGER, sqltypes.INTEGER()), + (sqltypes.BIGINT, sqltypes.BIGINT()), + (sqltypes.Float, sqltypes.FLOAT()), + (sqltypes.TIMESTAMP, sqltypes.TIMESTAMP()), + (sqltypes.DATETIME, sqltypes.DATETIME()), + (sqltypes.DateTime, sqltypes.DATETIME()), + (sqltypes.DateTime(), sqltypes.DATETIME()), + (sqltypes.DATE, sqltypes.DATE()), + (sqltypes.Date, sqltypes.DATE()), + (sqltypes.TIME, sqltypes.TIME()), + (sqltypes.Time, sqltypes.TIME()), + (sqltypes.BOOLEAN, sqltypes.BOOLEAN()), + (sqltypes.Boolean, sqltypes.BOOLEAN()), + ] + + def _unsupported_args_fixture(self): + return [ + ("INTEGER(5)", sqltypes.INTEGER(),), + ("DATETIME(6, 12)", sqltypes.DATETIME()) + ] + + def _type_affinity_fixture(self): + return [ + ("LONGTEXT", sqltypes.TEXT()), + ("TINYINT", sqltypes.INTEGER()), + ("MEDIUMINT", sqltypes.INTEGER()), + ("INT2", sqltypes.INTEGER()), + ("UNSIGNED BIG INT", sqltypes.INTEGER()), + ("INT8", sqltypes.INTEGER()), + ("CHARACTER(20)", sqltypes.TEXT()), + ("CLOB", sqltypes.TEXT()), + ("CLOBBER", sqltypes.TEXT()), + ("VARYING CHARACTER(70)", sqltypes.TEXT()), + ("NATIVE CHARACTER(70)", sqltypes.TEXT()), + ("BLOB", sqltypes.BLOB()), + ("BLOBBER", sqltypes.NullType()), + ("DOUBLE PRECISION", sqltypes.REAL()), + ("FLOATY", sqltypes.REAL()), + ("NOTHING WE KNOW", sqltypes.NUMERIC()), + ] + + def _fixture_as_string(self, fixture): + for from_, to_ in fixture: + if isinstance(from_, sqltypes.TypeEngine): + from_ = str(from_.compile()) + elif isinstance(from_, type): + from_ = str(from_().compile()) + yield from_, to_ + + def _test_lookup_direct(self, fixture, warnings=False): + dialect = sqlite.dialect() + for from_, to_ in self._fixture_as_string(fixture): + if warnings: + def go(): + return dialect._resolve_type_affinity(from_) + final_type = testing.assert_warnings(go, + ["Could not instantiate"], regex=True) + else: + final_type = dialect._resolve_type_affinity(from_) + expected_type = type(to_) + is_(type(final_type), expected_type) + + def _test_round_trip(self, fixture, warnings=False): + from sqlalchemy import inspect + conn = testing.db.connect() + for from_, to_ in self._fixture_as_string(fixture): + inspector = inspect(conn) + conn.execute("CREATE TABLE foo (data %s)" % from_) + try: + if warnings: + def go(): + return inspector.get_columns("foo")[0] + col_info = testing.assert_warnings(go, + ["Could not instantiate"], regex=True) + else: + col_info = inspector.get_columns("foo")[0] + expected_type = type(to_) + is_(type(col_info['type']), expected_type) + + # test args + for attr in ("scale", "precision", "length"): + if getattr(to_, attr, None) is not None: + eq_( + getattr(col_info['type'], attr), + getattr(to_, attr, None) + ) + finally: + conn.execute("DROP TABLE foo") + + def test_lookup_direct_lookup(self): + self._test_lookup_direct(self._fixed_lookup_fixture()) + + def test_lookup_direct_unsupported_args(self): + self._test_lookup_direct(self._unsupported_args_fixture(), warnings=True) + + def test_lookup_direct_type_affinity(self): + self._test_lookup_direct(self._type_affinity_fixture()) + + def test_round_trip_direct_lookup(self): + self._test_round_trip(self._fixed_lookup_fixture()) + + def test_round_trip_direct_unsupported_args(self): + self._test_round_trip(self._unsupported_args_fixture(), warnings=True) + + def test_round_trip_direct_type_affinity(self): + self._test_round_trip(self._type_affinity_fixture()) - def test_correct_reflection_with_affinity(self): - "Test that coltypes are detected correctly from affinity rules." - meta = MetaData() - b = Table('b', meta, autoload=True, autoload_with=testing.db) - typecounts = Counter(type(col.type) for col in b.columns) - eq_(typecounts[sqltypes.INTEGER], len(self.example_typenames_integer)) - eq_(typecounts[sqltypes.TEXT], len(self.example_typenames_text)) - eq_(typecounts[sqltypes.NullType], len(self.example_typenames_none)) - eq_(typecounts[sqltypes.REAL], len(self.example_typenames_real)) |