diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-11-22 17:48:55 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2013-11-22 17:52:08 -0500 |
commit | bad8be3bde5797dd694a88a2ef7304d1691577b2 (patch) | |
tree | bf89d8b7fe1494635e2784f9fe054a74d9459302 | |
parent | 038f93a49bbdab532bf822d00df23c6346698a47 (diff) | |
download | sqlalchemy-bad8be3bde5797dd694a88a2ef7304d1691577b2.tar.gz |
- cleanup
-rw-r--r-- | test/dialect/test_oracle.py | 562 |
1 files changed, 288 insertions, 274 deletions
diff --git a/test/dialect/test_oracle.py b/test/dialect/test_oracle.py index 953dfe4e3..41996682d 100644 --- a/test/dialect/test_oracle.py +++ b/test/dialect/test_oracle.py @@ -11,12 +11,13 @@ from sqlalchemy.testing import assert_raises, assert_raises_message from sqlalchemy.testing.engines import testing_engine from sqlalchemy.dialects.oracle import cx_oracle, base as oracle from sqlalchemy.engine import default +from sqlalchemy.util import u import decimal from sqlalchemy.testing.schema import Table, Column import datetime import os from sqlalchemy import sql - +from sqlalchemy.testing.mock import Mock class OutParamTest(fixtures.TestBase, AssertsExecutionResults): __only_on__ = 'oracle+cx_oracle' @@ -24,31 +25,31 @@ class OutParamTest(fixtures.TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): testing.db.execute(""" -create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number, z_out OUT varchar) IS - retval number; - begin - retval := 6; - x_out := 10; - y_out := x_in * 15; - z_out := NULL; - end; + create or replace procedure foo(x_in IN number, x_out OUT number, + y_out OUT number, z_out OUT varchar) IS + retval number; + begin + retval := 6; + x_out := 10; + y_out := x_in * 15; + z_out := NULL; + end; """) def test_out_params(self): - result = \ - testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' + result = testing.db.execute(text('begin foo(:x_in, :x_out, :y_out, ' ':z_out); end;', bindparams=[bindparam('x_in', Float), outparam('x_out', Integer), outparam('y_out', Float), outparam('z_out', String)]), x_in=5) - eq_(result.out_parameters, {'x_out': 10, 'y_out': 75, 'z_out' - : None}) + eq_(result.out_parameters, + {'x_out': 10, 'y_out': 75, 'z_out': None}) assert isinstance(result.out_parameters['x_out'], int) @classmethod def teardown_class(cls): - testing.db.execute("DROP PROCEDURE foo") + testing.db.execute("DROP PROCEDURE foo") class CXOracleArgsTest(fixtures.TestBase): __only_on__ = 'oracle+cx_oracle' @@ -90,7 +91,7 @@ class QuotedBindRoundTripTest(fixtures.TestBase): metadata.create_all() table.insert().execute( - {"option":1, "plain":1, "union":1} + {"option": 1, "plain": 1, "union": 1} ) eq_( testing.db.execute(table.select()).first(), @@ -104,7 +105,6 @@ class QuotedBindRoundTripTest(fixtures.TestBase): class CompileTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = oracle.dialect() def test_true_false(self): @@ -248,7 +248,7 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_use_binds_for_limits_enabled(self): t = table('sometable', column('col1'), column('col2')) - dialect = oracle.OracleDialect(use_binds_for_limits = True) + dialect = oracle.OracleDialect(use_binds_for_limits=True) self.assert_compile(select([t]).limit(10), "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, " @@ -346,8 +346,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) query = select([table1, table2], or_(table1.c.name == 'fred', - table1.c.myid == 10, table2.c.othername != 'jack' - , 'EXISTS (select yay from foo where boo = lar)' + table1.c.myid == 10, table2.c.othername != 'jack', + 'EXISTS (select yay from foo where boo = lar)' ), from_obj=[outerjoin(table1, table2, table1.c.myid == table2.c.otherid)]) self.assert_compile(query, @@ -433,8 +433,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): 'mytable.description AS description FROM ' 'mytable LEFT OUTER JOIN myothertable ON ' 'mytable.myid = myothertable.otherid) ' - 'anon_1 ON thirdtable.userid = anon_1.myid' - , dialect=oracle.dialect(use_ansi=True)) + 'anon_1 ON thirdtable.userid = anon_1.myid', + dialect=oracle.dialect(use_ansi=True)) self.assert_compile(q, 'SELECT thirdtable.userid, ' @@ -514,7 +514,8 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): def test_returning_insert_labeled(self): t1 = table('t1', column('c1'), column('c2'), column('c3')) self.assert_compile( - t1.insert().values(c1=1).returning(t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), + t1.insert().values(c1=1).returning( + t1.c.c2.label('c2_l'), t1.c.c3.label('c3_l')), "INSERT INTO t1 (c1) VALUES (:c1) RETURNING " "t1.c2, t1.c3 INTO :ret_0, :ret_1" ) @@ -564,32 +565,40 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): ) class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): - __only_on__ = 'oracle' - def test_ora8_flags(self): - def server_version_info(self): - return (8, 2, 5) + def _dialect(self, server_version, **kw): + def server_version_info(conn): + return server_version - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = oracle.dialect( + dbapi=Mock(version="0.0.0", paramstyle="named"), + **kw) dialect._get_server_version_info = server_version_info + dialect._check_unicode_returns = Mock() + dialect._check_unicode_description = Mock() + dialect._get_default_schema_name = Mock() + return dialect + + + def test_ora8_flags(self): + dialect = self._dialect((8, 2, 5)) # before connect, assume modern DB assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - dialect.initialize(testing.db.connect()) + dialect.initialize(Mock()) assert not dialect.implicit_returning assert not dialect._supports_char_length assert not dialect._supports_nchar assert not dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(Unicode(50),"VARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"CLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(Unicode(50), "VARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "CLOB", dialect=dialect) - dialect = oracle.dialect(implicit_returning=True, - dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info + + dialect = self._dialect((8, 2, 5), implicit_returning=True) dialect.initialize(testing.db.connect()) assert dialect.implicit_returning @@ -597,26 +606,25 @@ class CompatFlagsTest(fixtures.TestBase, AssertsCompiledSQL): def test_default_flags(self): """test with no initialization or server version info""" - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) + dialect = self._dialect(None) + assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) def test_ora10_flags(self): - def server_version_info(self): - return (10, 2, 5) - dialect = oracle.dialect(dbapi=testing.db.dialect.dbapi) - dialect._get_server_version_info = server_version_info - dialect.initialize(testing.db.connect()) + dialect = self._dialect((10, 2, 5)) + + dialect.initialize(Mock()) assert dialect._supports_char_length assert dialect._supports_nchar assert dialect.use_ansi - self.assert_compile(String(50),"VARCHAR2(50 CHAR)",dialect=dialect) - self.assert_compile(Unicode(50),"NVARCHAR2(50)",dialect=dialect) - self.assert_compile(UnicodeText(),"NCLOB",dialect=dialect) + self.assert_compile(String(50), "VARCHAR2(50 CHAR)", dialect=dialect) + self.assert_compile(Unicode(50), "NVARCHAR2(50)", dialect=dialect) + self.assert_compile(UnicodeText(), "NCLOB", dialect=dialect) class MultiSchemaTest(fixtures.TestBase, AssertsCompiledSQL): @@ -677,9 +685,10 @@ drop synonym test_schema.local_table; if stmt.strip(): testing.db.execute(stmt) + @testing.provide_metadata def test_create_same_names_explicit_schema(self): schema = testing.db.dialect.default_schema_name - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), schema=schema @@ -690,16 +699,14 @@ drop synonym test_schema.local_table; schema=schema ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_table_owner_local_synonym(self): meta = MetaData(testing.db) - parent = Table('test_schema_ptable', meta, autoload=True, oracle_resolve_synonyms=True) + parent = Table('test_schema_ptable', meta, autoload=True, + oracle_resolve_synonyms=True) self.assert_compile(parent.select(), "SELECT test_schema_ptable.id, " "test_schema_ptable.data FROM test_schema_ptable") @@ -707,14 +714,16 @@ drop synonym test_schema.local_table; def test_reflect_alt_synonym_owner_local_table(self): meta = MetaData(testing.db) - parent = Table('local_table', meta, autoload=True, oracle_resolve_synonyms=True, schema="test_schema") + parent = Table('local_table', meta, autoload=True, + oracle_resolve_synonyms=True, schema="test_schema") self.assert_compile(parent.select(), "SELECT test_schema.local_table.id, " "test_schema.local_table.data FROM test_schema.local_table") select([parent]).execute().fetchall() + @testing.provide_metadata def test_create_same_names_implicit_schema(self): - meta = MetaData(testing.db) + meta = self.metadata parent = Table('parent', meta, Column('pid', Integer, primary_key=True), ) @@ -723,12 +732,9 @@ drop synonym test_schema.local_table; Column('pid', Integer, ForeignKey('parent.pid')), ) meta.create_all() - try: - parent.insert().execute({'pid':1}) - child.insert().execute({'cid':1, 'pid':1}) - eq_(child.select().execute().fetchall(), [(1, 1)]) - finally: - meta.drop_all() + parent.insert().execute({'pid': 1}) + child.insert().execute({'cid': 1, 'pid': 1}) + eq_(child.select().execute().fetchall(), [(1, 1)]) def test_reflect_alt_owner_explicit(self): @@ -918,10 +924,16 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): dbapi = FakeDBAPI() b = bindparam("foo", "hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) - b = bindparam("foo", u"hello world!") - assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING' + b = bindparam("foo", "hello world!") + eq_( + b.type.dialect_impl(dialect).get_dbapi_type(dbapi), + 'STRING' + ) def test_long(self): self.assert_compile(oracle.LONG(), "LONG") @@ -950,14 +962,14 @@ class DialectTypesTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile(oracle.RAW(35), "RAW(35)") def test_char_length(self): - self.assert_compile(VARCHAR(50),"VARCHAR(50 CHAR)") + self.assert_compile(VARCHAR(50), "VARCHAR(50 CHAR)") oracle8dialect = oracle.dialect() oracle8dialect.server_version_info = (8, 0) - self.assert_compile(VARCHAR(50),"VARCHAR(50)",dialect=oracle8dialect) + self.assert_compile(VARCHAR(50), "VARCHAR(50)", dialect=oracle8dialect) - self.assert_compile(NVARCHAR(50),"NVARCHAR2(50)") - self.assert_compile(CHAR(50),"CHAR(50)") + self.assert_compile(NVARCHAR(50), "NVARCHAR2(50)") + self.assert_compile(CHAR(50), "CHAR(50)") def test_varchar_types(self): dialect = oracle.dialect() @@ -1005,36 +1017,36 @@ class TypesTest(fixtures.TestBase): dict(id=3, data="value 3") ) - eq_(t.select().where(t.c.data=='value 2').execute().fetchall(), + eq_( + t.select().where(t.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) m2 = MetaData(testing.db) t2 = Table('t1', m2, autoload=True) assert type(t2.c.data.type) is CHAR - eq_(t2.select().where(t2.c.data=='value 2').execute().fetchall(), + eq_( + t2.select().where(t2.c.data == 'value 2').execute().fetchall(), [(2, 'value 2 ')] - ) + ) finally: t.drop() @testing.requires.returning + @testing.provide_metadata def test_int_not_float(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('foo', Integer)) t1.create() - try: - r = t1.insert().values(foo=5).returning(t1.c.foo).execute() - x = r.scalar() - assert x == 5 - assert isinstance(x, int) - - x = t1.select().scalar() - assert x == 5 - assert isinstance(x, int) - finally: - t1.drop() + r = t1.insert().values(foo=5).returning(t1.c.foo).execute() + x = r.scalar() + assert x == 5 + assert isinstance(x, int) + + x = t1.select().scalar() + assert x == 5 + assert isinstance(x, int) @testing.provide_metadata def test_rowid(self): @@ -1051,7 +1063,7 @@ class TypesTest(fixtures.TestBase): # the ROWID type is not really needed here, # as cx_oracle just treats it as a string, # but we want to make sure the ROWID works... - rowid_col= column('rowid', oracle.ROWID) + rowid_col = column('rowid', oracle.ROWID) s3 = select([t.c.x, rowid_col]).\ where(rowid_col == cast(rowid, oracle.ROWID)) eq_(s3.select().execute().fetchall(), @@ -1077,8 +1089,9 @@ class TypesTest(fixtures.TestBase): eq_(row['day_interval'], datetime.timedelta(days=35, seconds=5743)) + @testing.provide_metadata def test_numerics(self): - m = MetaData(testing.db) + m = self.metadata t1 = Table('t1', m, Column('intcol', Integer), Column('numericcol', Numeric(precision=9, scale=2)), @@ -1091,41 +1104,38 @@ class TypesTest(fixtures.TestBase): ) t1.create() - try: - t1.insert().execute( - intcol=1, - numericcol=5.2, - floatcol1=6.5, - floatcol2 = 8.5, - doubleprec = 9.5, - numbercol1=12, - numbercol2=14.85, - numbercol3=15.76 - ) - - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) + t1.insert().execute( + intcol=1, + numericcol=5.2, + floatcol1=6.5, + floatcol2=8.5, + doubleprec=9.5, + numbercol1=12, + numbercol2=14.85, + numbercol3=15.76 + ) - for row in ( - t1.select().execute().first(), - t2.select().execute().first() - ): - for i, (val, type_) in enumerate(( - (1, int), - (decimal.Decimal("5.2"), decimal.Decimal), - (6.5, float), - (8.5, float), - (9.5, float), - (12, int), - (decimal.Decimal("14.85"), decimal.Decimal), - (15.76, float), - )): - eq_(row[i], val) - assert isinstance(row[i], type_), '%r is not %r' \ - % (row[i], type_) + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + + for row in ( + t1.select().execute().first(), + t2.select().execute().first() + ): + for i, (val, type_) in enumerate(( + (1, int), + (decimal.Decimal("5.2"), decimal.Decimal), + (6.5, float), + (8.5, float), + (9.5, float), + (12, int), + (decimal.Decimal("14.85"), decimal.Decimal), + (15.76, float), + )): + eq_(row[i], val) + assert isinstance(row[i], type_), '%r is not %r' \ + % (row[i], type_) - finally: - t1.drop() def test_numeric_no_decimal_mode(self): @@ -1157,28 +1167,26 @@ class TypesTest(fixtures.TestBase): ) foo.create() - foo.insert().execute( - {'idata':5, 'ndata':decimal.Decimal("45.6"), - 'ndata2':decimal.Decimal("45.0"), - 'nidata':decimal.Decimal('53'), 'fdata':45.68392}, - ) + foo.insert().execute({ + 'idata': 5, + 'ndata': decimal.Decimal("45.6"), + 'ndata2': decimal.Decimal("45.0"), + 'nidata': decimal.Decimal('53'), + 'fdata': 45.68392 + }) - stmt = """ - SELECT - idata, - ndata, - ndata2, - nidata, - fdata - FROM foo - """ + stmt = "SELECT idata, ndata, ndata2, nidata, fdata FROM foo" row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, int, float]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, int, float] + ) eq_( row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), 53, 45.683920000000001) + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + 53, 45.683920000000001) ) # with a nested subquery, @@ -1202,7 +1210,10 @@ class TypesTest(fixtures.TestBase): FROM dual """ row = testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) eq_( row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) @@ -1210,15 +1221,20 @@ class TypesTest(fixtures.TestBase): row = testing.db.execute(text(stmt, typemap={ - 'idata':Integer(), - 'ndata':Numeric(20, 2), - 'ndata2':Numeric(20, 2), - 'nidata':Numeric(5, 0), - 'fdata':Float() + 'idata': Integer(), + 'ndata': Numeric(20, 2), + 'ndata2': Numeric(20, 2), + 'nidata': Numeric(5, 0), + 'fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) stmt = """ @@ -1244,39 +1260,55 @@ class TypesTest(fixtures.TestBase): ) WHERE ROWNUM >= 0) anon_1 """ - row =testing.db.execute(stmt).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, int, int, decimal.Decimal]) - eq_(row, (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392'))) + row = testing.db.execute(stmt).fetchall()[0] + eq_( + [type(x) for x in row], + [int, decimal.Decimal, int, int, decimal.Decimal] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), 45, 53, decimal.Decimal('45.68392')) + ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2), - 'anon_1_ndata2':Numeric(20, 2), - 'anon_1_nidata':Numeric(5, 0), - 'anon_1_fdata':Float() + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2), + 'anon_1_ndata2': Numeric(20, 2), + 'anon_1_nidata': Numeric(5, 0), + 'anon_1_fdata': Float() })).fetchall()[0] - eq_([type(x) for x in row], [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float]) - eq_(row, - (5, decimal.Decimal('45.6'), decimal.Decimal('45'), decimal.Decimal('53'), 45.683920000000001) + eq_( + [type(x) for x in row], + [int, decimal.Decimal, decimal.Decimal, decimal.Decimal, float] + ) + eq_( + row, + (5, decimal.Decimal('45.6'), decimal.Decimal('45'), + decimal.Decimal('53'), 45.683920000000001) ) row = testing.db.execute(text(stmt, typemap={ - 'anon_1_idata':Integer(), - 'anon_1_ndata':Numeric(20, 2, asdecimal=False), - 'anon_1_ndata2':Numeric(20, 2, asdecimal=False), - 'anon_1_nidata':Numeric(5, 0, asdecimal=False), - 'anon_1_fdata':Float(asdecimal=True) + 'anon_1_idata': Integer(), + 'anon_1_ndata': Numeric(20, 2, asdecimal=False), + 'anon_1_ndata2': Numeric(20, 2, asdecimal=False), + 'anon_1_nidata': Numeric(5, 0, asdecimal=False), + 'anon_1_fdata': Float(asdecimal=True) })).fetchall()[0] - eq_([type(x) for x in row], [int, float, float, float, decimal.Decimal]) - eq_(row, + eq_( + [type(x) for x in row], + [int, float, float, float, decimal.Decimal] + ) + eq_( + row, (5, 45.6, 45, 53, decimal.Decimal('45.68392')) ) + @testing.provide_metadata def test_reflect_dates(self): - metadata = MetaData(testing.db) + metadata = self.metadata Table( "date_types", metadata, Column('d1', DATE), @@ -1285,20 +1317,16 @@ class TypesTest(fixtures.TestBase): Column('d4', oracle.INTERVAL(second_precision=5)), ) metadata.create_all() - try: - m = MetaData(testing.db) - t1 = Table( - "date_types", m, - autoload=True) - assert isinstance(t1.c.d1.type, DATE) - assert isinstance(t1.c.d2.type, TIMESTAMP) - assert not t1.c.d2.type.timezone - assert isinstance(t1.c.d3.type, TIMESTAMP) - assert t1.c.d3.type.timezone - assert isinstance(t1.c.d4.type, oracle.INTERVAL) - - finally: - metadata.drop_all() + m = MetaData(testing.db) + t1 = Table( + "date_types", m, + autoload=True) + assert isinstance(t1.c.d1.type, DATE) + assert isinstance(t1.c.d2.type, TIMESTAMP) + assert not t1.c.d2.type.timezone + assert isinstance(t1.c.d3.type, TIMESTAMP) + assert t1.c.d3.type.timezone + assert isinstance(t1.c.d4.type, oracle.INTERVAL) def test_reflect_all_types_schema(self): types_table = Table('all_types', MetaData(testing.db), @@ -1326,7 +1354,7 @@ class TypesTest(fixtures.TestBase): @testing.provide_metadata def test_reflect_nvarchar(self): metadata = self.metadata - t = Table('t', metadata, + Table('t', metadata, Column('data', sqltypes.NVARCHAR(255)) ) metadata.create_all() @@ -1348,22 +1376,20 @@ class TypesTest(fixtures.TestBase): assert isinstance(res, unicode) + @testing.provide_metadata def test_char_length(self): - metadata = MetaData(testing.db) + metadata = self.metadata t1 = Table('t1', metadata, Column("c1", VARCHAR(50)), Column("c2", NVARCHAR(250)), Column("c3", CHAR(200)) ) t1.create() - try: - m2 = MetaData(testing.db) - t2 = Table('t1', m2, autoload=True) - eq_(t2.c.c1.type.length, 50) - eq_(t2.c.c2.type.length, 250) - eq_(t2.c.c3.type.length, 200) - finally: - t1.drop() + m2 = MetaData(testing.db) + t2 = Table('t1', m2, autoload=True) + eq_(t2.c.c1.type.length, 50) + eq_(t2.c.c2.type.length, 250) + eq_(t2.c.c3.type.length, 200) @testing.provide_metadata def test_long_type(self): @@ -1379,8 +1405,6 @@ class TypesTest(fixtures.TestBase): "xyz" ) - - def test_longstring(self): metadata = MetaData(testing.db) testing.db.execute(""" @@ -1431,15 +1455,16 @@ class EuroNumericTest(fixtures.TestBase): del os.environ['NLS_LANG'] self.engine.dispose() - @testing.provide_metadata def test_output_type_handler(self): - metadata = self.metadata for stmt, exp, kw in [ ("SELECT 0.1 FROM DUAL", decimal.Decimal("0.1"), {}), ("SELECT 15 FROM DUAL", 15, {}), - ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", decimal.Decimal("15"), {}), - ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", decimal.Decimal("0.1"), {}), - ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), {'num':decimal.Decimal("2.5")}) + ("SELECT CAST(15 AS NUMERIC(3, 1)) FROM DUAL", + decimal.Decimal("15"), {}), + ("SELECT CAST(0.1 AS NUMERIC(5, 2)) FROM DUAL", + decimal.Decimal("0.1"), {}), + ("SELECT :num FROM DUAL", decimal.Decimal("2.5"), + {'num': decimal.Decimal("2.5")}) ]: test_exp = self.engine.scalar(stmt, **kw) eq_( @@ -1519,97 +1544,86 @@ class BufferedColumnTest(fixtures.TestBase, AssertsCompiledSQL): class UnsupportedIndexReflectTest(fixtures.TestBase): __only_on__ = 'oracle' - def setup(self): - global metadata - metadata = MetaData(testing.db) - t1 = Table('test_index_reflect', metadata, + @testing.emits_warning("No column names") + @testing.provide_metadata + def test_reflect_functional_index(self): + metadata = self.metadata + Table('test_index_reflect', metadata, Column('data', String(20), primary_key=True) ) metadata.create_all() - def teardown(self): - metadata.drop_all() - - @testing.emits_warning("No column names") - def test_reflect_functional_index(self): testing.db.execute('CREATE INDEX DATA_IDX ON ' 'TEST_INDEX_REFLECT (UPPER(DATA))') m2 = MetaData(testing.db) - t2 = Table('test_index_reflect', m2, autoload=True) + Table('test_index_reflect', m2, autoload=True) class RoundTripIndexTest(fixtures.TestBase): __only_on__ = 'oracle' + @testing.provide_metadata def test_basic(self): - engine = testing.db - metadata = MetaData(engine) + metadata = self.metadata - table=Table("sometable", metadata, + table = Table("sometable", metadata, Column("id_a", Unicode(255), primary_key=True), Column("id_b", Unicode(255), primary_key=True, unique=True), Column("group", Unicode(255), primary_key=True), Column("col", Unicode(255)), - UniqueConstraint('col','group'), + UniqueConstraint('col', 'group'), ) # "group" is a keyword, so lower case normalind = Index('tableind', table.c.id_b, table.c.group) - # create metadata.create_all() - try: - # round trip, create from reflection - mirror = MetaData(engine) - mirror.reflect() - metadata.drop_all() - mirror.create_all() - - # inspect the reflected creation - inspect = MetaData(engine) - inspect.reflect() - - def obj_definition(obj): - return obj.__class__, tuple([c.name for c in - obj.columns]), getattr(obj, 'unique', None) - - # find what the primary k constraint name should be - primaryconsname = engine.execute( - text("""SELECT constraint_name - FROM all_constraints - WHERE table_name = :table_name - AND owner = :owner - AND constraint_type = 'P' """), - table_name=table.name.upper(), - owner=engine.url.username.upper()).fetchall()[0][0] - - reflectedtable = inspect.tables[table.name] - - # make a dictionary of the reflected objects: - - reflected = dict([(obj_definition(i), i) for i in - reflectedtable.indexes - | reflectedtable.constraints]) - - # assert we got primary key constraint and its name, Error - # if not in dict - - assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', - 'group'), None)].name.upper() \ - == primaryconsname.upper() - - # Error if not in dict - - assert reflected[(Index, ('id_b', 'group'), False)].name \ - == normalind.name - assert (Index, ('id_b', ), True) in reflected - assert (Index, ('col', 'group'), True) in reflected - assert len(reflectedtable.constraints) == 1 - assert len(reflectedtable.indexes) == 3 + mirror = MetaData(testing.db) + mirror.reflect() + metadata.drop_all() + mirror.create_all() - finally: - metadata.drop_all() + inspect = MetaData(testing.db) + inspect.reflect() + + def obj_definition(obj): + return obj.__class__, tuple([c.name for c in + obj.columns]), getattr(obj, 'unique', None) + + # find what the primary k constraint name should be + primaryconsname = testing.db.execute( + text("""SELECT constraint_name + FROM all_constraints + WHERE table_name = :table_name + AND owner = :owner + AND constraint_type = 'P' """), + table_name=table.name.upper(), + owner=testing.db.url.username.upper()).fetchall()[0][0] + reflectedtable = inspect.tables[table.name] + # make a dictionary of the reflected objects: + + reflected = dict([(obj_definition(i), i) for i in + reflectedtable.indexes + | reflectedtable.constraints]) + + # assert we got primary key constraint and its name, Error + # if not in dict + + assert reflected[(PrimaryKeyConstraint, ('id_a', 'id_b', + 'group'), None)].name.upper() \ + == primaryconsname.upper() + + # Error if not in dict + + eq_( + reflected[(Index, ('id_b', 'group'), False)].name, + normalind.name + ) + assert (Index, ('id_b', ), True) in reflected + assert (Index, ('col', 'group'), True) in reflected + eq_(len(reflectedtable.constraints), 1) + eq_(len(reflectedtable.indexes), 3) class SequenceTest(fixtures.TestBase, AssertsCompiledSQL): @@ -1656,11 +1670,11 @@ class ExecuteTest(fixtures.TestBase): metadata.create_all() t.insert().execute( - {'id':1, 'data':1}, - {'id':2, 'data':7}, - {'id':3, 'data':12}, - {'id':4, 'data':15}, - {'id':5, 'data':32}, + {'id': 1, 'data': 1}, + {'id': 2, 'data': 7}, + {'id': 3, 'data': 12}, + {'id': 4, 'data': 15}, + {'id': 5, 'data': 32}, ) # here, we can't use ORDER BY. @@ -1685,34 +1699,34 @@ class UnicodeSchemaTest(fixtures.TestBase): @testing.provide_metadata def test_quoted_column_non_unicode(self): metadata = self.metadata - table=Table("atable", metadata, + table = Table("atable", metadata, Column("_underscorecolumn", Unicode(255), primary_key=True), ) metadata.create_all() table.insert().execute( - {'_underscorecolumn': u'’é'}, + {'_underscorecolumn': u('’é')}, ) result = testing.db.execute( - table.select().where(table.c._underscorecolumn==u'’é') + table.select().where(table.c._underscorecolumn == u('’é')) ).scalar() - eq_(result, u'’é') + eq_(result, u('’é')) @testing.provide_metadata def test_quoted_column_unicode(self): metadata = self.metadata - table=Table("atable", metadata, - Column(u"méil", Unicode(255), primary_key=True), + table = Table("atable", metadata, + Column(u("méil"), Unicode(255), primary_key=True), ) metadata.create_all() table.insert().execute( - {u'méil': u'’é'}, + {u('méil'): u('’é')}, ) result = testing.db.execute( - table.select().where(table.c[u'méil']==u'’é') + table.select().where(table.c[u('méil')] == u('’é')) ).scalar() - eq_(result, u'’é') + eq_(result, u('’é')) class DBLinkReflectionTest(fixtures.TestBase): |