summaryrefslogtreecommitdiff
path: root/test/dialect/test_sqlite.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-12-13 18:04:11 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2014-12-13 19:24:56 -0500
commit468db416dbf284f0e7dddde90ec9641dc89428c6 (patch)
tree061c600e0a2cd834d60137f901b5088a9c6eb664 /test/dialect/test_sqlite.py
parent5b146e1bab7b440038c356f388e3362a669399c1 (diff)
downloadsqlalchemy-468db416dbf284f0e7dddde90ec9641dc89428c6.tar.gz
- rework sqlite FK and unique constraint system to combine both PRAGMA
and regexp parsing of SQL in order to form a complete picture of constraints + their names. fixes #3244 fixes #3261 - factor various PRAGMA work to be centralized into one call
Diffstat (limited to 'test/dialect/test_sqlite.py')
-rw-r--r--test/dialect/test_sqlite.py414
1 files changed, 291 insertions, 123 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py
index b4524dc27..44e4eda42 100644
--- a/test/dialect/test_sqlite.py
+++ b/test/dialect/test_sqlite.py
@@ -22,6 +22,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
from sqlalchemy import testing
from sqlalchemy.schema import CreateTable
from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy.testing import mock
class TestTypes(fixtures.TestBase, AssertsExecutionResults):
@@ -500,30 +501,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
# assert j.onclause.compare(table1.c['"id"']
# == table2.c['"aid"'])
- def test_legacy_quoted_identifiers_unit(self):
- dialect = sqlite.dialect()
- dialect._broken_fk_pragma_quotes = True
-
- for row in [
- (0, 'target', 'tid', 'id'),
- (0, '"target"', 'tid', 'id'),
- (0, '[target]', 'tid', 'id'),
- (0, "'target'", 'tid', 'id'),
- (0, '`target`', 'tid', 'id'),
- ]:
- fks = {}
- fkeys = []
- dialect._parse_fk(fks, fkeys, *row)
- eq_(
- fkeys,
- [{
- 'referred_table': 'target',
- 'referred_columns': ['id'],
- 'referred_schema': None,
- 'name': None,
- 'constrained_columns': ['tid']
- }])
-
@testing.provide_metadata
def test_description_encoding(self):
# amazingly, pysqlite seems to still deliver cursor.description
@@ -557,69 +534,6 @@ class DialectTest(fixtures.TestBase, AssertsExecutionResults):
e = create_engine('sqlite+pysqlite:///foo.db')
assert e.pool.__class__ is pool.NullPool
- @testing.provide_metadata
- def test_dont_reflect_autoindex(self):
- meta = self.metadata
- Table('foo', meta, Column('bar', String, primary_key=True))
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(inspector.get_indexes('foo'), [])
- eq_(
- inspector.get_indexes('foo', include_auto_indexes=True),
- [{
- 'unique': 1,
- 'name': 'sqlite_autoindex_foo_1',
- 'column_names': ['bar']}])
-
- @testing.provide_metadata
- def test_create_index_with_schema(self):
- """Test creation of index with explicit schema"""
-
- meta = self.metadata
- Table(
- 'foo', meta, Column('bar', String, index=True),
- schema='main')
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(
- inspector.get_indexes('foo', schema='main'),
- [{'unique': 0, 'name': u'ix_main_foo_bar',
- 'column_names': [u'bar']}])
-
- @testing.provide_metadata
- def test_get_unique_constraints(self):
- meta = self.metadata
- Table(
- 'foo', meta, Column('f', Integer),
- UniqueConstraint('f', name='foo_f'))
- Table(
- 'bar', meta, Column('b', Integer),
- UniqueConstraint('b', name='bar_b'),
- prefixes=['TEMPORARY'])
- meta.create_all()
- inspector = Inspector(testing.db)
- eq_(inspector.get_unique_constraints('foo'),
- [{'column_names': [u'f'], 'name': u'foo_f'}])
- eq_(inspector.get_unique_constraints('bar'),
- [{'column_names': [u'b'], 'name': u'bar_b'}])
-
- def test_get_unnamed_unique_constraints(self):
- meta = MetaData(testing.db)
- t1 = Table('foo', meta, Column('f', Integer),
- UniqueConstraint('f'))
- t2 = Table('bar', meta, Column('b', Integer),
- UniqueConstraint('b'),
- prefixes=['TEMPORARY'])
- meta.create_all()
- from sqlalchemy.engine.reflection import Inspector
- try:
- inspector = Inspector(testing.db)
- eq_(inspector.get_unique_constraints('foo'),
- [{'column_names': [u'f'], 'name': u''}])
- eq_(inspector.get_unique_constraints('bar'),
- [{'column_names': [u'b'], 'name': u''}])
- finally:
- meta.drop_all()
class AttachedMemoryDBTest(fixtures.TestBase):
@@ -1072,52 +986,306 @@ class ReflectHeadlessFKsTest(fixtures.TestBase):
assert b.c.id.references(a.c.id)
-class ReflectFKConstraintTest(fixtures.TestBase):
+class ConstraintReflectionTest(fixtures.TestBase):
__only_on__ = 'sqlite'
- def setup(self):
- testing.db.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
- testing.db.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
- testing.db.execute(
- "CREATE TABLE b (id INTEGER PRIMARY KEY, "
- "FOREIGN KEY(id) REFERENCES a1(id),"
- "FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
- testing.db.execute(
- "CREATE TABLE c (id INTEGER, "
- "CONSTRAINT bar PRIMARY KEY(id),"
- "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
- "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
- ")")
+ @classmethod
+ def setup_class(cls):
+ with testing.db.begin() as conn:
+
+ conn.execute("CREATE TABLE a1 (id INTEGER PRIMARY KEY)")
+ conn.execute("CREATE TABLE a2 (id INTEGER PRIMARY KEY)")
+ conn.execute(
+ "CREATE TABLE b (id INTEGER PRIMARY KEY, "
+ "FOREIGN KEY(id) REFERENCES a1(id),"
+ "FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ "CREATE TABLE c (id INTEGER, "
+ "CONSTRAINT bar PRIMARY KEY(id),"
+ "CONSTRAINT foo1 FOREIGN KEY(id) REFERENCES a1(id),"
+ "CONSTRAINT foo2 FOREIGN KEY(id) REFERENCES a2(id)"
+ ")")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ "CREATE TABLE d (id INTEGER, x INTEGER unique)")
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d1 '
+ '(id INTEGER, "some ( STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d2 ( "some STUPID n,ame" INTEGER unique)')
+ conn.execute(
+ # the lower casing + inline is intentional here
+ 'CREATE TABLE d3 ( "some STUPID n,ame" INTEGER NULL unique)')
+
+ conn.execute(
+ # lower casing + inline is intentional
+ "CREATE TABLE e (id INTEGER, x INTEGER references a2(id))")
+ conn.execute(
+ 'CREATE TABLE e1 (id INTEGER, "some ( STUPID n,ame" INTEGER '
+ 'references a2 ("some ( STUPID n,ame"))')
+ conn.execute(
+ 'CREATE TABLE e2 (id INTEGER, '
+ '"some ( STUPID n,ame" INTEGER NOT NULL '
+ 'references a2 ("some ( STUPID n,ame"))')
+
+ conn.execute(
+ "CREATE TABLE f (x INTEGER, CONSTRAINT foo_fx UNIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TEMPORARY TABLE g "
+ "(x INTEGER, CONSTRAINT foo_gx UNIQUE(x))"
+ )
+ conn.execute(
+ # intentional broken casing
+ "CREATE TABLE h (x INTEGER, COnstraINT foo_hx unIQUE(x))"
+ )
+ conn.execute(
+ "CREATE TABLE i (x INTEGER, y INTEGER, PRIMARY KEY(x, y))"
+ )
+ conn.execute(
+ "CREATE TABLE j (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), FOreiGN KEY(q,p) REFERENCes i(x,y))"
+ )
+ conn.execute(
+ "CREATE TABLE k (id INTEGER, q INTEGER, p INTEGER, "
+ "PRIMARY KEY(id), "
+ "conSTRAINT my_fk FOreiGN KEY ( q , p ) "
+ "REFERENCes i ( x , y ))"
+ )
- def teardown(self):
- testing.db.execute("drop table c")
- testing.db.execute("drop table b")
- testing.db.execute("drop table a1")
- testing.db.execute("drop table a2")
+ meta = MetaData()
+ Table(
+ 'l', meta, Column('bar', String, index=True),
+ schema='main')
+
+ Table(
+ 'm', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x')
+ )
- def test_name_is_none(self):
+ Table(
+ 'n', meta,
+ Column('id', Integer, primary_key=True),
+ Column('x', String(30)),
+ UniqueConstraint('x'),
+ prefixes=['TEMPORARY']
+ )
+
+ meta.create_all(conn)
+
+ # will contain an "autoindex"
+ conn.execute("create table o (foo varchar(20) primary key)")
+
+ @classmethod
+ def teardown_class(cls):
+ with testing.db.begin() as conn:
+ for name in [
+ "m", "main.l", "k", "j", "i", "h", "g", "f", "e", "e1",
+ "d", "d1", "d2", "c", "b", "a1", "a2"]:
+ conn.execute("drop table %s" % name)
+
+ def test_legacy_quoted_identifiers_unit(self):
+ dialect = sqlite.dialect()
+ dialect._broken_fk_pragma_quotes = True
+
+ for row in [
+ (0, None, 'target', 'tid', 'id', None),
+ (0, None, '"target"', 'tid', 'id', None),
+ (0, None, '[target]', 'tid', 'id', None),
+ (0, None, "'target'", 'tid', 'id', None),
+ (0, None, '`target`', 'tid', 'id', None),
+ ]:
+ def _get_table_pragma(*arg, **kw):
+ return [row]
+
+ def _get_table_sql(*arg, **kw):
+ return "CREATE TABLE foo "\
+ "(tid INTEGER, "\
+ "FOREIGN KEY(tid) REFERENCES %s (id))" % row[2]
+ with mock.patch.object(
+ dialect, "_get_table_pragma", _get_table_pragma):
+ with mock.patch.object(
+ dialect, '_get_table_sql', _get_table_sql):
+
+ fkeys = dialect.get_foreign_keys(None, 'foo')
+ eq_(
+ fkeys,
+ [{
+ 'referred_table': 'target',
+ 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None,
+ 'constrained_columns': ['tid']
+ }])
+
+ def test_foreign_key_name_is_none(self):
# and not "0"
- meta = MetaData()
- b = Table('b', meta, autoload=True, autoload_with=testing.db)
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('b')
eq_(
- [con.name for con in b.constraints],
- [None, None, None]
+ fks,
+ [
+ {'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ {'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['id']},
+ ]
)
- def test_name_not_none(self):
- # we don't have names for PK constraints,
- # it appears we get back None in the pragma for
- # FKs also (also it doesn't even appear to be documented on
- # sqlite's docs
- # at http://www.sqlite.org/pragma.html#pragma_foreign_key_list
- # how did we ever know that's the "name" field ??)
+ def test_foreign_key_name_is_not_none(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('c')
+ eq_(
+ fks,
+ [
+ {
+ 'referred_table': 'a1', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo1',
+ 'constrained_columns': ['id']},
+ {
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None, 'name': 'foo2',
+ 'constrained_columns': ['id']},
+ ]
+ )
- meta = MetaData()
- c = Table('c', meta, autoload=True, autoload_with=testing.db)
+ def test_unnamed_inline_foreign_key(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2', 'referred_columns': ['id'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['x']
+ }]
+ )
+
+ def test_unnamed_inline_foreign_key_quoted(self):
+ inspector = Inspector(testing.db)
+
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('e1')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+ fks = inspector.get_foreign_keys('e2')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'a2',
+ 'referred_columns': ['some ( STUPID n,ame'],
+ 'referred_schema': None,
+ 'name': None, 'constrained_columns': ['some ( STUPID n,ame']
+ }]
+ )
+
+ def test_foreign_key_composite_broken_casing(self):
+ inspector = Inspector(testing.db)
+ fks = inspector.get_foreign_keys('j')
+ eq_(
+ fks,
+ [{
+ 'referred_table': 'i',
+ 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': None,
+ 'constrained_columns': ['q', 'p']}]
+ )
+ fks = inspector.get_foreign_keys('k')
+ eq_(
+ fks,
+ [{'referred_table': 'i', 'referred_columns': ['x', 'y'],
+ 'referred_schema': None, 'name': 'my_fk',
+ 'constrained_columns': ['q', 'p']}]
+ )
+
+ def test_dont_reflect_autoindex(self):
+ inspector = Inspector(testing.db)
+ eq_(inspector.get_indexes('o'), [])
+ eq_(
+ inspector.get_indexes('o', include_auto_indexes=True),
+ [{
+ 'unique': 1,
+ 'name': 'sqlite_autoindex_o_1',
+ 'column_names': ['foo']}])
+
+ def test_create_index_with_schema(self):
+ """Test creation of index with explicit schema"""
+
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_indexes('l', schema='main'),
+ [{'unique': 0, 'name': u'ix_main_l_bar',
+ 'column_names': [u'bar']}])
+
+ def test_unique_constraint_named(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("f"),
+ [{'column_names': ['x'], 'name': 'foo_fx'}]
+ )
+
+ def test_unique_constraint_named_broken_casing(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("h"),
+ [{'column_names': ['x'], 'name': 'foo_hx'}]
+ )
+
+ def test_unique_constraint_named_broken_temp(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("g"),
+ [{'column_names': ['x'], 'name': 'foo_gx'}]
+ )
+
+ def test_unique_constraint_unnamed_inline(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_inline_quoted(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("d1"),
+ [{'column_names': ['some ( STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d2"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+ eq_(
+ inspector.get_unique_constraints("d3"),
+ [{'column_names': ['some STUPID n,ame'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal(self):
+ inspector = Inspector(testing.db)
+ eq_(
+ inspector.get_unique_constraints("m"),
+ [{'column_names': ['x'], 'name': None}]
+ )
+
+ def test_unique_constraint_unnamed_normal_temporary(self):
+ inspector = Inspector(testing.db)
eq_(
- set([con.name for con in c.constraints]),
- set([None, None])
+ inspector.get_unique_constraints("n"),
+ [{'column_names': ['x'], 'name': None}]
)