summaryrefslogtreecommitdiff
path: root/test/engine/test_reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_reflection.py')
-rw-r--r--test/engine/test_reflection.py467
1 files changed, 426 insertions, 41 deletions
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index ea80776a6..dff9fa1bb 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -1,17 +1,22 @@
from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
import StringIO, unicodedata
import sqlalchemy as sa
+from sqlalchemy import types as sql_types
+from sqlalchemy import schema
+from sqlalchemy.engine.reflection import Inspector
from sqlalchemy import MetaData
from sqlalchemy.test.schema import Table
from sqlalchemy.test.schema import Column
import sqlalchemy as tsa
from sqlalchemy.test import TestBase, ComparesTables, testing, engines
+create_inspector = Inspector.from_engine
metadata, users = None, None
class ReflectionTest(TestBase, ComparesTables):
+ @testing.exclude('mssql', '<', (10, 0, 0), 'Date is only supported on MSSQL 2008+')
@testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
def test_basic_reflection(self):
meta = MetaData(testing.db)
@@ -22,16 +27,16 @@ class ReflectionTest(TestBase, ComparesTables):
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('test3', sa.Text),
- Column('test4', sa.Numeric, nullable = False),
- Column('test5', sa.DateTime),
+ Column('test4', sa.Numeric(10, 2), nullable = False),
+ Column('test5', sa.Date),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('engine_users.user_id')),
- Column('test6', sa.DateTime, nullable=False),
+ Column('test6', sa.Date, nullable=False),
Column('test7', sa.Text),
Column('test8', sa.Binary),
Column('test_passivedefault2', sa.Integer, server_default='5'),
Column('test9', sa.Binary(100)),
- Column('test_numeric', sa.Numeric()),
+ Column('test10', sa.Numeric(10, 2)),
test_needs_fk=True,
)
@@ -52,9 +57,35 @@ class ReflectionTest(TestBase, ComparesTables):
self.assert_tables_equal(users, reflected_users)
self.assert_tables_equal(addresses, reflected_addresses)
finally:
- addresses.drop()
- users.drop()
-
+ meta.drop_all()
+
+ def test_two_foreign_keys(self):
+ meta = MetaData(testing.db)
+ t1 = Table('t1', meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('t2id', sa.Integer, sa.ForeignKey('t2.id')),
+ Column('t3id', sa.Integer, sa.ForeignKey('t3.id')),
+ test_needs_fk=True
+ )
+ t2 = Table('t2', meta,
+ Column('id', sa.Integer, primary_key=True),
+ test_needs_fk=True
+ )
+ t3 = Table('t3', meta,
+ Column('id', sa.Integer, primary_key=True),
+ test_needs_fk=True
+ )
+ meta.create_all()
+ try:
+ meta2 = MetaData()
+ t1r, t2r, t3r = [Table(x, meta2, autoload=True, autoload_with=testing.db) for x in ('t1', 't2', 't3')]
+
+ assert t1r.c.t2id.references(t2r.c.id)
+ assert t1r.c.t3id.references(t3r.c.id)
+
+ finally:
+ meta.drop_all()
+
def test_include_columns(self):
meta = MetaData(testing.db)
foo = Table('foo', meta, *[Column(n, sa.String(30))
@@ -84,26 +115,68 @@ class ReflectionTest(TestBase, ComparesTables):
finally:
meta.drop_all()
+ @testing.emits_warning(r".*omitted columns")
+ def test_include_columns_indexes(self):
+ m = MetaData(testing.db)
+
+ t1 = Table('t1', m, Column('a', sa.Integer), Column('b', sa.Integer))
+ sa.Index('foobar', t1.c.a, t1.c.b)
+ sa.Index('bat', t1.c.a)
+ m.create_all()
+ try:
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True)
+ assert len(t2.indexes) == 2
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True, include_columns=['a'])
+ assert len(t2.indexes) == 1
+
+ m2 = MetaData(testing.db)
+ t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
+ assert len(t2.indexes) == 2
+ finally:
+ m.drop_all()
+
+ def test_autoincrement_col(self):
+ """test that 'autoincrement' is reflected according to sqla's policy.
+
+ Don't mark this test as unsupported for any backend !
+
+ (technically it fails with MySQL InnoDB since "id" comes before "id2")
+
+ """
+
+ meta = MetaData(testing.db)
+ t1 = Table('test', meta,
+ Column('id', sa.Integer, primary_key=True),
+ Column('data', sa.String(50)),
+ )
+ t2 = Table('test2', meta,
+ Column('id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True),
+ Column('id2', sa.Integer, primary_key=True),
+ Column('data', sa.String(50)),
+ )
+ meta.create_all()
+ try:
+ m2 = MetaData(testing.db)
+ t1a = Table('test', m2, autoload=True)
+ assert t1a._autoincrement_column is t1a.c.id
+
+ t2a = Table('test2', m2, autoload=True)
+ assert t2a._autoincrement_column is t2a.c.id2
+
+ finally:
+ meta.drop_all()
+
def test_unknown_types(self):
meta = MetaData(testing.db)
t = Table("test", meta,
Column('foo', sa.DateTime))
- import sys
- dialect_module = sys.modules[testing.db.dialect.__module__]
-
- # we're relying on the presence of "ischema_names" in the
- # dialect module, else we can't test this. we need to be able
- # to get the dialect to not be aware of some type so we temporarily
- # monkeypatch. not sure what a better way for this could be,
- # except for an established dialect hook or dialect-specific tests
- if not hasattr(dialect_module, 'ischema_names'):
- return
-
- ischema_names = dialect_module.ischema_names
+ ischema_names = testing.db.dialect.ischema_names
t.create()
- dialect_module.ischema_names = {}
+ testing.db.dialect.ischema_names = {}
try:
m2 = MetaData(testing.db)
assert_raises(tsa.exc.SAWarning, Table, "test", m2, autoload=True)
@@ -115,7 +188,7 @@ class ReflectionTest(TestBase, ComparesTables):
assert t3.c.foo.type.__class__ == sa.types.NullType
finally:
- dialect_module.ischema_names = ischema_names
+ testing.db.dialect.ischema_names = ischema_names
t.drop()
def test_basic_override(self):
@@ -578,7 +651,6 @@ class ReflectionTest(TestBase, ComparesTables):
m9.reflect()
self.assert_(not m9.tables)
- @testing.fails_on_everything_except('postgres', 'mysql')
def test_index_reflection(self):
m1 = MetaData(testing.db)
t1 = Table('party', m1,
@@ -698,7 +770,7 @@ class UnicodeReflectionTest(TestBase):
def test_basic(self):
try:
# the 'convert_unicode' should not get in the way of the reflection
- # process. reflecttable for oracle, postgres (others?) expect non-unicode
+ # process. reflecttable for oracle, postgresql (others?) expect non-unicode
# strings in result sets/bind params
bind = engines.utf8_engine(options={'convert_unicode':True})
metadata = MetaData(bind)
@@ -713,7 +785,8 @@ class UnicodeReflectionTest(TestBase):
metadata.create_all()
reflected = set(bind.table_names())
- if not names.issubset(reflected):
+ # Jython 2.5 on Java 5 lacks unicodedata.normalize
+ if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'):
# Python source files in the utf-8 coding seem to normalize
# literals as NFC (and the above are explicitly NFC). Maybe
# this database normalizes NFD on reflection.
@@ -741,23 +814,15 @@ class SchemaTest(TestBase):
Column('col1', sa.Integer, primary_key=True),
Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')),
schema='someschema')
- # ensure this doesnt crash
- print [t for t in metadata.sorted_tables]
- buf = StringIO.StringIO()
- def foo(s, p=None):
- buf.write(s)
- gen = sa.create_engine(testing.db.name + "://", strategy="mock", executor=foo)
- gen = gen.dialect.schemagenerator(gen.dialect, gen)
- gen.traverse(table1)
- gen.traverse(table2)
- buf = buf.getvalue()
- print buf
+
+ t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
+ t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
- assert buf.index("CREATE TABLE table1") > -1
- assert buf.index("CREATE TABLE table2") > -1
+ assert t1.index("CREATE TABLE table1") > -1
+ assert t2.index("CREATE TABLE table2") > -1
else:
- assert buf.index("CREATE TABLE someschema.table1") > -1
- assert buf.index("CREATE TABLE someschema.table2") > -1
+ assert t1.index("CREATE TABLE someschema.table1") > -1
+ assert t2.index("CREATE TABLE someschema.table2") > -1
@testing.crashes('firebird', 'No schema support')
@testing.fails_on('sqlite', 'FIXME: unknown')
@@ -767,9 +832,9 @@ class SchemaTest(TestBase):
def test_explicit_default_schema(self):
engine = testing.db
- if testing.against('mysql'):
+ if testing.against('mysql+mysqldb'):
schema = testing.db.url.database
- elif testing.against('postgres'):
+ elif testing.against('postgresql'):
schema = 'public'
elif testing.against('sqlite'):
# Works for CREATE TABLE main.foo, SELECT FROM main.foo, etc.,
@@ -820,4 +885,324 @@ class HasSequenceTest(TestBase):
metadata.drop_all(bind=testing.db)
eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
+# Tests related to engine.reflection
+
+def get_schema():
+ if testing.against('oracle'):
+ return 'scott'
+ return 'test_schema'
+
+def createTables(meta, schema=None):
+ if schema:
+ parent_user_id = Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('%s.users.user_id' % schema)
+ )
+ else:
+ parent_user_id = Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('users.user_id')
+ )
+
+ users = Table('users', meta,
+ Column('user_id', sa.INT, primary_key=True),
+ Column('user_name', sa.VARCHAR(20), nullable=False),
+ Column('test1', sa.CHAR(5), nullable=False),
+ Column('test2', sa.Float(5), nullable=False),
+ Column('test3', sa.Text),
+ Column('test4', sa.Numeric(10, 2), nullable = False),
+ Column('test5', sa.DateTime),
+ Column('test5-1', sa.TIMESTAMP),
+ parent_user_id,
+ Column('test6', sa.DateTime, nullable=False),
+ Column('test7', sa.Text),
+ Column('test8', sa.Binary),
+ Column('test_passivedefault2', sa.Integer, server_default='5'),
+ Column('test9', sa.Binary(100)),
+ Column('test10', sa.Numeric(10, 2)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ addresses = Table('email_addresses', meta,
+ Column('address_id', sa.Integer, primary_key = True),
+ Column('remote_user_id', sa.Integer,
+ sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(20)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ return (users, addresses)
+
+def createIndexes(con, schema=None):
+ fullname = 'users'
+ if schema:
+ fullname = "%s.%s" % (schema, 'users')
+ query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
+ con.execute(sa.sql.text(query))
+
+def createViews(con, schema=None):
+ for table_name in ('users', 'email_addresses'):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + '_v'
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+ fullname)
+ con.execute(sa.sql.text(query))
+
+def dropViews(con, schema=None):
+ for table_name in ('email_addresses', 'users'):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + '_v'
+ query = "DROP VIEW %s" % view_name
+ con.execute(sa.sql.text(query))
+
+
+class ComponentReflectionTest(TestBase):
+
+ @testing.requires.schemas
+ def test_get_schema_names(self):
+ meta = MetaData(testing.db)
+ insp = Inspector(meta.bind)
+
+ self.assert_(get_schema() in insp.get_schema_names())
+
+ def _test_get_table_names(self, schema=None, table_type='table',
+ order_by=None):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ createViews(meta.bind, schema)
+ try:
+ insp = Inspector(meta.bind)
+ if table_type == 'view':
+ table_names = insp.get_view_names(schema)
+ table_names.sort()
+ answer = ['email_addresses_v', 'users_v']
+ else:
+ table_names = insp.get_table_names(schema,
+ order_by=order_by)
+ table_names.sort()
+ if order_by == 'foreign_key':
+ answer = ['users', 'email_addresses']
+ else:
+ answer = ['email_addresses', 'users']
+ eq_(table_names, answer)
+ finally:
+ dropViews(meta.bind, schema)
+ addresses.drop()
+ users.drop()
+
+ def test_get_table_names(self):
+ self._test_get_table_names()
+
+ @testing.requires.schemas
+ def test_get_table_names_with_schema(self):
+ self._test_get_table_names(get_schema())
+
+ def test_get_view_names(self):
+ self._test_get_table_names(table_type='view')
+
+ @testing.requires.schemas
+ def test_get_view_names_with_schema(self):
+ self._test_get_table_names(get_schema(), table_type='view')
+
+ def _test_get_columns(self, schema=None, table_type='table'):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ table_names = ['users', 'email_addresses']
+ meta.create_all()
+ if table_type == 'view':
+ createViews(meta.bind, schema)
+ table_names = ['users_v', 'email_addresses_v']
+ try:
+ insp = Inspector(meta.bind)
+ for (table_name, table) in zip(table_names, (users, addresses)):
+ schema_name = schema
+ cols = insp.get_columns(table_name, schema=schema_name)
+ self.assert_(len(cols) > 0, len(cols))
+ # should be in order
+ for (i, col) in enumerate(table.columns):
+ eq_(col.name, cols[i]['name'])
+ ctype = cols[i]['type'].__class__
+ ctype_def = col.type
+ if isinstance(ctype_def, sa.types.TypeEngine):
+ ctype_def = ctype_def.__class__
+
+ # Oracle returns Date for DateTime.
+ if testing.against('oracle') \
+ and ctype_def in (sql_types.Date, sql_types.DateTime):
+ ctype_def = sql_types.Date
+
+ # assert that the desired type and return type
+ # share a base within one of the generic types.
+ self.assert_(
+ len(
+ set(
+ ctype.__mro__
+ ).intersection(ctype_def.__mro__)
+ .intersection([sql_types.Integer, sql_types.Numeric,
+ sql_types.DateTime, sql_types.Date, sql_types.Time,
+ sql_types.String, sql_types.Binary])
+ ) > 0
+ ,("%s(%s), %s(%s)" % (col.name, col.type, cols[i]['name'],
+ ctype)))
+ finally:
+ if table_type == 'view':
+ dropViews(meta.bind, schema)
+ addresses.drop()
+ users.drop()
+
+ def test_get_columns(self):
+ self._test_get_columns()
+
+ @testing.requires.schemas
+ def test_get_columns_with_schema(self):
+ self._test_get_columns(schema=get_schema())
+
+ def test_get_view_columns(self):
+ self._test_get_columns(table_type='view')
+
+ @testing.requires.schemas
+ def test_get_view_columns_with_schema(self):
+ self._test_get_columns(schema=get_schema(), table_type='view')
+
+ def _test_get_primary_keys(self, schema=None):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ insp = Inspector(meta.bind)
+ try:
+ users_pkeys = insp.get_primary_keys(users.name,
+ schema=schema)
+ eq_(users_pkeys, ['user_id'])
+ addr_pkeys = insp.get_primary_keys(addresses.name,
+ schema=schema)
+ eq_(addr_pkeys, ['address_id'])
+
+ finally:
+ addresses.drop()
+ users.drop()
+
+ def test_get_primary_keys(self):
+ self._test_get_primary_keys()
+
+ @testing.fails_on('sqlite', 'no schemas')
+ def test_get_primary_keys_with_schema(self):
+ self._test_get_primary_keys(schema=get_schema())
+
+ def _test_get_foreign_keys(self, schema=None):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ insp = Inspector(meta.bind)
+ try:
+ expected_schema = schema
+ # users
+ users_fkeys = insp.get_foreign_keys(users.name,
+ schema=schema)
+ fkey1 = users_fkeys[0]
+ self.assert_(fkey1['name'] is not None)
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['parent_user_id'])
+ #addresses
+ addr_fkeys = insp.get_foreign_keys(addresses.name,
+ schema=schema)
+ fkey1 = addr_fkeys[0]
+ self.assert_(fkey1['name'] is not None)
+ eq_(fkey1['referred_schema'], expected_schema)
+ eq_(fkey1['referred_table'], users.name)
+ eq_(fkey1['referred_columns'], ['user_id', ])
+ eq_(fkey1['constrained_columns'], ['remote_user_id'])
+ finally:
+ addresses.drop()
+ users.drop()
+
+ def test_get_foreign_keys(self):
+ self._test_get_foreign_keys()
+
+ @testing.requires.schemas
+ def test_get_foreign_keys_with_schema(self):
+ self._test_get_foreign_keys(schema=get_schema())
+
+ def _test_get_indexes(self, schema=None):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ createIndexes(meta.bind, schema)
+ try:
+ # The database may decide to create indexes for foreign keys, etc.
+ # so there may be more indexes than expected.
+ insp = Inspector(meta.bind)
+ indexes = insp.get_indexes('users', schema=schema)
+ indexes.sort()
+ expected_indexes = [
+ {'unique': False,
+ 'column_names': ['test1', 'test2'],
+ 'name': 'users_t_idx'}]
+ index_names = [d['name'] for d in indexes]
+ for e_index in expected_indexes:
+ assert e_index['name'] in index_names
+ index = indexes[index_names.index(e_index['name'])]
+ for key in e_index:
+ eq_(e_index[key], index[key])
+
+ finally:
+ addresses.drop()
+ users.drop()
+
+ def test_get_indexes(self):
+ self._test_get_indexes()
+
+ @testing.requires.schemas
+ def test_get_indexes_with_schema(self):
+ self._test_get_indexes(schema=get_schema())
+
+ def _test_get_view_definition(self, schema=None):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ createViews(meta.bind, schema)
+ view_name1 = 'users_v'
+ view_name2 = 'email_addresses_v'
+ try:
+ insp = Inspector(meta.bind)
+ v1 = insp.get_view_definition(view_name1, schema=schema)
+ self.assert_(v1)
+ v2 = insp.get_view_definition(view_name2, schema=schema)
+ self.assert_(v2)
+ finally:
+ dropViews(meta.bind, schema)
+ addresses.drop()
+ users.drop()
+
+ def test_get_view_definition(self):
+ self._test_get_view_definition()
+
+ @testing.requires.schemas
+ def test_get_view_definition_with_schema(self):
+ self._test_get_view_definition(schema=get_schema())
+
+ def _test_get_table_oid(self, table_name, schema=None):
+ if testing.against('postgresql'):
+ meta = MetaData(testing.db)
+ (users, addresses) = createTables(meta, schema)
+ meta.create_all()
+ try:
+ insp = create_inspector(meta.bind)
+ oid = insp.get_table_oid(table_name, schema)
+ self.assert_(isinstance(oid, (int, long)))
+ finally:
+ addresses.drop()
+ users.drop()
+
+ def test_get_table_oid(self):
+ self._test_get_table_oid('users')
+
+ @testing.requires.schemas
+ def test_get_table_oid_with_schema(self):
+ self._test_get_table_oid('users', schema=get_schema())
+