summaryrefslogtreecommitdiff
path: root/test/dialect/oracle/test_reflection.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/dialect/oracle/test_reflection.py')
-rw-r--r--test/dialect/oracle/test_reflection.py553
1 files changed, 333 insertions, 220 deletions
diff --git a/test/dialect/oracle/test_reflection.py b/test/dialect/oracle/test_reflection.py
index f749e513a..a88703ab0 100644
--- a/test/dialect/oracle/test_reflection.py
+++ b/test/dialect/oracle/test_reflection.py
@@ -6,22 +6,61 @@ from sqlalchemy import exc
from sqlalchemy.sql import table
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
from sqlalchemy import testing
-from sqlalchemy import Integer, Text, LargeBinary, Unicode, UniqueConstraint,\
- Index, MetaData, select, inspect, ForeignKey, String, func, \
- TypeDecorator, bindparam, Numeric, TIMESTAMP, CHAR, text, \
- literal_column, VARCHAR, create_engine, Date, NVARCHAR, \
- ForeignKeyConstraint, Sequence, Float, DateTime, cast, UnicodeText, \
- union, except_, type_coerce, or_, outerjoin, DATE, NCHAR, outparam, \
- PrimaryKeyConstraint, FLOAT, INTEGER
-from sqlalchemy.dialects.oracle.base import NUMBER, BINARY_DOUBLE, \
- BINARY_FLOAT, DOUBLE_PRECISION
+from sqlalchemy import (
+ Integer,
+ Text,
+ LargeBinary,
+ Unicode,
+ UniqueConstraint,
+ Index,
+ MetaData,
+ select,
+ inspect,
+ ForeignKey,
+ String,
+ func,
+ TypeDecorator,
+ bindparam,
+ Numeric,
+ TIMESTAMP,
+ CHAR,
+ text,
+ literal_column,
+ VARCHAR,
+ create_engine,
+ Date,
+ NVARCHAR,
+ ForeignKeyConstraint,
+ Sequence,
+ Float,
+ DateTime,
+ cast,
+ UnicodeText,
+ union,
+ except_,
+ type_coerce,
+ or_,
+ outerjoin,
+ DATE,
+ NCHAR,
+ outparam,
+ PrimaryKeyConstraint,
+ FLOAT,
+ INTEGER,
+)
+from sqlalchemy.dialects.oracle.base import (
+ NUMBER,
+ BINARY_DOUBLE,
+ BINARY_FLOAT,
+ DOUBLE_PRECISION,
+)
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Table, Column
class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
@classmethod
@@ -30,7 +69,8 @@ class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL):
# don't really know how else to go here unless
# we connect as the other user.
- for stmt in ("""
+ for stmt in (
+ """
create table %(test_schema)s.parent(
id integer primary key,
data varchar2(50)
@@ -60,13 +100,16 @@ create synonym %(test_schema)s.local_table for local_table;
-- so we give it to public. ideas welcome.
grant references on %(test_schema)s.parent to public;
grant references on %(test_schema)s.child to public;
-""" % {"test_schema": testing.config.test_schema}).split(";"):
+"""
+ % {"test_schema": testing.config.test_schema}
+ ).split(";"):
if stmt.strip():
testing.db.execute(stmt)
@classmethod
def teardown_class(cls):
- for stmt in ("""
+ for stmt in (
+ """
drop table %(test_schema)s.child;
drop table %(test_schema)s.parent;
drop table local_table;
@@ -75,7 +118,9 @@ drop synonym %(test_schema)s.ptable;
drop synonym %(test_schema)s_pt;
drop synonym %(test_schema)s.local_table;
-""" % {"test_schema": testing.config.test_schema}).split(";"):
+"""
+ % {"test_schema": testing.config.test_schema}
+ ).split(";"):
if stmt.strip():
testing.db.execute(stmt)
@@ -83,192 +128,235 @@ drop synonym %(test_schema)s.local_table;
def test_create_same_names_explicit_schema(self):
schema = testing.db.dialect.default_schema_name
meta = self.metadata
- parent = Table('parent', meta,
- Column('pid', Integer, primary_key=True),
- schema=schema)
- child = Table('child', meta,
- Column('cid', Integer, primary_key=True),
- Column('pid',
- Integer,
- ForeignKey('%s.parent.pid' % schema)),
- schema=schema)
+ parent = Table(
+ "parent",
+ meta,
+ Column("pid", Integer, primary_key=True),
+ schema=schema,
+ )
+ child = Table(
+ "child",
+ meta,
+ Column("cid", Integer, primary_key=True),
+ Column("pid", Integer, ForeignKey("%s.parent.pid" % schema)),
+ schema=schema,
+ )
meta.create_all()
- parent.insert().execute({'pid': 1})
- child.insert().execute({'cid': 1, 'pid': 1})
+ parent.insert().execute({"pid": 1})
+ child.insert().execute({"cid": 1, "pid": 1})
eq_(child.select().execute().fetchall(), [(1, 1)])
def test_reflect_alt_table_owner_local_synonym(self):
meta = MetaData(testing.db)
- parent = Table('%s_pt' % testing.config.test_schema,
- meta,
- autoload=True,
- oracle_resolve_synonyms=True)
- self.assert_compile(parent.select(),
- "SELECT %(test_schema)s_pt.id, "
- "%(test_schema)s_pt.data FROM %(test_schema)s_pt"
- % {"test_schema": testing.config.test_schema})
+ parent = Table(
+ "%s_pt" % testing.config.test_schema,
+ meta,
+ autoload=True,
+ oracle_resolve_synonyms=True,
+ )
+ self.assert_compile(
+ parent.select(),
+ "SELECT %(test_schema)s_pt.id, "
+ "%(test_schema)s_pt.data FROM %(test_schema)s_pt"
+ % {"test_schema": testing.config.test_schema},
+ )
select([parent]).execute().fetchall()
def test_reflect_alt_synonym_owner_local_table(self):
meta = MetaData(testing.db)
parent = Table(
- 'local_table', meta, autoload=True,
- oracle_resolve_synonyms=True, schema=testing.config.test_schema)
+ "local_table",
+ meta,
+ autoload=True,
+ oracle_resolve_synonyms=True,
+ schema=testing.config.test_schema,
+ )
self.assert_compile(
parent.select(),
"SELECT %(test_schema)s.local_table.id, "
"%(test_schema)s.local_table.data "
- "FROM %(test_schema)s.local_table" %
- {"test_schema": testing.config.test_schema}
+ "FROM %(test_schema)s.local_table"
+ % {"test_schema": testing.config.test_schema},
)
select([parent]).execute().fetchall()
@testing.provide_metadata
def test_create_same_names_implicit_schema(self):
meta = self.metadata
- parent = Table('parent',
- meta,
- Column('pid', Integer, primary_key=True))
- child = Table('child', meta,
- Column('cid', Integer, primary_key=True),
- Column('pid', Integer, ForeignKey('parent.pid')))
+ parent = Table(
+ "parent", meta, Column("pid", Integer, primary_key=True)
+ )
+ child = Table(
+ "child",
+ meta,
+ Column("cid", Integer, primary_key=True),
+ Column("pid", Integer, ForeignKey("parent.pid")),
+ )
meta.create_all()
- parent.insert().execute({'pid': 1})
- child.insert().execute({'cid': 1, 'pid': 1})
+ parent.insert().execute({"pid": 1})
+ child.insert().execute({"cid": 1, "pid": 1})
eq_(child.select().execute().fetchall(), [(1, 1)])
def test_reflect_alt_owner_explicit(self):
meta = MetaData(testing.db)
parent = Table(
- 'parent', meta, autoload=True,
- schema=testing.config.test_schema)
+ "parent", meta, autoload=True, schema=testing.config.test_schema
+ )
child = Table(
- 'child', meta, autoload=True,
- schema=testing.config.test_schema)
+ "child", meta, autoload=True, schema=testing.config.test_schema
+ )
self.assert_compile(
parent.join(child),
"%(test_schema)s.parent JOIN %(test_schema)s.child ON "
- "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id" % {
- "test_schema": testing.config.test_schema
- })
- select([parent, child]).\
- select_from(parent.join(child)).\
- execute().fetchall()
+ "%(test_schema)s.parent.id = %(test_schema)s.child.parent_id"
+ % {"test_schema": testing.config.test_schema},
+ )
+ select([parent, child]).select_from(
+ parent.join(child)
+ ).execute().fetchall()
def test_reflect_local_to_remote(self):
testing.db.execute(
- 'CREATE TABLE localtable (id INTEGER '
- 'PRIMARY KEY, parent_id INTEGER REFERENCES '
- '%(test_schema)s.parent(id))' % {
- "test_schema": testing.config.test_schema})
+ "CREATE TABLE localtable (id INTEGER "
+ "PRIMARY KEY, parent_id INTEGER REFERENCES "
+ "%(test_schema)s.parent(id))"
+ % {"test_schema": testing.config.test_schema}
+ )
try:
meta = MetaData(testing.db)
- lcl = Table('localtable', meta, autoload=True)
- parent = meta.tables['%s.parent' % testing.config.test_schema]
- self.assert_compile(parent.join(lcl),
- '%(test_schema)s.parent JOIN localtable ON '
- '%(test_schema)s.parent.id = '
- 'localtable.parent_id' % {
- "test_schema": testing.config.test_schema}
- )
- select([parent,
- lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ lcl = Table("localtable", meta, autoload=True)
+ parent = meta.tables["%s.parent" % testing.config.test_schema]
+ self.assert_compile(
+ parent.join(lcl),
+ "%(test_schema)s.parent JOIN localtable ON "
+ "%(test_schema)s.parent.id = "
+ "localtable.parent_id"
+ % {"test_schema": testing.config.test_schema},
+ )
+ select([parent, lcl]).select_from(
+ parent.join(lcl)
+ ).execute().fetchall()
finally:
- testing.db.execute('DROP TABLE localtable')
+ testing.db.execute("DROP TABLE localtable")
def test_reflect_alt_owner_implicit(self):
meta = MetaData(testing.db)
parent = Table(
- 'parent', meta, autoload=True,
- schema=testing.config.test_schema)
+ "parent", meta, autoload=True, schema=testing.config.test_schema
+ )
child = Table(
- 'child', meta, autoload=True,
- schema=testing.config.test_schema)
+ "child", meta, autoload=True, schema=testing.config.test_schema
+ )
self.assert_compile(
parent.join(child),
- '%(test_schema)s.parent JOIN %(test_schema)s.child '
- 'ON %(test_schema)s.parent.id = '
- '%(test_schema)s.child.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- child]).select_from(parent.join(child)).execute().fetchall()
+ "%(test_schema)s.parent JOIN %(test_schema)s.child "
+ "ON %(test_schema)s.parent.id = "
+ "%(test_schema)s.child.parent_id"
+ % {"test_schema": testing.config.test_schema},
+ )
+ select([parent, child]).select_from(
+ parent.join(child)
+ ).execute().fetchall()
def test_reflect_alt_owner_synonyms(self):
- testing.db.execute('CREATE TABLE localtable (id INTEGER '
- 'PRIMARY KEY, parent_id INTEGER REFERENCES '
- '%s.ptable(id))' % testing.config.test_schema)
+ testing.db.execute(
+ "CREATE TABLE localtable (id INTEGER "
+ "PRIMARY KEY, parent_id INTEGER REFERENCES "
+ "%s.ptable(id))" % testing.config.test_schema
+ )
try:
meta = MetaData(testing.db)
- lcl = Table('localtable', meta, autoload=True,
- oracle_resolve_synonyms=True)
- parent = meta.tables['%s.ptable' % testing.config.test_schema]
+ lcl = Table(
+ "localtable", meta, autoload=True, oracle_resolve_synonyms=True
+ )
+ parent = meta.tables["%s.ptable" % testing.config.test_schema]
self.assert_compile(
parent.join(lcl),
- '%(test_schema)s.ptable JOIN localtable ON '
- '%(test_schema)s.ptable.id = '
- 'localtable.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- lcl]).select_from(parent.join(lcl)).execute().fetchall()
+ "%(test_schema)s.ptable JOIN localtable ON "
+ "%(test_schema)s.ptable.id = "
+ "localtable.parent_id"
+ % {"test_schema": testing.config.test_schema},
+ )
+ select([parent, lcl]).select_from(
+ parent.join(lcl)
+ ).execute().fetchall()
finally:
- testing.db.execute('DROP TABLE localtable')
+ testing.db.execute("DROP TABLE localtable")
def test_reflect_remote_synonyms(self):
meta = MetaData(testing.db)
- parent = Table('ptable', meta, autoload=True,
- schema=testing.config.test_schema,
- oracle_resolve_synonyms=True)
- child = Table('ctable', meta, autoload=True,
- schema=testing.config.test_schema,
- oracle_resolve_synonyms=True)
+ parent = Table(
+ "ptable",
+ meta,
+ autoload=True,
+ schema=testing.config.test_schema,
+ oracle_resolve_synonyms=True,
+ )
+ child = Table(
+ "ctable",
+ meta,
+ autoload=True,
+ schema=testing.config.test_schema,
+ oracle_resolve_synonyms=True,
+ )
self.assert_compile(
parent.join(child),
- '%(test_schema)s.ptable JOIN '
- '%(test_schema)s.ctable '
- 'ON %(test_schema)s.ptable.id = '
- '%(test_schema)s.ctable.parent_id' % {
- "test_schema": testing.config.test_schema})
- select([parent,
- child]).select_from(parent.join(child)).execute().fetchall()
+ "%(test_schema)s.ptable JOIN "
+ "%(test_schema)s.ctable "
+ "ON %(test_schema)s.ptable.id = "
+ "%(test_schema)s.ctable.parent_id"
+ % {"test_schema": testing.config.test_schema},
+ )
+ select([parent, child]).select_from(
+ parent.join(child)
+ ).execute().fetchall()
class ConstraintTest(fixtures.TablesTest):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
run_deletes = None
@classmethod
def define_tables(cls, metadata):
- Table('foo', metadata, Column('id', Integer, primary_key=True))
+ Table("foo", metadata, Column("id", Integer, primary_key=True))
def test_oracle_has_no_on_update_cascade(self):
- bar = Table('bar', self.metadata,
- Column('id', Integer, primary_key=True),
- Column('foo_id',
- Integer,
- ForeignKey('foo.id', onupdate='CASCADE')))
+ bar = Table(
+ "bar",
+ self.metadata,
+ Column("id", Integer, primary_key=True),
+ Column(
+ "foo_id", Integer, ForeignKey("foo.id", onupdate="CASCADE")
+ ),
+ )
assert_raises(exc.SAWarning, bar.create)
- bat = Table('bat', self.metadata,
- Column('id', Integer, primary_key=True),
- Column('foo_id', Integer),
- ForeignKeyConstraint(['foo_id'], ['foo.id'],
- onupdate='CASCADE'))
+ bat = Table(
+ "bat",
+ self.metadata,
+ Column("id", Integer, primary_key=True),
+ Column("foo_id", Integer),
+ ForeignKeyConstraint(["foo_id"], ["foo.id"], onupdate="CASCADE"),
+ )
assert_raises(exc.SAWarning, bat.create)
def test_reflect_check_include_all(self):
insp = inspect(testing.db)
- eq_(insp.get_check_constraints('foo'), [])
+ eq_(insp.get_check_constraints("foo"), [])
eq_(
- [rec['sqltext']
- for rec in insp.get_check_constraints('foo', include_all=True)],
- ['"ID" IS NOT NULL'])
+ [
+ rec["sqltext"]
+ for rec in insp.get_check_constraints("foo", include_all=True)
+ ],
+ ['"ID" IS NOT NULL'],
+ )
class SystemTableTablenamesTest(fixtures.TestBase):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
def setup(self):
@@ -287,23 +375,20 @@ class SystemTableTablenamesTest(fixtures.TestBase):
def test_table_names_no_system(self):
insp = inspect(testing.db)
- eq_(
- insp.get_table_names(), ["my_table"]
- )
+ eq_(insp.get_table_names(), ["my_table"])
def test_temp_table_names_no_system(self):
insp = inspect(testing.db)
- eq_(
- insp.get_temp_table_names(), ["my_temp_table"]
- )
+ eq_(insp.get_temp_table_names(), ["my_temp_table"])
def test_table_names_w_system(self):
engine = testing_engine(options={"exclude_tablespaces": ["FOO"]})
insp = inspect(engine)
eq_(
- set(insp.get_table_names()).intersection(["my_table",
- "foo_table"]),
- set(["my_table", "foo_table"])
+ set(insp.get_table_names()).intersection(
+ ["my_table", "foo_table"]
+ ),
+ set(["my_table", "foo_table"]),
)
@@ -311,11 +396,12 @@ class DontReflectIOTTest(fixtures.TestBase):
"""test that index overflow tables aren't included in
table_names."""
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
def setup(self):
- testing.db.execute("""
+ testing.db.execute(
+ """
CREATE TABLE admin_docindex(
token char(20),
doc_id NUMBER,
@@ -326,7 +412,8 @@ class DontReflectIOTTest(fixtures.TestBase):
TABLESPACE users
PCTTHRESHOLD 20
OVERFLOW TABLESPACE users
- """)
+ """
+ )
def teardown(self):
testing.db.execute("drop table admin_docindex")
@@ -334,35 +421,37 @@ class DontReflectIOTTest(fixtures.TestBase):
def test_reflect_all(self):
m = MetaData(testing.db)
m.reflect()
- eq_(
- set(t.name for t in m.tables.values()),
- set(['admin_docindex'])
- )
+ eq_(set(t.name for t in m.tables.values()), set(["admin_docindex"]))
class UnsupportedIndexReflectTest(fixtures.TestBase):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
@testing.emits_warning("No column names")
@testing.provide_metadata
def test_reflect_functional_index(self):
metadata = self.metadata
- Table('test_index_reflect', metadata,
- Column('data', String(20), primary_key=True))
+ Table(
+ "test_index_reflect",
+ metadata,
+ Column("data", String(20), primary_key=True),
+ )
metadata.create_all()
- testing.db.execute('CREATE INDEX DATA_IDX ON '
- 'TEST_INDEX_REFLECT (UPPER(DATA))')
+ testing.db.execute(
+ "CREATE INDEX DATA_IDX ON " "TEST_INDEX_REFLECT (UPPER(DATA))"
+ )
m2 = MetaData(testing.db)
- Table('test_index_reflect', m2, autoload=True)
+ Table("test_index_reflect", m2, autoload=True)
def all_tables_compression_missing():
try:
- testing.db.execute('SELECT compression FROM all_tables')
+ testing.db.execute("SELECT compression FROM all_tables")
if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"):
+ "select * from v$version"
+ ):
return True
return False
except Exception:
@@ -371,9 +460,10 @@ def all_tables_compression_missing():
def all_tables_compress_for_missing():
try:
- testing.db.execute('SELECT compress_for FROM all_tables')
+ testing.db.execute("SELECT compress_for FROM all_tables")
if "Enterprise Edition" not in testing.db.scalar(
- "select * from v$version"):
+ "select * from v$version"
+ ):
return True
return False
except Exception:
@@ -381,7 +471,7 @@ def all_tables_compress_for_missing():
class TableReflectionTest(fixtures.TestBase):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
@testing.provide_metadata
@@ -389,35 +479,41 @@ class TableReflectionTest(fixtures.TestBase):
def test_reflect_basic_compression(self):
metadata = self.metadata
- tbl = Table('test_compress', metadata,
- Column('data', Integer, primary_key=True),
- oracle_compress=True)
+ tbl = Table(
+ "test_compress",
+ metadata,
+ Column("data", Integer, primary_key=True),
+ oracle_compress=True,
+ )
metadata.create_all()
m2 = MetaData(testing.db)
- tbl = Table('test_compress', m2, autoload=True)
+ tbl = Table("test_compress", m2, autoload=True)
# Don't hardcode the exact value, but it must be non-empty
- assert tbl.dialect_options['oracle']['compress']
+ assert tbl.dialect_options["oracle"]["compress"]
@testing.provide_metadata
@testing.fails_if(all_tables_compress_for_missing)
def test_reflect_oltp_compression(self):
metadata = self.metadata
- tbl = Table('test_compress', metadata,
- Column('data', Integer, primary_key=True),
- oracle_compress="OLTP")
+ tbl = Table(
+ "test_compress",
+ metadata,
+ Column("data", Integer, primary_key=True),
+ oracle_compress="OLTP",
+ )
metadata.create_all()
m2 = MetaData(testing.db)
- tbl = Table('test_compress', m2, autoload=True)
- assert tbl.dialect_options['oracle']['compress'] == "OLTP"
+ tbl = Table("test_compress", m2, autoload=True)
+ assert tbl.dialect_options["oracle"]["compress"] == "OLTP"
class RoundTripIndexTest(fixtures.TestBase):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
@testing.provide_metadata
@@ -425,22 +521,27 @@ class RoundTripIndexTest(fixtures.TestBase):
metadata = self.metadata
s_table = Table(
- "sometable", metadata,
+ "sometable",
+ metadata,
Column("id_a", Unicode(255), primary_key=True),
- Column("id_b",
- Unicode(255),
- primary_key=True,
- unique=True),
+ Column("id_b", Unicode(255), primary_key=True, unique=True),
Column("group", Unicode(255), primary_key=True),
Column("col", Unicode(255)),
- UniqueConstraint('col', 'group'))
+ UniqueConstraint("col", "group"),
+ )
# "group" is a keyword, so lower case
- normalind = Index('tableind', s_table.c.id_b, s_table.c.group)
- Index('compress1', s_table.c.id_a, s_table.c.id_b,
- oracle_compress=True)
- Index('compress2', s_table.c.id_a, s_table.c.id_b, s_table.c.col,
- oracle_compress=1)
+ normalind = Index("tableind", s_table.c.id_b, s_table.c.group)
+ Index(
+ "compress1", s_table.c.id_a, s_table.c.id_b, oracle_compress=True
+ )
+ Index(
+ "compress2",
+ s_table.c.id_a,
+ s_table.c.id_b,
+ s_table.c.col,
+ oracle_compress=1,
+ )
metadata.create_all()
mirror = MetaData(testing.db)
@@ -452,9 +553,11 @@ class RoundTripIndexTest(fixtures.TestBase):
inspect.reflect()
def obj_definition(obj):
- return (obj.__class__,
- tuple([c.name for c in obj.columns]),
- getattr(obj, 'unique', None))
+ return (
+ obj.__class__,
+ tuple([c.name for c in obj.columns]),
+ getattr(obj, "unique", None),
+ )
# find what the primary k constraint name should be
primaryconsname = testing.db.scalar(
@@ -463,62 +566,72 @@ class RoundTripIndexTest(fixtures.TestBase):
FROM all_constraints
WHERE table_name = :table_name
AND owner = :owner
- AND constraint_type = 'P' """),
+ AND constraint_type = 'P' """
+ ),
table_name=s_table.name.upper(),
- owner=testing.db.dialect.default_schema_name.upper())
+ owner=testing.db.dialect.default_schema_name.upper(),
+ )
reflectedtable = inspect.tables[s_table.name]
# make a dictionary of the reflected objects:
- reflected = dict([(obj_definition(i), i) for i in
- reflectedtable.indexes
- | reflectedtable.constraints])
+ reflected = dict(
+ [
+ (obj_definition(i), i)
+ for i in reflectedtable.indexes | reflectedtable.constraints
+ ]
+ )
# assert we got primary key constraint and its name, Error
# if not in dict
- assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b',
- 'group'), None)].name.upper() \
+ assert (
+ reflected[
+ (PrimaryKeyConstraint, ("id_a", "id_b", "group"), None)
+ ].name.upper()
== primaryconsname.upper()
+ )
# Error if not in dict
- eq_(
- reflected[(Index, ('id_b', 'group'), False)].name,
- normalind.name
- )
- assert (Index, ('id_b', ), True) in reflected
- assert (Index, ('col', 'group'), True) in reflected
+ eq_(reflected[(Index, ("id_b", "group"), False)].name, normalind.name)
+ assert (Index, ("id_b",), True) in reflected
+ assert (Index, ("col", "group"), True) in reflected
- idx = reflected[(Index, ('id_a', 'id_b', ), False)]
- assert idx.dialect_options['oracle']['compress'] == 2
+ idx = reflected[(Index, ("id_a", "id_b"), False)]
+ assert idx.dialect_options["oracle"]["compress"] == 2
- idx = reflected[(Index, ('id_a', 'id_b', 'col', ), False)]
- assert idx.dialect_options['oracle']['compress'] == 1
+ idx = reflected[(Index, ("id_a", "id_b", "col"), False)]
+ assert idx.dialect_options["oracle"]["compress"] == 1
eq_(len(reflectedtable.constraints), 1)
eq_(len(reflectedtable.indexes), 5)
class DBLinkReflectionTest(fixtures.TestBase):
- __requires__ = 'oracle_test_dblink',
- __only_on__ = 'oracle'
+ __requires__ = ("oracle_test_dblink",)
+ __only_on__ = "oracle"
__backend__ = True
@classmethod
def setup_class(cls):
from sqlalchemy.testing import config
- cls.dblink = config.file_config.get('sqla_testing', 'oracle_db_link')
+
+ cls.dblink = config.file_config.get("sqla_testing", "oracle_db_link")
# note that the synonym here is still not totally functional
# when accessing via a different username as we do with the
# multiprocess test suite, so testing here is minimal
with testing.db.connect() as conn:
- conn.execute("create table test_table "
- "(id integer primary key, data varchar2(50))")
- conn.execute("create synonym test_table_syn "
- "for test_table@%s" % cls.dblink)
+ conn.execute(
+ "create table test_table "
+ "(id integer primary key, data varchar2(50))"
+ )
+ conn.execute(
+ "create synonym test_table_syn "
+ "for test_table@%s" % cls.dblink
+ )
@classmethod
def teardown_class(cls):
@@ -530,24 +643,29 @@ class DBLinkReflectionTest(fixtures.TestBase):
"""test the resolution of the synonym/dblink. """
m = MetaData()
- t = Table('test_table_syn', m, autoload=True,
- autoload_with=testing.db, oracle_resolve_synonyms=True)
- eq_(list(t.c.keys()), ['id', 'data'])
+ t = Table(
+ "test_table_syn",
+ m,
+ autoload=True,
+ autoload_with=testing.db,
+ oracle_resolve_synonyms=True,
+ )
+ eq_(list(t.c.keys()), ["id", "data"])
eq_(list(t.primary_key), [t.c.id])
class TypeReflectionTest(fixtures.TestBase):
- __only_on__ = 'oracle'
+ __only_on__ = "oracle"
__backend__ = True
@testing.provide_metadata
def _run_test(self, specs, attributes):
- columns = [Column('c%i' % (i + 1), t[0]) for i, t in enumerate(specs)]
+ columns = [Column("c%i" % (i + 1), t[0]) for i, t in enumerate(specs)]
m = self.metadata
- Table('oracle_types', m, *columns)
+ Table("oracle_types", m, *columns)
m.create_all()
m2 = MetaData(testing.db)
- table = Table('oracle_types', m2, autoload=True)
+ table = Table("oracle_types", m2, autoload=True)
for i, (reflected_col, spec) in enumerate(zip(table.c, specs)):
expected_spec = spec[1]
reflected_type = reflected_col.type
@@ -557,28 +675,23 @@ class TypeReflectionTest(fixtures.TestBase):
getattr(reflected_type, attr),
getattr(expected_spec, attr),
"Column %s: Attribute %s value of %s does not "
- "match %s for type %s" % (
+ "match %s for type %s"
+ % (
"c%i" % (i + 1),
attr,
getattr(reflected_type, attr),
getattr(expected_spec, attr),
- spec[0]
- )
+ spec[0],
+ ),
)
def test_integer_types(self):
- specs = [
- (Integer, INTEGER(),),
- (Numeric, INTEGER(),),
- ]
+ specs = [(Integer, INTEGER()), (Numeric, INTEGER())]
self._run_test(specs, [])
def test_number_types(self):
- specs = [
- (Numeric(5, 2), NUMBER(5, 2),),
- (NUMBER, NUMBER(),),
- ]
- self._run_test(specs, ['precision', 'scale'])
+ specs = [(Numeric(5, 2), NUMBER(5, 2)), (NUMBER, NUMBER())]
+ self._run_test(specs, ["precision", "scale"])
def test_float_types(self):
specs = [
@@ -587,11 +700,11 @@ class TypeReflectionTest(fixtures.TestBase):
# (DOUBLE_PRECISION(), oracle.FLOAT(binary_precision=126)),
(BINARY_DOUBLE(), BINARY_DOUBLE()),
(BINARY_FLOAT(), BINARY_FLOAT()),
- (FLOAT(5), FLOAT(),),
+ (FLOAT(5), FLOAT()),
# when binary_precision is supported
# (FLOAT(5), oracle.FLOAT(binary_precision=5),),
(FLOAT(), FLOAT()),
# when binary_precision is supported
# (FLOAT(5), oracle.FLOAT(binary_precision=126),),
]
- self._run_test(specs, ['precision'])
+ self._run_test(specs, ["precision"])