diff options
Diffstat (limited to 'test/dialect/oracle/test_reflection.py')
| -rw-r--r-- | test/dialect/oracle/test_reflection.py | 553 |
1 files changed, 333 insertions, 220 deletions
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py index f749e513a..a88703ab0 100644 --- a/test/dialect/oracle/test_reflection.py +++ b/test/dialect/oracle/test_reflection.py @@ -6,22 +6,61 @@ from sqlalchemy import exc from sqlalchemy.sql import table from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import testing -from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\ - Index, MetaData, select, inspect, ForeignKey, String, func, \ - TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \ - literal_column, VARCHAR, create_engine, Date, NVARCHAR, \ - ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \ - union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \ - PrimaryKeyConstraint, FLOAT, INTEGER -from sqlalchemy.dialects.oracle.base import NUMBER, BINARY_DOUBLE, \ - BINARY_FLOAT, DOUBLE_PRECISION +from sqlalchemy import ( + Integer, + Text, + LargeBinary, + Unicode, + UniqueConstraint, + Index, + MetaData, + select, + inspect, + ForeignKey, + String, + func, + TypeDecorator, + bindparam, + Numeric, + TIMESTAMP, + CHAR, + text, + literal_column, + VARCHAR, + create_engine, + Date, + NVARCHAR, + ForeignKeyConstraint, + Sequence, + Float, + DateTime, + cast, + UnicodeText, + union, + except_, + type_coerce, + or_, + outerjoin, + DATE, + NCHAR, + outparam, + PrimaryKeyConstraint, + FLOAT, + INTEGER, +) +from sqlalchemy.dialects.oracle.base import ( + NUMBER, + BINARY_DOUBLE, + BINARY_FLOAT, + DOUBLE_PRECISION, +) from sqlalchemy.testing import assert_raises from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Table, Column class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @classmethod @@ -30,7 +69,8 @@ class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): # don't really know how else to go here unless # we connect as the other user. - for stmt in (""" + for stmt in ( + """ create table %(test_schema)s.parent( id integer primary key, data varchar2(50) @@ -60,13 +100,16 @@ create synonym %(test_schema)s.local_table for local_table; -- so we give it to public. ideas welcome. grant references on %(test_schema)s.parent to public; grant references on %(test_schema)s.child to public; -""" % {"test_schema": testing.config.test_schema}).split(";"): +""" + % {"test_schema": testing.config.test_schema} + ).split(";"): if stmt.strip(): testing.db.execute(stmt) @classmethod def teardown_class(cls): - for stmt in (""" + for stmt in ( + """ drop table %(test_schema)s.child; drop table %(test_schema)s.parent; drop table local_table; @@ -75,7 +118,9 @@ drop synonym %(test_schema)s.ptable; drop synonym %(test_schema)s_pt; drop synonym %(test_schema)s.local_table; -""" % {"test_schema": testing.config.test_schema}).split(";"): +""" + % {"test_schema": testing.config.test_schema} + ).split(";"): if stmt.strip(): testing.db.execute(stmt) @@ -83,192 +128,235 @@ drop synonym %(test_schema)s.local_table; def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name meta = self.metadata - parent = Table('parent', meta, - Column('pid', Integer, primary_key=True), - schema=schema) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', - Integer, - ForeignKey('%s.parent.pid' % schema)), - schema=schema) + parent = Table( + "parent", + meta, + Column("pid", Integer, primary_key=True), + schema=schema, + ) + child = Table( + "child", + meta, + Column("cid", Integer, primary_key=True), + Column("pid", Integer, ForeignKey("%s.parent.pid" % schema)), + schema=schema, + ) meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) + parent.insert().execute({"pid": 1}) + child.insert().execute({"cid": 1, "pid": 1}) eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) - parent = Table('%s_pt' % testing.config.test_schema, - meta, - autoload=True, - oracle_resolve_synonyms=True) - self.assert_compile(parent.select(), - "SELECT %(test_schema)s_pt.id, " - "%(test_schema)s_pt.data FROM %(test_schema)s_pt" - % {"test_schema": testing.config.test_schema}) + parent = Table( + "%s_pt" % testing.config.test_schema, + meta, + autoload=True, + oracle_resolve_synonyms=True, + ) + self.assert_compile( + parent.select(), + "SELECT %(test_schema)s_pt.id, " + "%(test_schema)s_pt.data FROM %(test_schema)s_pt" + % {"test_schema": testing.config.test_schema}, + ) select([parent]).execute().fetchall() def test_reflect_alt_synonym_owner_local_table(self): meta = MetaData(testing.db) parent = Table( - 'local_table', meta, autoload=True, - oracle_resolve_synonyms=True, schema=testing.config.test_schema) + "local_table", + meta, + autoload=True, + oracle_resolve_synonyms=True, + schema=testing.config.test_schema, + ) self.assert_compile( parent.select(), "SELECT %(test_schema)s.local_table.id, " "%(test_schema)s.local_table.data " - "FROM %(test_schema)s.local_table" % - {"test_schema": testing.config.test_schema} + "FROM %(test_schema)s.local_table" + % {"test_schema": testing.config.test_schema}, ) select([parent]).execute().fetchall() @testing.provide_metadata def test_create_same_names_implicit_schema(self): meta = self.metadata - parent = Table('parent', - meta, - Column('pid', Integer, primary_key=True)) - child = Table('child', meta, - Column('cid', Integer, primary_key=True), - Column('pid', Integer, ForeignKey('parent.pid'))) + parent = Table( + "parent", meta, Column("pid", Integer, primary_key=True) + ) + child = Table( + "child", + meta, + Column("cid", Integer, primary_key=True), + Column("pid", Integer, ForeignKey("parent.pid")), + ) meta.create_all() - parent.insert().execute({'pid': 1}) - child.insert().execute({'cid': 1, 'pid': 1}) + parent.insert().execute({"pid": 1}) + child.insert().execute({"cid": 1, "pid": 1}) eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): meta = MetaData(testing.db) parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) + "parent", meta, autoload=True, schema=testing.config.test_schema + ) child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) + "child", meta, autoload=True, schema=testing.config.test_schema + ) self.assert_compile( parent.join(child), "%(test_schema)s.parent JOIN %(test_schema)s.child ON " - "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % { - "test_schema": testing.config.test_schema - }) - select([parent, child]).\ - select_from(parent.join(child)).\ - execute().fetchall() + "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() def test_reflect_local_to_remote(self): testing.db.execute( - 'CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id INTEGER REFERENCES ' - '%(test_schema)s.parent(id))' % { - "test_schema": testing.config.test_schema}) + "CREATE TABLE localtable (id INTEGER " + "PRIMARY KEY, parent_id INTEGER REFERENCES " + "%(test_schema)s.parent(id))" + % {"test_schema": testing.config.test_schema} + ) try: meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True) - parent = meta.tables['%s.parent' % testing.config.test_schema] - self.assert_compile(parent.join(lcl), - '%(test_schema)s.parent JOIN localtable ON ' - '%(test_schema)s.parent.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema} - ) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() + lcl = Table("localtable", meta, autoload=True) + parent = meta.tables["%s.parent" % testing.config.test_schema] + self.assert_compile( + parent.join(lcl), + "%(test_schema)s.parent JOIN localtable ON " + "%(test_schema)s.parent.id = " + "localtable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, lcl]).select_from( + parent.join(lcl) + ).execute().fetchall() finally: - testing.db.execute('DROP TABLE localtable') + testing.db.execute("DROP TABLE localtable") def test_reflect_alt_owner_implicit(self): meta = MetaData(testing.db) parent = Table( - 'parent', meta, autoload=True, - schema=testing.config.test_schema) + "parent", meta, autoload=True, schema=testing.config.test_schema + ) child = Table( - 'child', meta, autoload=True, - schema=testing.config.test_schema) + "child", meta, autoload=True, schema=testing.config.test_schema + ) self.assert_compile( parent.join(child), - '%(test_schema)s.parent JOIN %(test_schema)s.child ' - 'ON %(test_schema)s.parent.id = ' - '%(test_schema)s.child.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() + "%(test_schema)s.parent JOIN %(test_schema)s.child " + "ON %(test_schema)s.parent.id = " + "%(test_schema)s.child.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() def test_reflect_alt_owner_synonyms(self): - testing.db.execute('CREATE TABLE localtable (id INTEGER ' - 'PRIMARY KEY, parent_id INTEGER REFERENCES ' - '%s.ptable(id))' % testing.config.test_schema) + testing.db.execute( + "CREATE TABLE localtable (id INTEGER " + "PRIMARY KEY, parent_id INTEGER REFERENCES " + "%s.ptable(id))" % testing.config.test_schema + ) try: meta = MetaData(testing.db) - lcl = Table('localtable', meta, autoload=True, - oracle_resolve_synonyms=True) - parent = meta.tables['%s.ptable' % testing.config.test_schema] + lcl = Table( + "localtable", meta, autoload=True, oracle_resolve_synonyms=True + ) + parent = meta.tables["%s.ptable" % testing.config.test_schema] self.assert_compile( parent.join(lcl), - '%(test_schema)s.ptable JOIN localtable ON ' - '%(test_schema)s.ptable.id = ' - 'localtable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - lcl]).select_from(parent.join(lcl)).execute().fetchall() + "%(test_schema)s.ptable JOIN localtable ON " + "%(test_schema)s.ptable.id = " + "localtable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, lcl]).select_from( + parent.join(lcl) + ).execute().fetchall() finally: - testing.db.execute('DROP TABLE localtable') + testing.db.execute("DROP TABLE localtable") def test_reflect_remote_synonyms(self): meta = MetaData(testing.db) - parent = Table('ptable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) - child = Table('ctable', meta, autoload=True, - schema=testing.config.test_schema, - oracle_resolve_synonyms=True) + parent = Table( + "ptable", + meta, + autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True, + ) + child = Table( + "ctable", + meta, + autoload=True, + schema=testing.config.test_schema, + oracle_resolve_synonyms=True, + ) self.assert_compile( parent.join(child), - '%(test_schema)s.ptable JOIN ' - '%(test_schema)s.ctable ' - 'ON %(test_schema)s.ptable.id = ' - '%(test_schema)s.ctable.parent_id' % { - "test_schema": testing.config.test_schema}) - select([parent, - child]).select_from(parent.join(child)).execute().fetchall() + "%(test_schema)s.ptable JOIN " + "%(test_schema)s.ctable " + "ON %(test_schema)s.ptable.id = " + "%(test_schema)s.ctable.parent_id" + % {"test_schema": testing.config.test_schema}, + ) + select([parent, child]).select_from( + parent.join(child) + ).execute().fetchall() class ConstraintTest(fixtures.TablesTest): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True run_deletes = None @classmethod def define_tables(cls, metadata): - Table('foo', metadata, Column('id', Integer, primary_key=True)) + Table("foo", metadata, Column("id", Integer, primary_key=True)) def test_oracle_has_no_on_update_cascade(self): - bar = Table('bar', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', - Integer, - ForeignKey('foo.id', onupdate='CASCADE'))) + bar = Table( + "bar", + self.metadata, + Column("id", Integer, primary_key=True), + Column( + "foo_id", Integer, ForeignKey("foo.id", onupdate="CASCADE") + ), + ) assert_raises(exc.SAWarning, bar.create) - bat = Table('bat', self.metadata, - Column('id', Integer, primary_key=True), - Column('foo_id', Integer), - ForeignKeyConstraint(['foo_id'], ['foo.id'], - onupdate='CASCADE')) + bat = Table( + "bat", + self.metadata, + Column("id", Integer, primary_key=True), + Column("foo_id", Integer), + ForeignKeyConstraint(["foo_id"], ["foo.id"], onupdate="CASCADE"), + ) assert_raises(exc.SAWarning, bat.create) def test_reflect_check_include_all(self): insp = inspect(testing.db) - eq_(insp.get_check_constraints('foo'), []) + eq_(insp.get_check_constraints("foo"), []) eq_( - [rec['sqltext'] - for rec in insp.get_check_constraints('foo', include_all=True)], - ['"ID" IS NOT NULL']) + [ + rec["sqltext"] + for rec in insp.get_check_constraints("foo", include_all=True) + ], + ['"ID" IS NOT NULL'], + ) class SystemTableTablenamesTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True def setup(self): @@ -287,23 +375,20 @@ class SystemTableTablenamesTest(fixtures.TestBase): def test_table_names_no_system(self): insp = inspect(testing.db) - eq_( - insp.get_table_names(), ["my_table"] - ) + eq_(insp.get_table_names(), ["my_table"]) def test_temp_table_names_no_system(self): insp = inspect(testing.db) - eq_( - insp.get_temp_table_names(), ["my_temp_table"] - ) + eq_(insp.get_temp_table_names(), ["my_temp_table"]) def test_table_names_w_system(self): engine = testing_engine(options={"exclude_tablespaces": ["FOO"]}) insp = inspect(engine) eq_( - set(insp.get_table_names()).intersection(["my_table", - "foo_table"]), - set(["my_table", "foo_table"]) + set(insp.get_table_names()).intersection( + ["my_table", "foo_table"] + ), + set(["my_table", "foo_table"]), ) @@ -311,11 +396,12 @@ class DontReflectIOTTest(fixtures.TestBase): """test that index overflow tables aren't included in table_names.""" - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True def setup(self): - testing.db.execute(""" + testing.db.execute( + """ CREATE TABLE admin_docindex( token char(20), doc_id NUMBER, @@ -326,7 +412,8 @@ class DontReflectIOTTest(fixtures.TestBase): TABLESPACE users PCTTHRESHOLD 20 OVERFLOW TABLESPACE users - """) + """ + ) def teardown(self): testing.db.execute("drop table admin_docindex") @@ -334,35 +421,37 @@ class DontReflectIOTTest(fixtures.TestBase): def test_reflect_all(self): m = MetaData(testing.db) m.reflect() - eq_( - set(t.name for t in m.tables.values()), - set(['admin_docindex']) - ) + eq_(set(t.name for t in m.tables.values()), set(["admin_docindex"])) class UnsupportedIndexReflectTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.emits_warning("No column names") @testing.provide_metadata def test_reflect_functional_index(self): metadata = self.metadata - Table('test_index_reflect', metadata, - Column('data', String(20), primary_key=True)) + Table( + "test_index_reflect", + metadata, + Column("data", String(20), primary_key=True), + ) metadata.create_all() - testing.db.execute('CREATE INDEX DATA_IDX ON ' - 'TEST_INDEX_REFLECT (UPPER(DATA))') + testing.db.execute( + "CREATE INDEX DATA_IDX ON " "TEST_INDEX_REFLECT (UPPER(DATA))" + ) m2 = MetaData(testing.db) - Table('test_index_reflect', m2, autoload=True) + Table("test_index_reflect", m2, autoload=True) def all_tables_compression_missing(): try: - testing.db.execute('SELECT compression FROM all_tables') + testing.db.execute("SELECT compression FROM all_tables") if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): + "select * from v$version" + ): return True return False except Exception: @@ -371,9 +460,10 @@ def all_tables_compression_missing(): def all_tables_compress_for_missing(): try: - testing.db.execute('SELECT compress_for FROM all_tables') + testing.db.execute("SELECT compress_for FROM all_tables") if "Enterprise Edition" not in testing.db.scalar( - "select * from v$version"): + "select * from v$version" + ): return True return False except Exception: @@ -381,7 +471,7 @@ def all_tables_compress_for_missing(): class TableReflectionTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata @@ -389,35 +479,41 @@ class TableReflectionTest(fixtures.TestBase): def test_reflect_basic_compression(self): metadata = self.metadata - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress=True) + tbl = Table( + "test_compress", + metadata, + Column("data", Integer, primary_key=True), + oracle_compress=True, + ) metadata.create_all() m2 = MetaData(testing.db) - tbl = Table('test_compress', m2, autoload=True) + tbl = Table("test_compress", m2, autoload=True) # Don't hardcode the exact value, but it must be non-empty - assert tbl.dialect_options['oracle']['compress'] + assert tbl.dialect_options["oracle"]["compress"] @testing.provide_metadata @testing.fails_if(all_tables_compress_for_missing) def test_reflect_oltp_compression(self): metadata = self.metadata - tbl = Table('test_compress', metadata, - Column('data', Integer, primary_key=True), - oracle_compress="OLTP") + tbl = Table( + "test_compress", + metadata, + Column("data", Integer, primary_key=True), + oracle_compress="OLTP", + ) metadata.create_all() m2 = MetaData(testing.db) - tbl = Table('test_compress', m2, autoload=True) - assert tbl.dialect_options['oracle']['compress'] == "OLTP" + tbl = Table("test_compress", m2, autoload=True) + assert tbl.dialect_options["oracle"]["compress"] == "OLTP" class RoundTripIndexTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata @@ -425,22 +521,27 @@ class RoundTripIndexTest(fixtures.TestBase): metadata = self.metadata s_table = Table( - "sometable", metadata, + "sometable", + metadata, Column("id_a", Unicode(255), primary_key=True), - Column("id_b", - Unicode(255), - primary_key=True, - unique=True), + Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col', 'group')) + UniqueConstraint("col", "group"), + ) # "group" is a keyword, so lower case - normalind = Index('tableind', s_table.c.id_b, s_table.c.group) - Index('compress1', s_table.c.id_a, s_table.c.id_b, - oracle_compress=True) - Index('compress2', s_table.c.id_a, s_table.c.id_b, s_table.c.col, - oracle_compress=1) + normalind = Index("tableind", s_table.c.id_b, s_table.c.group) + Index( + "compress1", s_table.c.id_a, s_table.c.id_b, oracle_compress=True + ) + Index( + "compress2", + s_table.c.id_a, + s_table.c.id_b, + s_table.c.col, + oracle_compress=1, + ) metadata.create_all() mirror = MetaData(testing.db) @@ -452,9 +553,11 @@ class RoundTripIndexTest(fixtures.TestBase): inspect.reflect() def obj_definition(obj): - return (obj.__class__, - tuple([c.name for c in obj.columns]), - getattr(obj, 'unique', None)) + return ( + obj.__class__, + tuple([c.name for c in obj.columns]), + getattr(obj, "unique", None), + ) # find what the primary k constraint name should be primaryconsname = testing.db.scalar( @@ -463,62 +566,72 @@ class RoundTripIndexTest(fixtures.TestBase): FROM all_constraints WHERE table_name = :table_name AND owner = :owner - AND constraint_type = 'P' """), + AND constraint_type = 'P' """ + ), table_name=s_table.name.upper(), - owner=testing.db.dialect.default_schema_name.upper()) + owner=testing.db.dialect.default_schema_name.upper(), + ) reflectedtable = inspect.tables[s_table.name] # make a dictionary of the reflected objects: - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) + reflected = dict( + [ + (obj_definition(i), i) + for i in reflectedtable.indexes | reflectedtable.constraints + ] + ) # assert we got primary key constraint and its name, Error # if not in dict - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ + assert ( + reflected[ + (PrimaryKeyConstraint, ("id_a", "id_b", "group"), None) + ].name.upper() == primaryconsname.upper() + ) # Error if not in dict - eq_( - reflected[(Index, ('id_b', 'group'), False)].name, - normalind.name - ) - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected + eq_(reflected[(Index, ("id_b", "group"), False)].name, normalind.name) + assert (Index, ("id_b",), True) in reflected + assert (Index, ("col", "group"), True) in reflected - idx = reflected[(Index, ('id_a', 'id_b', ), False)] - assert idx.dialect_options['oracle']['compress'] == 2 + idx = reflected[(Index, ("id_a", "id_b"), False)] + assert idx.dialect_options["oracle"]["compress"] == 2 - idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)] - assert idx.dialect_options['oracle']['compress'] == 1 + idx = reflected[(Index, ("id_a", "id_b", "col"), False)] + assert idx.dialect_options["oracle"]["compress"] == 1 eq_(len(reflectedtable.constraints), 1) eq_(len(reflectedtable.indexes), 5) class DBLinkReflectionTest(fixtures.TestBase): - __requires__ = 'oracle_test_dblink', - __only_on__ = 'oracle' + __requires__ = ("oracle_test_dblink",) + __only_on__ = "oracle" __backend__ = True @classmethod def setup_class(cls): from sqlalchemy.testing import config - cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link') + + cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link") # note that the synonym here is still not totally functional # when accessing via a different username as we do with the # multiprocess test suite, so testing here is minimal with testing.db.connect() as conn: - conn.execute("create table test_table " - "(id integer primary key, data varchar2(50))") - conn.execute("create synonym test_table_syn " - "for test_table@%s" % cls.dblink) + conn.execute( + "create table test_table " + "(id integer primary key, data varchar2(50))" + ) + conn.execute( + "create synonym test_table_syn " + "for test_table@%s" % cls.dblink + ) @classmethod def teardown_class(cls): @@ -530,24 +643,29 @@ class DBLinkReflectionTest(fixtures.TestBase): """test the resolution of the synonym/dblink. """ m = MetaData() - t = Table('test_table_syn', m, autoload=True, - autoload_with=testing.db, oracle_resolve_synonyms=True) - eq_(list(t.c.keys()), ['id', 'data']) + t = Table( + "test_table_syn", + m, + autoload=True, + autoload_with=testing.db, + oracle_resolve_synonyms=True, + ) + eq_(list(t.c.keys()), ["id", "data"]) eq_(list(t.primary_key), [t.c.id]) class TypeReflectionTest(fixtures.TestBase): - __only_on__ = 'oracle' + __only_on__ = "oracle" __backend__ = True @testing.provide_metadata def _run_test(self, specs, attributes): - columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)] + columns = [Column("c%i" % (i + 1), t[0]) for i, t in enumerate(specs)] m = self.metadata - Table('oracle_types', m, *columns) + Table("oracle_types", m, *columns) m.create_all() m2 = MetaData(testing.db) - table = Table('oracle_types', m2, autoload=True) + table = Table("oracle_types", m2, autoload=True) for i, (reflected_col, spec) in enumerate(zip(table.c, specs)): expected_spec = spec[1] reflected_type = reflected_col.type @@ -557,28 +675,23 @@ class TypeReflectionTest(fixtures.TestBase): getattr(reflected_type, attr), getattr(expected_spec, attr), "Column %s: Attribute %s value of %s does not " - "match %s for type %s" % ( + "match %s for type %s" + % ( "c%i" % (i + 1), attr, getattr(reflected_type, attr), getattr(expected_spec, attr), - spec[0] - ) + spec[0], + ), ) def test_integer_types(self): - specs = [ - (Integer, INTEGER(),), - (Numeric, INTEGER(),), - ] + specs = [(Integer, INTEGER()), (Numeric, INTEGER())] self._run_test(specs, []) def test_number_types(self): - specs = [ - (Numeric(5, 2), NUMBER(5, 2),), - (NUMBER, NUMBER(),), - ] - self._run_test(specs, ['precision', 'scale']) + specs = [(Numeric(5, 2), NUMBER(5, 2)), (NUMBER, NUMBER())] + self._run_test(specs, ["precision", "scale"]) def test_float_types(self): specs = [ @@ -587,11 +700,11 @@ class TypeReflectionTest(fixtures.TestBase): # (DOUBLE_PRECISION(), oracle.FLOAT(binary_precision=126)), (BINARY_DOUBLE(), BINARY_DOUBLE()), (BINARY_FLOAT(), BINARY_FLOAT()), - (FLOAT(5), FLOAT(),), + (FLOAT(5), FLOAT()), # when binary_precision is supported # (FLOAT(5), oracle.FLOAT(binary_precision=5),), (FLOAT(), FLOAT()), # when binary_precision is supported # (FLOAT(5), oracle.FLOAT(binary_precision=126),), ] - self._run_test(specs, ['precision']) + self._run_test(specs, ["precision"]) |
