diff options
Diffstat (limited to 'test/dialect/oracle/test_reflection.py')
-rw-r--r-- | test/dialect/oracle/test_reflection.py | 100 |
1 files changed, 55 insertions, 45 deletions
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index bd881ac1c..efa21fc1a 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -123,17 +123,18 @@ drop synonym %(test_schema)s.local_table; Column("pid", Integer, ForeignKey("%s.parent.pid" % schema)), schema=schema, ) - meta.create_all() - parent.insert().execute({"pid": 1}) - child.insert().execute({"cid": 1, "pid": 1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) + with testing.db.begin() as conn: + meta.create_all(conn) + conn.execute(parent.insert(), {"pid": 1}) + conn.execute(child.insert(), {"cid": 1, "pid": 1}) + eq_(conn.execute(child.select()).fetchall(), [(1, 1)]) def test_reflect_alt_table_owner_local_synonym(self): - meta = MetaData(testing.db) + meta = MetaData() parent = Table( "%s_pt" % testing.config.test_schema, meta, - autoload=True, + autoload_with=testing.db, oracle_resolve_synonyms=True, ) self.assert_compile( @@ -142,14 +143,13 @@ drop synonym %(test_schema)s.local_table; "%(test_schema)s_pt.data FROM %(test_schema)s_pt" % {"test_schema": testing.config.test_schema}, ) - select(parent).execute().fetchall() def test_reflect_alt_synonym_owner_local_table(self): - meta = MetaData(testing.db) + meta = MetaData() parent = Table( "local_table", meta, - autoload=True, + autoload_with=testing.db, oracle_resolve_synonyms=True, schema=testing.config.test_schema, ) @@ -160,7 +160,6 @@ drop synonym %(test_schema)s.local_table; "FROM %(test_schema)s.local_table" % {"test_schema": testing.config.test_schema}, ) - select(parent).execute().fetchall() @testing.provide_metadata def test_create_same_names_implicit_schema(self): @@ -180,12 +179,18 @@ drop synonym %(test_schema)s.local_table; eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): - meta = MetaData(testing.db) + meta = MetaData() parent = Table( - "parent", meta, autoload=True, schema=testing.config.test_schema + "parent", + meta, + autoload_with=testing.db, + schema=testing.config.test_schema, ) child = Table( - "child", meta, autoload=True, schema=testing.config.test_schema + "child", + meta, + autoload_with=testing.db, + schema=testing.config.test_schema, ) self.assert_compile( @@ -194,9 +199,10 @@ drop synonym %(test_schema)s.local_table; "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % {"test_schema": testing.config.test_schema}, ) - select(parent, child).select_from( - parent.join(child) - ).execute().fetchall() + with testing.db.connect() as conn: + conn.execute( + select(parent, child).select_from(parent.join(child)) + ).fetchall() # check table comment (#5146) eq_(parent.comment, "my table comment") @@ -241,8 +247,8 @@ drop synonym %(test_schema)s.local_table; % {"test_schema": testing.config.test_schema}, ) try: - meta = MetaData(testing.db) - lcl = Table("localtable", meta, autoload=True) + meta = MetaData() + lcl = Table("localtable", meta, autoload_with=testing.db) parent = meta.tables["%s.parent" % testing.config.test_schema] self.assert_compile( parent.join(lcl), @@ -251,19 +257,22 @@ drop synonym %(test_schema)s.local_table; "localtable.parent_id" % {"test_schema": testing.config.test_schema}, ) - select(parent, lcl).select_from( - parent.join(lcl) - ).execute().fetchall() finally: exec_sql(testing.db, "DROP TABLE localtable") def test_reflect_alt_owner_implicit(self): - meta = MetaData(testing.db) + meta = MetaData() parent = Table( - "parent", meta, autoload=True, schema=testing.config.test_schema + "parent", + meta, + autoload_with=testing.db, + schema=testing.config.test_schema, ) child = Table( - "child", meta, autoload=True, schema=testing.config.test_schema + "child", + meta, + autoload_with=testing.db, + schema=testing.config.test_schema, ) self.assert_compile( parent.join(child), @@ -272,9 +281,10 @@ drop synonym %(test_schema)s.local_table; "%(test_schema)s.child.parent_id" % {"test_schema": testing.config.test_schema}, ) - select(parent, child).select_from( - parent.join(child) - ).execute().fetchall() + with testing.db.connect() as conn: + conn.execute( + select(parent, child).select_from(parent.join(child)) + ).fetchall() def test_reflect_alt_owner_synonyms(self): exec_sql( @@ -284,9 +294,12 @@ drop synonym %(test_schema)s.local_table; "%s.ptable(id))" % testing.config.test_schema, ) try: - meta = MetaData(testing.db) + meta = MetaData() lcl = Table( - "localtable", meta, autoload=True, oracle_resolve_synonyms=True + "localtable", + meta, + autoload_with=testing.db, + oracle_resolve_synonyms=True, ) parent = meta.tables["%s.ptable" % testing.config.test_schema] self.assert_compile( @@ -296,25 +309,26 @@ drop synonym %(test_schema)s.local_table; "localtable.parent_id" % {"test_schema": testing.config.test_schema}, ) - select(parent, lcl).select_from( - parent.join(lcl) - ).execute().fetchall() + with testing.db.connect() as conn: + conn.execute( + select(parent, lcl).select_from(parent.join(lcl)) + ).fetchall() finally: exec_sql(testing.db, "DROP TABLE localtable") def test_reflect_remote_synonyms(self): - meta = MetaData(testing.db) + meta = MetaData() parent = Table( "ptable", meta, - autoload=True, + autoload_with=testing.db, schema=testing.config.test_schema, oracle_resolve_synonyms=True, ) child = Table( "ctable", meta, - autoload=True, + autoload_with=testing.db, schema=testing.config.test_schema, oracle_resolve_synonyms=True, ) @@ -326,9 +340,6 @@ drop synonym %(test_schema)s.local_table; "%(test_schema)s.ctable.parent_id" % {"test_schema": testing.config.test_schema}, ) - select(parent, child).select_from( - parent.join(child) - ).execute().fetchall() class ConstraintTest(fixtures.TablesTest): @@ -488,9 +499,9 @@ class TableReflectionTest(fixtures.TestBase): ) metadata.create_all() - m2 = MetaData(testing.db) + m2 = MetaData() - tbl = Table("test_compress", m2, autoload=True) + tbl = Table("test_compress", m2, autoload_with=testing.db) # Don't hardcode the exact value, but it must be non-empty assert tbl.dialect_options["oracle"]["compress"] @@ -507,9 +518,9 @@ class TableReflectionTest(fixtures.TestBase): ) metadata.create_all() - m2 = MetaData(testing.db) + m2 = MetaData() - tbl = Table("test_compress", m2, autoload=True) + tbl = Table("test_compress", m2, autoload_with=testing.db) assert tbl.dialect_options["oracle"]["compress"] == "OLTP" @@ -760,7 +771,6 @@ class DBLinkReflectionTest(fixtures.TestBase): t = Table( "test_table_syn", m, - autoload=True, autoload_with=testing.db, oracle_resolve_synonyms=True, ) @@ -778,8 +788,8 @@ class TypeReflectionTest(fixtures.TestBase): m = self.metadata Table("oracle_types", m, *columns) m.create_all() - m2 = MetaData(testing.db) - table = Table("oracle_types", m2, autoload=True) + m2 = MetaData() + table = Table("oracle_types", m2, autoload_with=testing.db) for i, (reflected_col, spec) in enumerate(zip(table.c, specs)): expected_spec = spec[1] reflected_type = reflected_col.type |