diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-12-13 18:04:11 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-12-13 19:24:56 -0500 |
commit | 468db416dbf284f0e7dddde90ec9641dc89428c6 (patch) | |
tree | 061c600e0a2cd834d60137f901b5088a9c6eb664 /test/dialect/test_sqlite.py | |
parent | 5b146e1bab7b440038c356f388e3362a669399c1 (diff) | |
download | sqlalchemy-468db416dbf284f0e7dddde90ec9641dc89428c6.tar.gz |
- rework sqlite FK and unique constraint system to combine both PRAGMA
and regexp parsing of SQL in order to form a complete picture of
constraints + their names. fixes #3244 fixes #3261
- factor various PRAGMA work to be centralized into one call
Diffstat (limited to 'test/dialect/test_sqlite.py')
-rw-r--r-- | test/dialect/test_sqlite.py | 414 |
1 files changed, 291 insertions, 123 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index b4524dc27..44e4eda42 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -22,6 +22,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \ from sqlalchemy import testing from sqlalchemy.schema import CreateTable from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.testing import mock class TestTypes(fixtures.TestBase, AssertsExecutionResults): @@ -500,30 +501,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): # assert j.onclause.compare(table1.c['"id"'] # == table2.c['"aid"']) - def test_legacy_quoted_identifiers_unit(self): - dialect = sqlite.dialect() - dialect._broken_fk_pragma_quotes = True - - for row in [ - (0, 'target', 'tid', 'id'), - (0, '"target"', 'tid', 'id'), - (0, '[target]', 'tid', 'id'), - (0, "'target'", 'tid', 'id'), - (0, '`target`', 'tid', 'id'), - ]: - fks = {} - fkeys = [] - dialect._parse_fk(fks, fkeys, *row) - eq_( - fkeys, - [{ - 'referred_table': 'target', - 'referred_columns': ['id'], - 'referred_schema': None, - 'name': None, - 'constrained_columns': ['tid'] - }]) - @testing.provide_metadata def test_description_encoding(self): # amazingly, pysqlite seems to still deliver cursor.description @@ -557,69 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults): e = create_engine('sqlite+pysqlite:///foo.db') assert e.pool.__class__ is pool.NullPool - @testing.provide_metadata - def test_dont_reflect_autoindex(self): - meta = self.metadata - Table('foo', meta, Column('bar', String, primary_key=True)) - meta.create_all() - inspector = Inspector(testing.db) - eq_(inspector.get_indexes('foo'), []) - eq_( - inspector.get_indexes('foo', include_auto_indexes=True), - [{ - 'unique': 1, - 'name': 'sqlite_autoindex_foo_1', - 'column_names': ['bar']}]) - - @testing.provide_metadata - def test_create_index_with_schema(self): - """Test creation of index with explicit schema""" - - meta = self.metadata - Table( - 'foo', meta, Column('bar', String, index=True), - schema='main') - meta.create_all() - inspector = Inspector(testing.db) - eq_( - inspector.get_indexes('foo', schema='main'), - [{'unique': 0, 'name': u'ix_main_foo_bar', - 'column_names': [u'bar']}]) - - @testing.provide_metadata - def test_get_unique_constraints(self): - meta = self.metadata - Table( - 'foo', meta, Column('f', Integer), - UniqueConstraint('f', name='foo_f')) - Table( - 'bar', meta, Column('b', Integer), - UniqueConstraint('b', name='bar_b'), - prefixes=['TEMPORARY']) - meta.create_all() - inspector = Inspector(testing.db) - eq_(inspector.get_unique_constraints('foo'), - [{'column_names': [u'f'], 'name': u'foo_f'}]) - eq_(inspector.get_unique_constraints('bar'), - [{'column_names': [u'b'], 'name': u'bar_b'}]) - - def test_get_unnamed_unique_constraints(self): - meta = MetaData(testing.db) - t1 = Table('foo', meta, Column('f', Integer), - UniqueConstraint('f')) - t2 = Table('bar', meta, Column('b', Integer), - UniqueConstraint('b'), - prefixes=['TEMPORARY']) - meta.create_all() - from sqlalchemy.engine.reflection import Inspector - try: - inspector = Inspector(testing.db) - eq_(inspector.get_unique_constraints('foo'), - [{'column_names': [u'f'], 'name': u''}]) - eq_(inspector.get_unique_constraints('bar'), - [{'column_names': [u'b'], 'name': u''}]) - finally: - meta.drop_all() class AttachedMemoryDBTest(fixtures.TestBase): @@ -1072,52 +986,306 @@ class ReflectHeadlessFKsTest(fixtures.TestBase): assert b.c.id.references(a.c.id) -class ReflectFKConstraintTest(fixtures.TestBase): +class ConstraintReflectionTest(fixtures.TestBase): __only_on__ = 'sqlite' - def setup(self): - testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") - testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") - testing.db.execute( - "CREATE TABLE b (id INTEGER PRIMARY KEY, " - "FOREIGN KEY(id) REFERENCES a1(id)," - "FOREIGN KEY(id) REFERENCES a2(id)" - ")") - testing.db.execute( - "CREATE TABLE c (id INTEGER, " - "CONSTRAINT bar PRIMARY KEY(id)," - "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," - "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" - ")") + @classmethod + def setup_class(cls): + with testing.db.begin() as conn: + + conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)") + conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)") + conn.execute( + "CREATE TABLE b (id INTEGER PRIMARY KEY, " + "FOREIGN KEY(id) REFERENCES a1(id)," + "FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + "CREATE TABLE c (id INTEGER, " + "CONSTRAINT bar PRIMARY KEY(id)," + "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id)," + "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)" + ")") + conn.execute( + # the lower casing + inline is intentional here + "CREATE TABLE d (id INTEGER, x INTEGER unique)") + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d1 ' + '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)') + conn.execute( + # the lower casing + inline is intentional here + 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)') + + conn.execute( + # lower casing + inline is intentional + "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))") + conn.execute( + 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER ' + 'references a2 ("some ( STUPID n,ame"))') + conn.execute( + 'CREATE TABLE e2 (id INTEGER, ' + '"some ( STUPID n,ame" INTEGER NOT NULL ' + 'references a2 ("some ( STUPID n,ame"))') + + conn.execute( + "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))" + ) + conn.execute( + "CREATE TEMPORARY TABLE g " + "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))" + ) + conn.execute( + # intentional broken casing + "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))" + ) + conn.execute( + "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))" + ) + conn.execute( + "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))" + ) + conn.execute( + "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, " + "PRIMARY KEY(id), " + "conSTRAINT my_fk FOreiGN KEY ( q , p ) " + "REFERENCes i ( x , y ))" + ) - def teardown(self): - testing.db.execute("drop table c") - testing.db.execute("drop table b") - testing.db.execute("drop table a1") - testing.db.execute("drop table a2") + meta = MetaData() + Table( + 'l', meta, Column('bar', String, index=True), + schema='main') + + Table( + 'm', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x') + ) - def test_name_is_none(self): + Table( + 'n', meta, + Column('id', Integer, primary_key=True), + Column('x', String(30)), + UniqueConstraint('x'), + prefixes=['TEMPORARY'] + ) + + meta.create_all(conn) + + # will contain an "autoindex" + conn.execute("create table o (foo varchar(20) primary key)") + + @classmethod + def teardown_class(cls): + with testing.db.begin() as conn: + for name in [ + "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1", + "d", "d1", "d2", "c", "b", "a1", "a2"]: + conn.execute("drop table %s" % name) + + def test_legacy_quoted_identifiers_unit(self): + dialect = sqlite.dialect() + dialect._broken_fk_pragma_quotes = True + + for row in [ + (0, None, 'target', 'tid', 'id', None), + (0, None, '"target"', 'tid', 'id', None), + (0, None, '[target]', 'tid', 'id', None), + (0, None, "'target'", 'tid', 'id', None), + (0, None, '`target`', 'tid', 'id', None), + ]: + def _get_table_pragma(*arg, **kw): + return [row] + + def _get_table_sql(*arg, **kw): + return "CREATE TABLE foo "\ + "(tid INTEGER, "\ + "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2] + with mock.patch.object( + dialect, "_get_table_pragma", _get_table_pragma): + with mock.patch.object( + dialect, '_get_table_sql', _get_table_sql): + + fkeys = dialect.get_foreign_keys(None, 'foo') + eq_( + fkeys, + [{ + 'referred_table': 'target', + 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, + 'constrained_columns': ['tid'] + }]) + + def test_foreign_key_name_is_none(self): # and not "0" - meta = MetaData() - b = Table('b', meta, autoload=True, autoload_with=testing.db) + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('b') eq_( - [con.name for con in b.constraints], - [None, None, None] + fks, + [ + {'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + {'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['id']}, + ] ) - def test_name_not_none(self): - # we don't have names for PK constraints, - # it appears we get back None in the pragma for - # FKs also (also it doesn't even appear to be documented on - # sqlite's docs - # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list - # how did we ever know that's the "name" field ??) + def test_foreign_key_name_is_not_none(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('c') + eq_( + fks, + [ + { + 'referred_table': 'a1', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo1', + 'constrained_columns': ['id']}, + { + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, 'name': 'foo2', + 'constrained_columns': ['id']}, + ] + ) - meta = MetaData() - c = Table('c', meta, autoload=True, autoload_with=testing.db) + def test_unnamed_inline_foreign_key(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e') + eq_( + fks, + [{ + 'referred_table': 'a2', 'referred_columns': ['id'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['x'] + }] + ) + + def test_unnamed_inline_foreign_key_quoted(self): + inspector = Inspector(testing.db) + + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('e1') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + fks = inspector.get_foreign_keys('e2') + eq_( + fks, + [{ + 'referred_table': 'a2', + 'referred_columns': ['some ( STUPID n,ame'], + 'referred_schema': None, + 'name': None, 'constrained_columns': ['some ( STUPID n,ame'] + }] + ) + + def test_foreign_key_composite_broken_casing(self): + inspector = Inspector(testing.db) + fks = inspector.get_foreign_keys('j') + eq_( + fks, + [{ + 'referred_table': 'i', + 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': None, + 'constrained_columns': ['q', 'p']}] + ) + fks = inspector.get_foreign_keys('k') + eq_( + fks, + [{'referred_table': 'i', 'referred_columns': ['x', 'y'], + 'referred_schema': None, 'name': 'my_fk', + 'constrained_columns': ['q', 'p']}] + ) + + def test_dont_reflect_autoindex(self): + inspector = Inspector(testing.db) + eq_(inspector.get_indexes('o'), []) + eq_( + inspector.get_indexes('o', include_auto_indexes=True), + [{ + 'unique': 1, + 'name': 'sqlite_autoindex_o_1', + 'column_names': ['foo']}]) + + def test_create_index_with_schema(self): + """Test creation of index with explicit schema""" + + inspector = Inspector(testing.db) + eq_( + inspector.get_indexes('l', schema='main'), + [{'unique': 0, 'name': u'ix_main_l_bar', + 'column_names': [u'bar']}]) + + def test_unique_constraint_named(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("f"), + [{'column_names': ['x'], 'name': 'foo_fx'}] + ) + + def test_unique_constraint_named_broken_casing(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("h"), + [{'column_names': ['x'], 'name': 'foo_hx'}] + ) + + def test_unique_constraint_named_broken_temp(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("g"), + [{'column_names': ['x'], 'name': 'foo_gx'}] + ) + + def test_unique_constraint_unnamed_inline(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_inline_quoted(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("d1"), + [{'column_names': ['some ( STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d2"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + eq_( + inspector.get_unique_constraints("d3"), + [{'column_names': ['some STUPID n,ame'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal(self): + inspector = Inspector(testing.db) + eq_( + inspector.get_unique_constraints("m"), + [{'column_names': ['x'], 'name': None}] + ) + + def test_unique_constraint_unnamed_normal_temporary(self): + inspector = Inspector(testing.db) eq_( - set([con.name for con in c.constraints]), - set([None, None]) + inspector.get_unique_constraints("n"), + [{'column_names': ['x'], 'name': None}] ) |