Diffstat (limited to 'test/engine/test_reflection.py')
 -rw-r--r--  test/engine/test_reflection.py  |  467
 1 file changed, 426 insertions(+), 41 deletions(-)
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index ea80776a6..dff9fa1bb 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -1,17 +1,22 @@
 from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
 import StringIO, unicodedata
 import sqlalchemy as sa
+from sqlalchemy import types as sql_types
+from sqlalchemy import schema
+from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy import MetaData
 from sqlalchemy.test.schema import Table
 from sqlalchemy.test.schema import Column
 import sqlalchemy as tsa
 from sqlalchemy.test import TestBase, ComparesTables, testing, engines
 
+create_inspector = Inspector.from_engine
+
 metadata, users = None, None
 
 class ReflectionTest(TestBase, ComparesTables):
 
+    @testing.exclude('mssql', '<', (10, 0, 0), 'Date is only supported on MSSQL 2008+')
     @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
     def test_basic_reflection(self):
         meta = MetaData(testing.db)
@@ -22,16 +27,16 @@ class ReflectionTest(TestBase, ComparesTables):
         Column('test1', sa.CHAR(5), nullable=False),
         Column('test2', sa.Float(5), nullable=False),
         Column('test3', sa.Text),
-        Column('test4', sa.Numeric, nullable = False),
-        Column('test5', sa.DateTime),
+        Column('test4', sa.Numeric(10, 2), nullable = False),
+        Column('test5', sa.Date),
         Column('parent_user_id', sa.Integer, sa.ForeignKey('engine_users.user_id')),
-        Column('test6', sa.DateTime, nullable=False),
+        Column('test6', sa.Date, nullable=False),
         Column('test7', sa.Text),
         Column('test8', sa.Binary),
         Column('test_passivedefault2', sa.Integer, server_default='5'),
         Column('test9', sa.Binary(100)),
-        Column('test_numeric', sa.Numeric()),
+        Column('test10', sa.Numeric(10, 2)),
         test_needs_fk=True,
     )
@@ -52,9 +57,35 @@ class ReflectionTest(TestBase, ComparesTables):
             self.assert_tables_equal(users, reflected_users)
             self.assert_tables_equal(addresses, reflected_addresses)
         finally:
-            addresses.drop()
-            users.drop()
-
+            meta.drop_all()
+
+    def test_two_foreign_keys(self):
+        meta = MetaData(testing.db)
+        t1 = Table('t1', meta,
+            Column('id', sa.Integer, primary_key=True),
+            Column('t2id', sa.Integer, sa.ForeignKey('t2.id')),
+            Column('t3id', sa.Integer, sa.ForeignKey('t3.id')),
+            test_needs_fk=True
+        )
+        t2 = Table('t2', meta,
+            Column('id', sa.Integer, primary_key=True),
+            test_needs_fk=True
+        )
+        t3 = Table('t3', meta,
+            Column('id', sa.Integer, primary_key=True),
+            test_needs_fk=True
+        )
+        meta.create_all()
+        try:
+            meta2 = MetaData()
+            t1r, t2r, t3r = [Table(x, meta2, autoload=True, autoload_with=testing.db) for x in ('t1', 't2', 't3')]
+
+            assert t1r.c.t2id.references(t2r.c.id)
+            assert t1r.c.t3id.references(t3r.c.id)
+
+        finally:
+            meta.drop_all()
+
     def test_include_columns(self):
         meta = MetaData(testing.db)
         foo = Table('foo', meta, *[Column(n, sa.String(30))
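
The new test_two_foreign_keys above checks that autoload resolves more than one
foreign key per table. As a usage sketch of the same behavior outside the suite
(assuming a throwaway in-memory SQLite engine rather than the suite's testing.db):

    import sqlalchemy as sa

    engine = sa.create_engine('sqlite://')
    engine.execute("CREATE TABLE t2 (id INTEGER PRIMARY KEY)")
    engine.execute("CREATE TABLE t3 (id INTEGER PRIMARY KEY)")
    engine.execute("CREATE TABLE t1 (id INTEGER PRIMARY KEY, "
                   "t2id INTEGER REFERENCES t2(id), "
                   "t3id INTEGER REFERENCES t3(id))")

    meta = sa.MetaData()
    # autoload_with binds the reflection step to a specific engine
    t1 = sa.Table('t1', meta, autoload=True, autoload_with=engine)
    t2 = sa.Table('t2', meta, autoload=True, autoload_with=engine)
    t3 = sa.Table('t3', meta, autoload=True, autoload_with=engine)
    assert t1.c.t2id.references(t2.c.id)
    assert t1.c.t3id.references(t3.c.id)
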
@@ -84,26 +115,68 @@ class ReflectionTest(TestBase, ComparesTables):
         finally:
             meta.drop_all()
 
+    @testing.emits_warning(r".*omitted columns")
+    def test_include_columns_indexes(self):
+        m = MetaData(testing.db)
+
+        t1 = Table('t1', m, Column('a', sa.Integer), Column('b', sa.Integer))
+        sa.Index('foobar', t1.c.a, t1.c.b)
+        sa.Index('bat', t1.c.a)
+        m.create_all()
+        try:
+            m2 = MetaData(testing.db)
+            t2 = Table('t1', m2, autoload=True)
+            assert len(t2.indexes) == 2
+
+            m2 = MetaData(testing.db)
+            t2 = Table('t1', m2, autoload=True, include_columns=['a'])
+            assert len(t2.indexes) == 1
+
+            m2 = MetaData(testing.db)
+            t2 = Table('t1', m2, autoload=True, include_columns=['a', 'b'])
+            assert len(t2.indexes) == 2
+        finally:
+            m.drop_all()
+
+    def test_autoincrement_col(self):
+        """test that 'autoincrement' is reflected according to sqla's policy.
+
+        Don't mark this test as unsupported for any backend !
+
+        (technically it fails with MySQL InnoDB since "id" comes before "id2")
+
+        """
+
+        meta = MetaData(testing.db)
+        t1 = Table('test', meta,
+            Column('id', sa.Integer, primary_key=True),
+            Column('data', sa.String(50)),
+        )
+        t2 = Table('test2', meta,
+            Column('id', sa.Integer, sa.ForeignKey('test.id'), primary_key=True),
+            Column('id2', sa.Integer, primary_key=True),
+            Column('data', sa.String(50)),
+        )
+        meta.create_all()
+        try:
+            m2 = MetaData(testing.db)
+            t1a = Table('test', m2, autoload=True)
+            assert t1a._autoincrement_column is t1a.c.id
+
+            t2a = Table('test2', m2, autoload=True)
+            assert t2a._autoincrement_column is t2a.c.id2
+
+        finally:
+            meta.drop_all()
+
     def test_unknown_types(self):
         meta = MetaData(testing.db)
         t = Table("test", meta, Column('foo', sa.DateTime))
 
-        import sys
-        dialect_module = sys.modules[testing.db.dialect.__module__]
-
-        # we're relying on the presence of "ischema_names" in the
-        # dialect module, else we can't test this. we need to be able
-        # to get the dialect to not be aware of some type so we temporarily
-        # monkeypatch. not sure what a better way for this could be,
-        # except for an established dialect hook or dialect-specific tests
-        if not hasattr(dialect_module, 'ischema_names'):
-            return
-
-        ischema_names = dialect_module.ischema_names
+        ischema_names = testing.db.dialect.ischema_names
         t.create()
-        dialect_module.ischema_names = {}
+        testing.db.dialect.ischema_names = {}
         try:
             m2 = MetaData(testing.db)
             assert_raises(tsa.exc.SAWarning, Table, "test", m2, autoload=True)
@@ -115,7 +188,7 @@ class ReflectionTest(TestBase, ComparesTables):
             assert t3.c.foo.type.__class__ == sa.types.NullType
 
         finally:
-            dialect_module.ischema_names = ischema_names
+            testing.db.dialect.ischema_names = ischema_names
             t.drop()
 
     def test_basic_override(self):
@@ -578,7 +651,6 @@ class ReflectionTest(TestBase, ComparesTables):
         m9.reflect()
         self.assert_(not m9.tables)
 
-    @testing.fails_on_everything_except('postgres', 'mysql')
     def test_index_reflection(self):
         m1 = MetaData(testing.db)
         t1 = Table('party', m1,
@@ -698,7 +770,7 @@ class UnicodeReflectionTest(TestBase):
     def test_basic(self):
         try:
             # the 'convert_unicode' should not get in the way of the reflection
-            # process. reflecttable for oracle, postgres (others?) expect non-unicode
+            # process. reflecttable for oracle, postgresql (others?) expect non-unicode
             # strings in result sets/bind params
             bind = engines.utf8_engine(options={'convert_unicode':True})
             metadata = MetaData(bind)
@@ -713,7 +785,8 @@ class UnicodeReflectionTest(TestBase):
             metadata.create_all()
 
             reflected = set(bind.table_names())
-            if not names.issubset(reflected):
+            # Jython 2.5 on Java 5 lacks unicodedata.normalize
+            if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'):
                 # Python source files in the utf-8 coding seem to normalize
                 # literals as NFC (and the above are explicitly NFC). Maybe
                 # this database normalizes NFD on reflection.
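
For context on the test_unknown_types change above: the type-name lookup table
now lives on the dialect instance as dialect.ischema_names, so the test no
longer has to reach through sys.modules to monkeypatch it. A sketch of the
behavior being pinned down, assuming an in-memory SQLite engine and default
warning filters (under the suite's filters the SAWarning is raised as an error
instead):

    import sqlalchemy as sa

    engine = sa.create_engine('sqlite://')
    engine.execute("CREATE TABLE t (foo TIMESTAMP)")

    saved = engine.dialect.ischema_names
    engine.dialect.ischema_names = {}   # make every column type "unknown"
    try:
        meta = sa.MetaData()
        # emits SAWarning; the unrecognized column comes back as NullType
        t = sa.Table('t', meta, autoload=True, autoload_with=engine)
        assert isinstance(t.c.foo.type, sa.types.NullType)
    finally:
        engine.dialect.ischema_names = saved
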
@@ -741,23 +814,15 @@ class SchemaTest(TestBase):
             Column('col1', sa.Integer, primary_key=True),
             Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')),
             schema='someschema')
-        # ensure this doesnt crash
-        print [t for t in metadata.sorted_tables]
-        buf = StringIO.StringIO()
-        def foo(s, p=None):
-            buf.write(s)
-        gen = sa.create_engine(testing.db.name + "://", strategy="mock", executor=foo)
-        gen = gen.dialect.schemagenerator(gen.dialect, gen)
-        gen.traverse(table1)
-        gen.traverse(table2)
-        buf = buf.getvalue()
-        print buf
+
+        t1 = str(schema.CreateTable(table1).compile(bind=testing.db))
+        t2 = str(schema.CreateTable(table2).compile(bind=testing.db))
         if testing.db.dialect.preparer(testing.db.dialect).omit_schema:
-            assert buf.index("CREATE TABLE table1") > -1
-            assert buf.index("CREATE TABLE table2") > -1
+            assert t1.index("CREATE TABLE table1") > -1
+            assert t2.index("CREATE TABLE table2") > -1
         else:
-            assert buf.index("CREATE TABLE someschema.table1") > -1
-            assert buf.index("CREATE TABLE someschema.table2") > -1
+            assert t1.index("CREATE TABLE someschema.table1") > -1
+            assert t2.index("CREATE TABLE someschema.table2") > -1
 
     @testing.crashes('firebird', 'No schema support')
     @testing.fails_on('sqlite', 'FIXME: unknown')
@@ -767,9 +832,9 @@ class SchemaTest(TestBase):
     def test_explicit_default_schema(self):
         engine = testing.db
 
-        if testing.against('mysql'):
+        if testing.against('mysql+mysqldb'):
             schema = testing.db.url.database
-        elif testing.against('postgres'):
+        elif testing.against('postgresql'):
             schema = 'public'
         elif testing.against('sqlite'):
             # Works for CREATE TABLE main.foo, SELECT FROM main.foo, etc.,
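
The SchemaTest hunk above drops the mock-engine/schemagenerator dance in favor
of compiling a schema.CreateTable construct directly. A minimal sketch of that
DDL-compilation API, assuming an in-memory SQLite engine:

    import sqlalchemy as sa
    from sqlalchemy import schema

    engine = sa.create_engine('sqlite://')
    meta = sa.MetaData()
    table1 = sa.Table('table1', meta,
                      sa.Column('col1', sa.Integer, primary_key=True))

    # compile the CREATE TABLE construct against the engine's dialect
    ddl = str(schema.CreateTable(table1).compile(bind=engine))
    assert 'CREATE TABLE table1' in ddl
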
@@ -820,4 +885,324 @@ class HasSequenceTest(TestBase):
         metadata.drop_all(bind=testing.db)
         eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
 
+# Tests related to engine.reflection
+
+def get_schema():
+    if testing.against('oracle'):
+        return 'scott'
+    return 'test_schema'
+
+def createTables(meta, schema=None):
+    if schema:
+        parent_user_id = Column('parent_user_id', sa.Integer,
+            sa.ForeignKey('%s.users.user_id' % schema)
+        )
+    else:
+        parent_user_id = Column('parent_user_id', sa.Integer,
+            sa.ForeignKey('users.user_id')
+        )
+
+    users = Table('users', meta,
+        Column('user_id', sa.INT, primary_key=True),
+        Column('user_name', sa.VARCHAR(20), nullable=False),
+        Column('test1', sa.CHAR(5), nullable=False),
+        Column('test2', sa.Float(5), nullable=False),
+        Column('test3', sa.Text),
+        Column('test4', sa.Numeric(10, 2), nullable = False),
+        Column('test5', sa.DateTime),
+        Column('test5-1', sa.TIMESTAMP),
+        parent_user_id,
+        Column('test6', sa.DateTime, nullable=False),
+        Column('test7', sa.Text),
+        Column('test8', sa.Binary),
+        Column('test_passivedefault2', sa.Integer, server_default='5'),
+        Column('test9', sa.Binary(100)),
+        Column('test10', sa.Numeric(10, 2)),
+        schema=schema,
+        test_needs_fk=True,
+    )
+    addresses = Table('email_addresses', meta,
+        Column('address_id', sa.Integer, primary_key = True),
+        Column('remote_user_id', sa.Integer,
+               sa.ForeignKey(users.c.user_id)),
+        Column('email_address', sa.String(20)),
+        schema=schema,
+        test_needs_fk=True,
+    )
+    return (users, addresses)
+
+def createIndexes(con, schema=None):
+    fullname = 'users'
+    if schema:
+        fullname = "%s.%s" % (schema, 'users')
+    query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
+    con.execute(sa.sql.text(query))
+
+def createViews(con, schema=None):
+    for table_name in ('users', 'email_addresses'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+                                                        fullname)
+        con.execute(sa.sql.text(query))
+
+def dropViews(con, schema=None):
+    for table_name in ('email_addresses', 'users'):
+        fullname = table_name
+        if schema:
+            fullname = "%s.%s" % (schema, table_name)
+        view_name = fullname + '_v'
+        query = "DROP VIEW %s" % view_name
+        con.execute(sa.sql.text(query))
+
+
+class ComponentReflectionTest(TestBase):
+
+    @testing.requires.schemas
+    def test_get_schema_names(self):
+        meta = MetaData(testing.db)
+        insp = Inspector(meta.bind)
+
+        self.assert_(get_schema() in insp.get_schema_names())
+
+    def _test_get_table_names(self, schema=None, table_type='table',
+                              order_by=None):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        meta.create_all()
+        createViews(meta.bind, schema)
+        try:
+            insp = Inspector(meta.bind)
+            if table_type == 'view':
+                table_names = insp.get_view_names(schema)
+                table_names.sort()
+                answer = ['email_addresses_v', 'users_v']
+            else:
+                table_names = insp.get_table_names(schema,
+                                                   order_by=order_by)
+                table_names.sort()
+                if order_by == 'foreign_key':
+                    answer = ['users', 'email_addresses']
+                else:
+                    answer = ['email_addresses', 'users']
+            eq_(table_names, answer)
+        finally:
+            dropViews(meta.bind, schema)
+            addresses.drop()
+            users.drop()
+
+    def test_get_table_names(self):
+        self._test_get_table_names()
+
+    @testing.requires.schemas
+    def test_get_table_names_with_schema(self):
+        self._test_get_table_names(get_schema())
+
+    def test_get_view_names(self):
+        self._test_get_table_names(table_type='view')
+
+    @testing.requires.schemas
+    def test_get_view_names_with_schema(self):
+        self._test_get_table_names(get_schema(), table_type='view')
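
The _test_get_table_names helper above drives the new Inspector methods
get_table_names() and get_view_names(). A standalone usage sketch, assuming an
in-memory SQLite engine:

    import sqlalchemy as sa
    from sqlalchemy.engine.reflection import Inspector

    engine = sa.create_engine('sqlite://')
    engine.execute("CREATE TABLE users (user_id INTEGER PRIMARY KEY)")
    engine.execute("CREATE VIEW users_v AS SELECT * FROM users")

    insp = Inspector.from_engine(engine)
    print insp.get_table_names()    # ['users']
    print insp.get_view_names()     # ['users_v']
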
+    def _test_get_columns(self, schema=None, table_type='table'):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        table_names = ['users', 'email_addresses']
+        meta.create_all()
+        if table_type == 'view':
+            createViews(meta.bind, schema)
+            table_names = ['users_v', 'email_addresses_v']
+        try:
+            insp = Inspector(meta.bind)
+            for (table_name, table) in zip(table_names, (users, addresses)):
+                schema_name = schema
+                cols = insp.get_columns(table_name, schema=schema_name)
+                self.assert_(len(cols) > 0, len(cols))
+                # should be in order
+                for (i, col) in enumerate(table.columns):
+                    eq_(col.name, cols[i]['name'])
+                    ctype = cols[i]['type'].__class__
+                    ctype_def = col.type
+                    if isinstance(ctype_def, sa.types.TypeEngine):
+                        ctype_def = ctype_def.__class__
+
+                    # Oracle returns Date for DateTime.
+                    if testing.against('oracle') \
+                        and ctype_def in (sql_types.Date, sql_types.DateTime):
+                        ctype_def = sql_types.Date
+
+                    # assert that the desired type and return type
+                    # share a base within one of the generic types.
+                    self.assert_(
+                        len(
+                            set(
+                                ctype.__mro__
+                            ).intersection(ctype_def.__mro__)
+                             .intersection([sql_types.Integer, sql_types.Numeric,
+                                 sql_types.DateTime, sql_types.Date, sql_types.Time,
+                                 sql_types.String, sql_types.Binary])
+                        ) > 0
+                        , ("%s(%s), %s(%s)" % (col.name, col.type,
+                                               cols[i]['name'], ctype)))
+        finally:
+            if table_type == 'view':
+                dropViews(meta.bind, schema)
+            addresses.drop()
+            users.drop()
+
+    def test_get_columns(self):
+        self._test_get_columns()
+
+    @testing.requires.schemas
+    def test_get_columns_with_schema(self):
+        self._test_get_columns(schema=get_schema())
+
+    def test_get_view_columns(self):
+        self._test_get_columns(table_type='view')
+
+    @testing.requires.schemas
+    def test_get_view_columns_with_schema(self):
+        self._test_get_columns(schema=get_schema(), table_type='view')
+
+    def _test_get_primary_keys(self, schema=None):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        meta.create_all()
+        insp = Inspector(meta.bind)
+        try:
+            users_pkeys = insp.get_primary_keys(users.name,
+                                                schema=schema)
+            eq_(users_pkeys, ['user_id'])
+            addr_pkeys = insp.get_primary_keys(addresses.name,
+                                               schema=schema)
+            eq_(addr_pkeys, ['address_id'])
+
+        finally:
+            addresses.drop()
+            users.drop()
+
+    def test_get_primary_keys(self):
+        self._test_get_primary_keys()
+
+    @testing.fails_on('sqlite', 'no schemas')
+    def test_get_primary_keys_with_schema(self):
+        self._test_get_primary_keys(schema=get_schema())
+
+    def _test_get_foreign_keys(self, schema=None):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        meta.create_all()
+        insp = Inspector(meta.bind)
+        try:
+            expected_schema = schema
+            # users
+            users_fkeys = insp.get_foreign_keys(users.name,
+                                                schema=schema)
+            fkey1 = users_fkeys[0]
+            self.assert_(fkey1['name'] is not None)
+            eq_(fkey1['referred_schema'], expected_schema)
+            eq_(fkey1['referred_table'], users.name)
+            eq_(fkey1['referred_columns'], ['user_id', ])
+            eq_(fkey1['constrained_columns'], ['parent_user_id'])
+            # addresses
+            addr_fkeys = insp.get_foreign_keys(addresses.name,
+                                               schema=schema)
+            fkey1 = addr_fkeys[0]
+            self.assert_(fkey1['name'] is not None)
+            eq_(fkey1['referred_schema'], expected_schema)
+            eq_(fkey1['referred_table'], users.name)
+            eq_(fkey1['referred_columns'], ['user_id', ])
+            eq_(fkey1['constrained_columns'], ['remote_user_id'])
+        finally:
+            addresses.drop()
+            users.drop()
+
+    def test_get_foreign_keys(self):
+        self._test_get_foreign_keys()
+
+    @testing.requires.schemas
+    def test_get_foreign_keys_with_schema(self):
+        self._test_get_foreign_keys(schema=get_schema())
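
As asserted in _test_get_foreign_keys above, each element returned by
get_foreign_keys() is a dictionary with name, constrained_columns,
referred_schema, referred_table and referred_columns keys. A sketch of reading
one, assuming an in-memory SQLite engine (where the name field may come back
None):

    import sqlalchemy as sa
    from sqlalchemy.engine.reflection import Inspector

    engine = sa.create_engine('sqlite://')
    engine.execute("CREATE TABLE users (user_id INTEGER PRIMARY KEY)")
    engine.execute("CREATE TABLE email_addresses ("
                   "address_id INTEGER PRIMARY KEY, "
                   "remote_user_id INTEGER REFERENCES users(user_id))")

    insp = Inspector.from_engine(engine)
    for fk in insp.get_foreign_keys('email_addresses'):
        print fk['constrained_columns'], fk['referred_table'], \
              fk['referred_columns']
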
+    def _test_get_indexes(self, schema=None):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        meta.create_all()
+        createIndexes(meta.bind, schema)
+        try:
+            # The database may decide to create indexes for foreign keys, etc.
+            # so there may be more indexes than expected.
+            insp = Inspector(meta.bind)
+            indexes = insp.get_indexes('users', schema=schema)
+            indexes.sort()
+            expected_indexes = [
+                {'unique': False,
+                 'column_names': ['test1', 'test2'],
+                 'name': 'users_t_idx'}]
+            index_names = [d['name'] for d in indexes]
+            for e_index in expected_indexes:
+                assert e_index['name'] in index_names
+                index = indexes[index_names.index(e_index['name'])]
+                for key in e_index:
+                    eq_(e_index[key], index[key])
+
+        finally:
+            addresses.drop()
+            users.drop()
+
+    def test_get_indexes(self):
+        self._test_get_indexes()
+
+    @testing.requires.schemas
+    def test_get_indexes_with_schema(self):
+        self._test_get_indexes(schema=get_schema())
+
+    def _test_get_view_definition(self, schema=None):
+        meta = MetaData(testing.db)
+        (users, addresses) = createTables(meta, schema)
+        meta.create_all()
+        createViews(meta.bind, schema)
+        view_name1 = 'users_v'
+        view_name2 = 'email_addresses_v'
+        try:
+            insp = Inspector(meta.bind)
+            v1 = insp.get_view_definition(view_name1, schema=schema)
+            self.assert_(v1)
+            v2 = insp.get_view_definition(view_name2, schema=schema)
+            self.assert_(v2)
+        finally:
+            dropViews(meta.bind, schema)
+            addresses.drop()
+            users.drop()
+
+    def test_get_view_definition(self):
+        self._test_get_view_definition()
+
+    @testing.requires.schemas
+    def test_get_view_definition_with_schema(self):
+        self._test_get_view_definition(schema=get_schema())
+
+    def _test_get_table_oid(self, table_name, schema=None):
+        if testing.against('postgresql'):
+            meta = MetaData(testing.db)
+            (users, addresses) = createTables(meta, schema)
+            meta.create_all()
+            try:
+                insp = create_inspector(meta.bind)
+                oid = insp.get_table_oid(table_name, schema)
+                self.assert_(isinstance(oid, (int, long)))
+            finally:
+                addresses.drop()
+                users.drop()
+
+    def test_get_table_oid(self):
+        self._test_get_table_oid('users')
+
+    @testing.requires.schemas
+    def test_get_table_oid_with_schema(self):
+        self._test_get_table_oid('users', schema=get_schema())
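
get_indexes() likewise returns one dictionary per index, with name,
column_names and unique keys. A final sketch using the create_inspector alias
(Inspector.from_engine) defined at the top of the module, again assuming an
in-memory SQLite engine:

    import sqlalchemy as sa
    from sqlalchemy.engine.reflection import Inspector

    create_inspector = Inspector.from_engine

    engine = sa.create_engine('sqlite://')
    engine.execute("CREATE TABLE users (user_id INTEGER PRIMARY KEY, "
                   "test1 CHAR(5), test2 FLOAT)")
    engine.execute("CREATE INDEX users_t_idx ON users (test1, test2)")

    insp = create_inspector(engine)
    for idx in insp.get_indexes('users'):
        print idx['name'], idx['column_names'], idx['unique']
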
