diff options
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_case_statement.py | 18 | ||||
| -rw-r--r-- | test/sql/test_compiler.py | 226 | ||||
| -rw-r--r-- | test/sql/test_constraints.py | 44 | ||||
| -rw-r--r-- | test/sql/test_defaults.py | 26 | ||||
| -rw-r--r-- | test/sql/test_functions.py | 30 | ||||
| -rw-r--r-- | test/sql/test_generative.py | 110 | ||||
| -rw-r--r-- | test/sql/test_labels.py | 18 | ||||
| -rw-r--r-- | test/sql/test_metadata.py | 63 | ||||
| -rw-r--r-- | test/sql/test_query.py | 132 | ||||
| -rw-r--r-- | test/sql/test_quote.py | 4 | ||||
| -rw-r--r-- | test/sql/test_returning.py | 28 | ||||
| -rw-r--r-- | test/sql/test_rowcount.py | 4 | ||||
| -rw-r--r-- | test/sql/test_selectable.py | 68 | ||||
| -rw-r--r-- | test/sql/test_types.py | 272 | ||||
| -rw-r--r-- | test/sql/test_unicode.py | 4 |
15 files changed, 523 insertions, 524 deletions
diff --git a/test/sql/test_case_statement.py b/test/sql/test_case_statement.py index 7bc3ab31f..97220d4dd 100644 --- a/test/sql/test_case_statement.py +++ b/test/sql/test_case_statement.py @@ -94,31 +94,31 @@ class CaseTest(TestBase, AssertsCompiledSQL): def test_literal_interpretation(self): t = table('test', column('col1')) - + assert_raises(exc.ArgumentError, case, [("x", "y")]) - + self.assert_compile(case([("x", "y")], value=t.c.col1), "CASE test.col1 WHEN :param_1 THEN :param_2 END") self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END") - + def test_text_doesnt_explode(self): for s in [ select([case([(info_table.c.info == 'pk_4_data', text("'yes'"))], else_=text("'no'" ))]).order_by(info_table.c.info), - + select([case([(info_table.c.info == 'pk_4_data', literal_column("'yes'"))], else_=literal_column("'no'" ))]).order_by(info_table.c.info), - + ]: eq_(s.execute().fetchall(), [ (u'no', ), (u'no', ), (u'no', ), (u'yes', ), (u'no', ), (u'no', ), ]) - - - + + + @testing.fails_on('firebird', 'FIXME: unknown') @testing.fails_on('maxdb', 'FIXME: unknown') def testcase_with_dict(self): @@ -146,7 +146,7 @@ class CaseTest(TestBase, AssertsCompiledSQL): ], whereclause=info_table.c.pk < 4, from_obj=[info_table]) - + assert simple_query.execute().fetchall() == [ ('one', 1), ('two', 2), diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index b34eaeaae..d63e41e90 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -85,15 +85,15 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable") - + def test_invalid_col_argument(self): assert_raises(exc.ArgumentError, select, table1) assert_raises(exc.ArgumentError, select, table1.c.myid) - + def test_from_subquery(self): """tests placing select statements in the column clause of another select, for the purposes of selecting from the exported columns of that select.""" - + s = select([table1], table1.c.name == 'jack') self.assert_compile( select( @@ -163,7 +163,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): select([ClauseList(column('a'), column('b'))]).select_from('sometable'), 'SELECT a, b FROM sometable' ) - + def test_use_labels(self): self.assert_compile( select([table1.c.myid==5], use_labels=True), @@ -184,15 +184,15 @@ class SelectTest(TestBase, AssertsCompiledSQL): select([cast("data", Integer)], use_labels=True), "SELECT CAST(:param_1 AS INTEGER) AS anon_1" ) - + self.assert_compile( select([func.sum(func.lala(table1.c.myid).label('foo')).label('bar')]), "SELECT sum(lala(mytable.myid)) AS bar FROM mytable" ) - + def test_paramstyles(self): stmt = text("select :foo, :bar, :bat from sometable") - + self.assert_compile( stmt, "select ?, ?, ? from sometable" @@ -218,10 +218,10 @@ class SelectTest(TestBase, AssertsCompiledSQL): "select %(foo)s, %(bar)s, %(bat)s from sometable" , dialect=default.DefaultDialect(paramstyle='pyformat') ) - + def test_dupe_columns(self): """test that deduping is performed against clause element identity, not rendered result.""" - + self.assert_compile( select([column('a'), column('a'), column('a')]), "SELECT a, a, a" @@ -241,7 +241,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT a, b" , dialect=default.DefaultDialect() ) - + self.assert_compile( select([bindparam('a'), bindparam('b'), bindparam('c')]), "SELECT :a, :b, :c" @@ -258,11 +258,11 @@ class SelectTest(TestBase, AssertsCompiledSQL): select(["a", "a", "a"]), "SELECT a, a, a" ) - + s = select([bindparam('a'), bindparam('b'), bindparam('c')]) s = s.compile(dialect=default.DefaultDialect(paramstyle='qmark')) eq_(s.positiontup, ['a', 'b', 'c']) - + def test_nested_uselabels(self): """test nested anonymous label generation. this essentially tests the ANONYMOUS_LABEL regex. @@ -285,7 +285,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): 'mytable.name AS name, mytable.description ' 'AS description FROM mytable) AS anon_2) ' 'AS anon_1') - + def test_dont_overcorrelate(self): self.assert_compile(select([table1], from_obj=[table1, table1.select()]), @@ -294,7 +294,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "mytable.myid AS myid, mytable.name AS " "name, mytable.description AS description " "FROM mytable)") - + def test_full_correlate(self): # intentional t = table('t', column('a'), column('b')) @@ -302,7 +302,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): s2 = select([t.c.a, s]) self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""") - + # unintentional t2 = table('t2', column('c'), column('d')) s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar() @@ -313,18 +313,18 @@ class SelectTest(TestBase, AssertsCompiledSQL): s = s.correlate(t, t2) s2 =select([t, t2, s]) self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d") - + def test_exists(self): s = select([table1.c.myid]).where(table1.c.myid==5) - + self.assert_compile(exists(s), "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)" ) - + self.assert_compile(exists(s.as_scalar()), "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)" ) - + self.assert_compile(exists([table1.c.myid], table1.c.myid == 5).select(), 'SELECT EXISTS (SELECT mytable.myid FROM ' @@ -375,7 +375,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): 'WHERE EXISTS (SELECT * FROM myothertable ' 'AS myothertable_1 WHERE ' 'myothertable_1.otherid = mytable.myid)') - + self.assert_compile( select([ or_( @@ -388,7 +388,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "OR (EXISTS (SELECT * FROM myothertable WHERE " "myothertable.otherid = :otherid_2)) AS anon_1" ) - + def test_where_subquery(self): s = select([addresses.c.street], addresses.c.user_id @@ -619,7 +619,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_compile( label('bar', column('foo', type_=String))+ 'foo', 'foo || :param_1') - + def test_conjunctions(self): a, b, c = 'a', 'b', 'c' @@ -630,7 +630,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): select([x.label('foo')]), 'SELECT a AND b AND c AS foo' ) - + self.assert_compile( and_(table1.c.myid == 12, table1.c.name=='asdf', table2.c.othername == 'foo', "sysdate() = today()"), @@ -651,7 +651,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): 'today()', checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12} ) - + def test_distinct(self): self.assert_compile( @@ -678,7 +678,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): select([func.count(distinct(table1.c.myid))]), "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" ) - + def test_operators(self): for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'), (operator.sub, '-'), @@ -730,7 +730,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_(compiled == fwd_sql or compiled == rev_sql, "\n'" + compiled + "'\n does not match\n'" + fwd_sql + "'\n or\n'" + rev_sql + "'") - + for (py_op, op) in ( (operator.neg, '-'), (operator.inv, 'NOT '), @@ -739,11 +739,11 @@ class SelectTest(TestBase, AssertsCompiledSQL): (table1.c.myid, "mytable.myid"), (literal("foo"), ":param_1"), ): - + compiled = str(py_op(expr)) sql = "%s%s" % (op, sql) eq_(compiled, sql) - + self.assert_compile( table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')), "SELECT mytable.myid, mytable.name, mytable.description FROM " @@ -837,7 +837,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): postgresql.PGDialect()), ]: self.assert_compile(expr, check, dialect=dialect) - + def test_match(self): for expr, check, dialect in [ (table1.c.myid.match('somstr'), @@ -853,7 +853,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): postgresql.dialect()), (table1.c.myid.match('somstr'), "CONTAINS (mytable.myid, :myid_1)", - oracle.dialect()), + oracle.dialect()), ]: self.assert_compile(expr, check, dialect=dialect) @@ -1160,7 +1160,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT column1 AS foobar, column2 AS hoho, myid FROM " "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)" ) - + self.assert_compile( select(['col1','col2'], from_obj='tablename').alias('myalias'), "SELECT col1, col2 FROM tablename" @@ -1189,7 +1189,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): checkparams={'bar':4, 'whee': 7}, dialect=dialect ) - + # test escaping out text() params with a backslash self.assert_compile( text("select * from foo where clock='05:06:07' and mork='\:mindy'"), @@ -1243,23 +1243,23 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': None, 'x': None, 'z': None} ) - + self.assert_compile( s.params(x=5, y=6, z=7), "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)", checkparams={'y': 6, 'x': 5, 'z': 7} ) - + @testing.emits_warning('.*empty sequence.*') def test_render_binds_as_literal(self): """test a compiler that renders binds inline into SQL in the columns clause.""" - + dialect = default.DefaultDialect() class Compiler(dialect.statement_compiler): ansi_bind_rules = True dialect.statement_compiler = Compiler - + self.assert_compile( select([literal("someliteral")]), "SELECT 'someliteral'", @@ -1283,23 +1283,23 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable", dialect=dialect ) - + self.assert_compile( select([literal("foo").in_([])]), "SELECT 'foo' != 'foo' AS anon_1", dialect=dialect ) - + assert_raises( exc.CompileError, bindparam("foo").in_([]).compile, dialect=dialect ) - - + + def test_literal(self): - + self.assert_compile(select([literal('foo')]), "SELECT :param_1") - + self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]), "SELECT :param_1 || :param_2 AS anon_1 FROM mytable") @@ -1334,7 +1334,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): expr, "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable") assert table1.c.name.collate('latin1_german2_ci').type is table1.c.name.type - + expr = select([table1.c.name.collate('latin1_german2_ci').label('k1')]).order_by('k1') self.assert_compile(expr,"SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1") @@ -1384,8 +1384,8 @@ class SelectTest(TestBase, AssertsCompiledSQL): '''"table%name"."spaces % more spaces" AS "table%name_spaces % '''\ '''more spaces" FROM "table%name"''' ) - - + + def test_joins(self): self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), @@ -1473,7 +1473,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "select #1 has 2 columns, select #2 has 3", union, table3.select(), table1.select() ) - + x = union( select([table1], table1.c.myid == 5), select([table1], table1.c.myid == 12), @@ -1494,7 +1494,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "FROM mytable UNION SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable) UNION SELECT mytable.myid," " mytable.name, mytable.description FROM mytable") - + u1 = union( select([table1.c.myid, table1.c.name]), select([table2]), @@ -1507,7 +1507,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "FROM thirdtable") assert u1.corresponding_column(table2.c.otherid) is u1.c.myid - + self.assert_compile( union( select([table1.c.myid, table1.c.name]), @@ -1557,7 +1557,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT thirdtable.userid FROM thirdtable)" ) - + s = select([column('foo'), column('bar')]) # ORDER BY's even though not supported by all DB's, are rendered if requested @@ -1569,9 +1569,9 @@ class SelectTest(TestBase, AssertsCompiledSQL): union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()), "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT :param_1)", {'param_1':10} - + ) - + def test_compound_grouping(self): s = select([column('foo'), column('bar')]).select_from('bat') @@ -1580,19 +1580,19 @@ class SelectTest(TestBase, AssertsCompiledSQL): "((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat" ) - + self.assert_compile( union(s, s, s, s), "SELECT foo, bar FROM bat UNION SELECT foo, bar " "FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat" ) - + self.assert_compile( union(s, union(s, union(s, s))), "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat " "UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))" ) - + self.assert_compile( select([s.alias()]), 'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1' @@ -1654,8 +1654,8 @@ class SelectTest(TestBase, AssertsCompiledSQL): "UNION SELECT foo, bar FROM bat) " "UNION (SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat)") - - + + self.assert_compile( union( intersect(s, s), @@ -1817,9 +1817,9 @@ class SelectTest(TestBase, AssertsCompiledSQL): def test_binds_no_hash_collision(self): """test that construct_params doesn't corrupt dict due to hash collisions""" - + total_params = 100000 - + in_clause = [':in%d' % i for i in range(total_params)] params = dict(('in%d' % i, i) for i in range(total_params)) sql = 'text clause %s' % ', '.join(in_clause) @@ -1829,14 +1829,14 @@ class SelectTest(TestBase, AssertsCompiledSQL): pp = c.construct_params(params) eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp))) eq_(len(set(pp.values())), total_params) - + def test_bind_as_col(self): t = table('foo', column('id')) s = select([t, literal('lala').label('hoho')]) self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") - + assert [str(c) for c in s.c] == ["id", "hoho"] def test_bind_callable(self): @@ -1846,8 +1846,8 @@ class SelectTest(TestBase, AssertsCompiledSQL): "x = :key", {'x':12} ) - - + + @testing.emits_warning('.*empty sequence.*') def test_in(self): self.assert_compile(table1.c.myid.in_(['a']), @@ -1969,7 +1969,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): ), "(mytable.myid, mytable.name) IN ((myothertable.otherid, myothertable.othername))" ) - + self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( select([table2.c.otherid, table2.c.othername]) @@ -1977,8 +1977,8 @@ class SelectTest(TestBase, AssertsCompiledSQL): "(mytable.myid, mytable.name) IN (SELECT " "myothertable.otherid, myothertable.othername FROM myothertable)" ) - - + + def test_cast(self): tbl = table('casttest', column('id', Integer), @@ -2039,7 +2039,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_compile(cast(literal_column('NULL'), Integer), 'CAST(NULL AS INTEGER)', dialect=sqlite.dialect()) - + def test_date_between(self): import datetime table = Table('dt', metadata, @@ -2085,15 +2085,15 @@ class SelectTest(TestBase, AssertsCompiledSQL): "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") self.assert_compile(table.select(between((table.c.field == table.c.field), False, True)), "SELECT op.field FROM op WHERE (op.field = op.field) BETWEEN :param_1 AND :param_2") - + def test_associativity(self): f = column('f') self.assert_compile( f - f, "f - f" ) self.assert_compile( f - f - f, "(f - f) - f" ) - + self.assert_compile( (f - f) - f, "(f - f) - f" ) self.assert_compile( (f - f).label('foo') - f, "(f - f) - f" ) - + self.assert_compile( f - (f - f), "f - (f - f)" ) self.assert_compile( f - (f - f).label('foo'), "f - (f - f)" ) @@ -2104,54 +2104,54 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_compile( f / f - f, "f / f - f" ) self.assert_compile( (f / f) - f, "f / f - f" ) self.assert_compile( (f / f).label('foo') - f, "f / f - f" ) - + # because / more precedent than - self.assert_compile( f - (f / f), "f - f / f" ) self.assert_compile( f - (f / f).label('foo'), "f - f / f" ) self.assert_compile( f - f / f, "f - f / f" ) self.assert_compile( (f - f) / f, "(f - f) / f" ) - + self.assert_compile( ((f - f) / f) - f, "(f - f) / f - f") self.assert_compile( (f - f) / (f - f), "(f - f) / (f - f)") - + # higher precedence self.assert_compile( (f / f) - (f / f), "f / f - f / f") self.assert_compile( (f / f) - (f - f), "f / f - (f - f)") self.assert_compile( (f / f) / (f - f), "(f / f) / (f - f)") self.assert_compile( f / (f / (f - f)), "f / (f / (f - f))") - - + + def test_delayed_col_naming(self): my_str = Column(String) - + sel1 = select([my_str]) - + assert_raises_message( exc.InvalidRequestError, "Cannot initialize a sub-selectable with this Column", lambda: sel1.c ) - + # calling label or as_scalar doesn't compile - # anything. + # anything. sel2 = select([func.substr(my_str, 2, 3)]).label('my_substr') - + assert_raises_message( exc.CompileError, "Cannot compile Column object until it's 'name' is assigned.", str, sel2 ) - + sel3 = select([my_str]).as_scalar() assert_raises_message( exc.CompileError, "Cannot compile Column object until it's 'name' is assigned.", str, sel3 ) - + my_str.name = 'foo' - + self.assert_compile( sel1, "SELECT foo", @@ -2160,18 +2160,18 @@ class SelectTest(TestBase, AssertsCompiledSQL): sel2, '(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)', ) - + self.assert_compile( sel3, "(SELECT foo)" ) - + def test_naming(self): f1 = func.hoho(table1.c.name) s1 = select([table1.c.myid, table1.c.myid.label('foobar'), f1, func.lala(table1.c.name).label('gg')]) - + eq_( s1.c.keys(), ['myid', 'foobar', str(f1), 'gg'] @@ -2179,7 +2179,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): meta = MetaData() t1 = Table('mytable', meta, Column('col1', Integer)) - + exprs = ( table1.c.myid==12, func.hoho(table1.c.myid), @@ -2197,15 +2197,15 @@ class SelectTest(TestBase, AssertsCompiledSQL): t = col.table else: t = table1 - + s1 = select([col], from_obj=t) assert s1.c.keys() == [key], s1.c.keys() - + if label: self.assert_compile(s1, "SELECT %s AS %s FROM mytable" % (expr, label)) else: self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,)) - + s1 = select([s1]) if label: self.assert_compile(s1, @@ -2220,7 +2220,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): self.assert_compile(s1, "SELECT %s FROM (SELECT %s FROM mytable)" % (expr,expr)) - + def test_hints(self): s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s") @@ -2230,12 +2230,12 @@ class SelectTest(TestBase, AssertsCompiledSQL): a1 = table1.alias() s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)") - + subs4 = select([ table1, table2 ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).\ with_hint(table1, 'hint1') - + s4 = select([table3]).select_from( table3.join( subs4, @@ -2243,7 +2243,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): ) ).\ with_hint(table3, 'hint3') - + subs5 = select([ table1, table2 ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)) @@ -2255,12 +2255,12 @@ class SelectTest(TestBase, AssertsCompiledSQL): ).\ with_hint(table3, 'hint3').\ with_hint(table1, 'hint1') - + t1 = table('QuotedName', column('col1')) s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1') a2 = t1.alias('SomeName') s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1') - + mysql_d, oracle_d, sybase_d = \ mysql.dialect(), \ oracle.dialect(), \ @@ -2336,13 +2336,13 @@ class SelectTest(TestBase, AssertsCompiledSQL): and_("a", "b"), "a AND b" ) - + def test_literal_as_text_nonstring_raise(self): assert_raises(exc.ArgumentError, and_, ("a",), ("b",) ) - - + + class CRUDTest(TestBase, AssertsCompiledSQL): def test_insert(self): # generic insert, will create bind params for all columns @@ -2514,7 +2514,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL): where(table1.c.name=='somename'), "DELETE FROM mytable WHERE mytable.myid = :myid_1 " "AND mytable.name = :name_1") - + def test_correlated_delete(self): # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) @@ -2529,26 +2529,26 @@ class CRUDTest(TestBase, AssertsCompiledSQL): "DELETE FROM mytable WHERE mytable.name = (SELECT " "myothertable.othername FROM myothertable WHERE " "myothertable.otherid = mytable.myid)") - + def test_binds_that_match_columns(self): """test bind params named after column names replace the normal SET/VALUES generation.""" - + t = table('foo', column('x'), column('y')) u = t.update().where(t.c.x==bindparam('x')) - + assert_raises(exc.CompileError, u.compile) - + self.assert_compile(u, "UPDATE foo SET WHERE foo.x = :x", params={}) assert_raises(exc.CompileError, u.values(x=7).compile) - + self.assert_compile(u.values(y=7), "UPDATE foo SET y=:y WHERE foo.x = :x") - + assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y']) assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y']) - + self.assert_compile(u.values(x=3 + bindparam('x')), "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x") @@ -2574,7 +2574,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL): i = t.insert().values(x=3 + bindparam('y'), y=5) assert_raises(exc.CompileError, i.compile) - + i = t.insert().values(x=3 + bindparam('x2')) self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))") self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={}) @@ -2582,11 +2582,11 @@ class CRUDTest(TestBase, AssertsCompiledSQL): params={'x':1, 'y':2}) self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", params={'x2':1, 'y':2}) - + def test_labels_no_collision(self): - + t = table('foo', column('id'), column('foo_id')) - + self.assert_compile( t.update().where(t.c.id==5), "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1" @@ -2596,7 +2596,7 @@ class CRUDTest(TestBase, AssertsCompiledSQL): t.update().where(t.c.id==bindparam(key=t.c.id._label)), "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1" ) - + class InlineDefaultTest(TestBase, AssertsCompiledSQL): def test_insert(self): m = MetaData() @@ -2634,7 +2634,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL): self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id," " remote_owner.remotetable.value FROM remote_owner.remotetable") - + self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id," " remote_owner.remotetable.value FROM remote_owner.remotetable WHERE " @@ -2664,7 +2664,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL): ' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM' ' "dbo.remote_owner".remotetable' ) - + def test_alias(self): a = alias(table4, 'remtable') self.assert_compile(a.select(a.c.datatype_id==7), diff --git a/test/sql/test_constraints.py b/test/sql/test_constraints.py index 56c5c6205..fb07bf437 100644 --- a/test/sql/test_constraints.py +++ b/test/sql/test_constraints.py @@ -36,10 +36,10 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): def test_double_fk_usage_raises(self): f = ForeignKey('b.id') - + Column('x', Integer, f) assert_raises(exc.InvalidRequestError, Column, "y", Integer, f) - + def test_circular_constraint(self): a = Table("a", metadata, Column('id', Integer, primary_key=True), @@ -192,22 +192,22 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): ('sometable', 'this_name_is_too_long', 'ix_sometable_t_09aa'), ('sometable', 'this_name_alsois_long', 'ix_sometable_t_3cf1'), ]: - + t1 = Table(tname, MetaData(), Column(cname, Integer, index=True), ) ix1 = list(t1.indexes)[0] - + self.assert_compile( schema.CreateIndex(ix1), "CREATE INDEX %s " "ON %s (%s)" % (exp, tname, cname), dialect=dialect ) - + dialect.max_identifier_length = 22 dialect.max_index_name_length = None - + t1 = Table('t', MetaData(), Column('c', Integer)) assert_raises( exc.IdentifierError, @@ -217,7 +217,7 @@ class ConstraintTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): dialect=dialect ) - + class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): def _test_deferrable(self, constraint_factory): @@ -225,11 +225,11 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): Column('a', Integer), Column('b', Integer), constraint_factory(deferrable=True)) - + sql = str(schema.CreateTable(t).compile(bind=testing.db)) assert 'DEFERRABLE' in sql, sql assert 'NOT DEFERRABLE' not in sql, sql - + t = Table('tbl', MetaData(), Column('a', Integer), Column('b', Integer), @@ -291,18 +291,18 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): CheckConstraint('a < b', deferrable=True, initially='DEFERRED'))) - + self.assert_compile( schema.CreateTable(t), "CREATE TABLE tbl (a INTEGER, b INTEGER CHECK (a < b) DEFERRABLE INITIALLY DEFERRED)" ) - + def test_use_alter(self): m = MetaData() t = Table('t', m, Column('a', Integer), ) - + t2 = Table('t2', m, Column('a', Integer, ForeignKey('t.a', use_alter=True, name='fk_ta')), Column('b', Integer, ForeignKey('t.a', name='fk_tb')), # to ensure create ordering ... @@ -320,25 +320,25 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): 'DROP TABLE t2', 'DROP TABLE t' ]) - - + + def test_add_drop_constraint(self): m = MetaData() - + t = Table('tbl', m, Column('a', Integer), Column('b', Integer) ) - + t2 = Table('t2', m, Column('a', Integer), Column('b', Integer) ) - + constraint = CheckConstraint('a < b',name="my_test_constraint", deferrable=True,initially='DEFERRED', table=t) - + # before we create an AddConstraint, # the CONSTRAINT comes out inline self.assert_compile( @@ -397,13 +397,13 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): schema.AddConstraint(constraint), "ALTER TABLE t2 ADD CONSTRAINT uq_cst UNIQUE (a, b)" ) - + constraint = UniqueConstraint(t2.c.a, t2.c.b, name="uq_cs2") self.assert_compile( schema.AddConstraint(constraint), "ALTER TABLE t2 ADD CONSTRAINT uq_cs2 UNIQUE (a, b)" ) - + assert t.c.a.primary_key is False constraint = PrimaryKeyConstraint(t.c.a) assert t.c.a.primary_key is True @@ -411,5 +411,5 @@ class ConstraintCompilationTest(TestBase, AssertsCompiledSQL): schema.AddConstraint(constraint), "ALTER TABLE tbl ADD PRIMARY KEY (a)" ) - - + + diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 7ec43f8d2..31759a709 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -147,7 +147,7 @@ class DefaultTest(testing.TestBase): assert_raises_message(sa.exc.ArgumentError, ex_msg, sa.ColumnDefault, fn) - + def test_arg_signature(self): def fn1(): pass def fn2(): pass @@ -277,7 +277,7 @@ class DefaultTest(testing.TestBase): assert r.lastrow_has_defaults() eq_(set(r.context.postfetch_cols), set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])) - + eq_(t.select(t.c.col1==54).execute().fetchall(), [(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, None)]) @@ -301,7 +301,7 @@ class DefaultTest(testing.TestBase): 12, today, 'py'), (53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today, 'py')]) - + def test_missing_many_param(self): assert_raises_message(exc.InvalidRequestError, "A value is required for bind parameter 'col7', in parameter group 1", @@ -310,7 +310,7 @@ class DefaultTest(testing.TestBase): {'col4':7, 'col8':19}, {'col4':7, 'col7':12, 'col8':19}, ) - + def test_insert_values(self): t.insert(values={'col3':50}).execute() l = t.select().execute() @@ -366,7 +366,7 @@ class DefaultTest(testing.TestBase): l = l.first() eq_(55, l['col3']) - + class PKDefaultTest(_base.TablesTest): __requires__ = ('subqueries',) @@ -379,14 +379,14 @@ class PKDefaultTest(_base.TablesTest): Column('id', Integer, primary_key=True, default=sa.select([func.max(t2.c.nextid)]).as_scalar()), Column('data', String(30))) - + @testing.requires.returning def test_with_implicit_returning(self): self._test(True) - + def test_regular(self): self._test(False) - + @testing.resolve_artifact_names def _test(self, returning): if not returning and not testing.db.dialect.implicit_returning: @@ -442,7 +442,7 @@ class PKIncrementTest(_base.TablesTest): ids.add(last) eq_(ids, set([1,2,3,4])) - + eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))), [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)]) @@ -477,7 +477,7 @@ class EmptyInsertTest(testing.TestBase): t1 = Table('t1', metadata, Column('is_true', Boolean, server_default=('1'))) metadata.create_all() - + try: result = t1.insert().execute() eq_(1, select([func.count(text('*'))], from_obj=t1).scalar()) @@ -588,7 +588,7 @@ class SequenceTest(testing.TestBase, testing.AssertsCompiledSQL): "DROP SEQUENCE foo_seq", use_default_dialect=True, ) - + @testing.fails_on('firebird', 'no FB support for start/increment') @testing.requires.sequences @@ -605,10 +605,10 @@ class SequenceTest(testing.TestBase, testing.AssertsCompiledSQL): start = seq.start or 1 inc = seq.increment or 1 assert values == list(xrange(start, start + inc * 3, inc)) - + finally: seq.drop(testing.db) - + @testing.requires.sequences def test_seq_nonpk(self): """test sequences fire off as defaults on non-pk columns""" diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index 0fb2ca5f7..13116a4bb 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -26,8 +26,8 @@ class CompileTest(TestBase, AssertsCompiledSQL): self.assert_compile(func.nosuchfunction(), "nosuchfunction", dialect=dialect) else: self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect) - - # test generic function compile + + # test generic function compile class fake_func(GenericFunction): __return_type__ = sqltypes.Integer @@ -39,14 +39,14 @@ class CompileTest(TestBase, AssertsCompiledSQL): "fake_func(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect) - + def test_use_labels(self): self.assert_compile(select([func.foo()], use_labels=True), "SELECT foo() AS foo_1" ) def test_underscores(self): self.assert_compile(func.if_(), "if()") - + def test_generic_now(self): assert isinstance(func.now().type, sqltypes.DateTime) @@ -69,10 +69,10 @@ class CompileTest(TestBase, AssertsCompiledSQL): ('random', oracle.dialect()) ]: self.assert_compile(func.random(), ret, dialect=dialect) - + def test_namespacing_conflicts(self): self.assert_compile(func.text('foo'), 'text(:text_1)') - + def test_generic_count(self): assert isinstance(func.count().type, sqltypes.Integer) @@ -101,7 +101,7 @@ class CompileTest(TestBase, AssertsCompiledSQL): assert True def test_return_type_detection(self): - + for fn in [func.coalesce, func.max, func.min, func.sum]: for args, type_ in [ ((datetime.date(2007, 10, 5), @@ -113,7 +113,7 @@ class CompileTest(TestBase, AssertsCompiledSQL): datetime.datetime(2005, 10, 15, 14, 45, 33)), sqltypes.DateTime) ]: assert isinstance(fn(*args).type, type_), "%s / %s" % (fn(), type_) - + assert isinstance(func.concat("foo", "bar").type, sqltypes.String) @@ -193,7 +193,7 @@ class ExecuteTest(TestBase): @engines.close_first def tearDown(self): pass - + def test_standalone_execute(self): x = testing.db.func.current_date().execute().scalar() y = testing.db.func.current_date().select().execute().scalar() @@ -208,10 +208,10 @@ class ExecuteTest(TestBase): def test_conn_execute(self): from sqlalchemy.sql.expression import FunctionElement from sqlalchemy.ext.compiler import compiles - + class myfunc(FunctionElement): type = Date() - + @compiles(myfunc) def compile(elem, compiler, **kw): return compiler.process(func.current_date()) @@ -229,17 +229,17 @@ class ExecuteTest(TestBase): def test_exec_options(self): f = func.foo() eq_(f._execution_options, {}) - + f = f.execution_options(foo='bar') eq_(f._execution_options, {'foo':'bar'}) s = f.select() eq_(s._execution_options, {'foo':'bar'}) - + ret = testing.db.execute(func.now().execution_options(foo='bar')) eq_(ret.context.execution_options, {'foo':'bar'}) ret.close() - - + + @engines.close_first def test_update(self): """ diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index e129f69c4..627736370 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -27,7 +27,7 @@ class TraversalTest(TestBase, AssertsExecutionResults): return other is self __hash__ = ClauseElement.__hash__ - + def __eq__(self, other): return other.expr == self.expr @@ -100,7 +100,7 @@ class TraversalTest(TestBase, AssertsExecutionResults): s2 = vis.traverse(struct) assert struct == s2 assert not struct.is_other(s2) - + def test_no_clone(self): struct = B(A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) @@ -151,9 +151,9 @@ class TraversalTest(TestBase, AssertsExecutionResults): class CustomObj(Column): pass - + assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column' - + foo, bar = CustomObj('foo', String), CustomObj('bar', String) bin = foo == bar s = set(ClauseVisitor().iterate(bin)) @@ -193,7 +193,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): f = sql_util.ClauseAdapter(a).traverse(f) self.assert_compile(select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") - + def test_join(self): clause = t1.join(t2, t1.c.col2==t2.c.col2) c1 = str(clause) @@ -206,20 +206,20 @@ class ClauseTest(TestBase, AssertsCompiledSQL): clause2 = Vis().traverse(clause) assert c1 == str(clause) assert str(clause2) == str(t1.join(t2, t1.c.col2==t2.c.col3)) - + def test_aliased_column_adapt(self): clause = t1.select() - + aliased = t1.select().alias() aliased2 = t1.alias() adapter = sql_util.ColumnAdapter(aliased) - + f = select([ adapter.columns[c] for c in aliased2.c ]).select_from(aliased) - + s = select([aliased2]).select_from(aliased) eq_(str(s), str(f)) @@ -230,8 +230,8 @@ class ClauseTest(TestBase, AssertsCompiledSQL): str(select([func.count(aliased2.c.col1)]).select_from(aliased)), str(f) ) - - + + def test_text(self): clause = text("select * from table where foo=:bar", bindparams=[bindparam('bar')]) c1 = str(clause) @@ -288,7 +288,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): print str(s5) assert str(s5) == s5_assert assert str(s4) == s4_assert - + def test_union(self): u = union(t1.select(), t2.select()) u2 = CloningVisitor().traverse(u) @@ -300,27 +300,27 @@ class ClauseTest(TestBase, AssertsCompiledSQL): u2 = CloningVisitor().traverse(u) assert str(u) == str(u2) assert [str(c) for c in u2.c] == cols - + s1 = select([t1], t1.c.col1 == bindparam('id_param')) s2 = select([t2]) u = union(s1, s2) - + u2 = u.params(id_param=7) u3 = u.params(id_param=10) assert str(u) == str(u2) == str(u3) assert u2.compile().params == {'id_param':7} assert u3.compile().params == {'id_param':10} - + def test_in(self): expr = t1.c.col1.in_(['foo', 'bar']) expr2 = CloningVisitor().traverse(expr) assert str(expr) == str(expr2) - + def test_adapt_union(self): u = union(t1.select().where(t1.c.col1==4), t1.select().where(t1.c.col1==5)).alias() - + assert sql_util.ClauseAdapter(u).traverse(t1) is u - + def test_binds(self): """test that unique bindparams change their name upon clone() to prevent conflicts""" @@ -340,17 +340,17 @@ class ClauseTest(TestBase, AssertsCompiledSQL): "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1, "\ "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\ "WHERE anon_1.col2 = anon_2.col2") - + def test_extract(self): s = select([extract('foo', t1.c.col1).label('col1')]) self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") - + s2 = CloningVisitor().traverse(s).alias() s3 = select([s2.c.col1]) self.assert_compile(s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") self.assert_compile(s3, "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1) AS anon_1") - - + + @testing.emits_warning('.*replaced by another column with the same key') def test_alias(self): subq = t2.select().alias('subq') @@ -372,7 +372,7 @@ class ClauseTest(TestBase, AssertsCompiledSQL): s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)]) s5 = CloningVisitor().traverse(s) assert orig == str(s) == str(s5) - + def test_correlated_select(self): s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2) class Vis(CloningVisitor): @@ -380,32 +380,32 @@ class ClauseTest(TestBase, AssertsCompiledSQL): select.append_whereclause(t1.c.col2==7) self.assert_compile(Vis().traverse(s), "SELECT * FROM table1 WHERE table1.col1 = table2.col1 AND table1.col2 = :col2_1") - + def test_this_thing(self): s = select([t1]).where(t1.c.col1=='foo').alias() s2 = select([s.c.col1]) - + self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1") t1a = t1.alias() s2 = sql_util.ClauseAdapter(t1a).traverse(s2) self.assert_compile(s2, "SELECT anon_1.col1 FROM (SELECT table1_1.col1 AS col1, table1_1.col2 AS col2, table1_1.col3 AS col3 FROM table1 AS table1_1 WHERE table1_1.col1 = :col1_1) AS anon_1") - + def test_select_fromtwice(self): t1a = t1.alias() - + s = select([1], t1.c.col1==t1a.c.col1, from_obj=t1a).correlate(t1) self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") - + s = CloningVisitor().traverse(s) self.assert_compile(s, "SELECT 1 FROM table1 AS table1_1 WHERE table1.col1 = table1_1.col1") - + s = select([t1]).where(t1.c.col1=='foo').alias() - + s2 = select([1], t1.c.col1==s.c.col1, from_obj=s).correlate(t1) self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") s2 = ReplacingCloningVisitor().traverse(s2) self.assert_compile(s2, "SELECT 1 FROM (SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) AS anon_1 WHERE table1.col1 = anon_1.col1") - + class ClauseAdapterTest(TestBase, AssertsCompiledSQL): @classmethod def setup_class(cls): @@ -446,7 +446,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") s = CloningVisitor().traverse(s) self.assert_compile(select(['*'], t2alias.c.col1==s), "SELECT * FROM table2 AS t2alias WHERE t2alias.col1 = (SELECT * FROM table1 AS t1alias)") - + s = select(['*']).where(t1.c.col1==t2.c.col1).as_scalar() self.assert_compile(select([t1.c.col1, s]), "SELECT table1.col1, (SELECT * FROM table2 WHERE table1.col1 = table2.col1) AS anon_1 FROM table1") vis = sql_util.ClauseAdapter(t1alias) @@ -478,7 +478,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): j1 = addresses.join(ualias, addresses.c.user_id==ualias.c.id) self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), "SELECT count(addresses.id) AS count_1 FROM addresses WHERE users_1.id = addresses.user_id") - + def test_table_to_alias(self): t1alias = t1.alias('t1alias') @@ -543,7 +543,7 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): a = Table('a', m, Column('x', Integer), Column('y', Integer)) b = Table('b', m, Column('x', Integer), Column('y', Integer)) c = Table('c', m, Column('x', Integer), Column('y', Integer)) - + # force a recursion overflow, by linking a.c.x<->c.c.x, and # asking for a nonexistent col. corresponding_column should prevent # endless depth. @@ -557,13 +557,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): c = Table('c', m, Column('x', Integer), Column('y', Integer)) alias = select([a]).select_from(a.join(b, a.c.x==b.c.x)).alias() - + # two levels of indirection from c.x->b.x->a.x, requires recursive # corresponding_column call adapt = sql_util.ClauseAdapter(alias, equivalents= {b.c.x: set([ a.c.x]), c.c.x:set([b.c.x])}) assert adapt._corresponding_column(a.c.x, False) is alias.c.x assert adapt._corresponding_column(c.c.x, False) is alias.c.x - + def test_join_to_alias(self): metadata = MetaData() a = Table('a', metadata, @@ -642,13 +642,13 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1) AS foo LIMIT :param_1 OFFSET :param_2) AS anon_1 "\ "LEFT OUTER JOIN table1 AS bar ON anon_1.col1 = bar.col1", {'param_1':5, 'param_2':10}) - + def test_functions(self): self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), "count(table1_1.col1)") s = select([func.count(t1.c.col1)]) self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), "SELECT count(table1_1.col1) AS count_1 FROM table1 AS table1_1") - + def test_recursive(self): metadata = MetaData() a = Table('a', metadata, @@ -670,8 +670,8 @@ class ClauseAdapterTest(TestBase, AssertsCompiledSQL): u = union( a.join(b).select().apply_labels(), a.join(d).select().apply_labels() - ).alias() - + ).alias() + self.assert_compile( sql_util.ClauseAdapter(u).traverse(select([c.c.bid]).where(c.c.bid==u.c.b_aid)), "SELECT c.bid "\ @@ -687,16 +687,16 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): global table1, table2, table3, table4 def _table(name): return table(name, column("col1"), column("col2"),column("col3")) - - table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")] + + table1, table2, table3, table4 = [_table(name) for name in ("table1", "table2", "table3", "table4")] def test_splice(self): (t1, t2, t3, t4) = (table1, table2, table1.alias(), table2.alias()) - + j = t1.join(t2, t1.c.col1==t2.c.col1).join(t3, t2.c.col1==t3.c.col1).join(t4, t4.c.col1==t1.c.col1) - + s = select([t1]).where(t1.c.col2<5).alias() - + self.assert_compile(sql_util.splice_joins(s, j), "(SELECT table1.col1 AS col1, table1.col2 AS col2, "\ "table1.col3 AS col3 FROM table1 WHERE table1.col2 < :col2_1) AS anon_1 "\ @@ -705,12 +705,12 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): def test_stop_on(self): (t1, t2, t3) = (table1, table2, table3) - + j1= t1.join(t2, t1.c.col1==t2.c.col1) j2 = j1.join(t3, t2.c.col1==t3.c.col1) - + s = select([t1]).select_from(j1).alias() - + self.assert_compile(sql_util.splice_joins(s, j2), "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 JOIN table2 "\ "ON table1.col1 = table2.col1) AS anon_1 JOIN table2 ON anon_1.col1 = table2.col1 JOIN table3 "\ @@ -720,27 +720,27 @@ class SpliceJoinsTest(TestBase, AssertsCompiledSQL): self.assert_compile(sql_util.splice_joins(s, j2, j1), "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 "\ "JOIN table2 ON table1.col1 = table2.col1) AS anon_1 JOIN table3 ON table2.col1 = table3.col1") - + def test_splice_2(self): t2a = table2.alias() t3a = table3.alias() j1 = table1.join(t2a, table1.c.col1==t2a.c.col1).join(t3a, t2a.c.col2==t3a.c.col2) - + t2b = table4.alias() j2 = table1.join(t2b, table1.c.col3==t2b.c.col3) - + self.assert_compile(sql_util.splice_joins(table1, j1), "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2") - + self.assert_compile(sql_util.splice_joins(table1, j2), "table1 JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") self.assert_compile(sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), "table1 JOIN table2 AS table2_1 ON table1.col1 = table2_1.col1 "\ "JOIN table3 AS table3_1 ON table2_1.col2 = table3_1.col2 "\ "JOIN table4 AS table4_1 ON table1.col3 = table4_1.col3") - - + + class SelectTest(TestBase, AssertsCompiledSQL): """tests the generative capability of Select""" @@ -838,7 +838,7 @@ class SelectTest(TestBase, AssertsCompiledSQL): assert s._execution_options == dict(foo='bar') # s2 should have its execution_options based on s, though. assert s2._execution_options == dict(foo='bar', bar='baz') - + # this feature not available yet def _NOTYET_test_execution_options_in_text(self): s = text('select 42', execution_options=dict(foo='bar')) diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py index 0f84c30a0..9160aa3ee 100644 --- a/test/sql/test_labels.py +++ b/test/sql/test_labels.py @@ -51,7 +51,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): assert_raises(exceptions.IdentifierError, m.drop_all) assert_raises(exceptions.IdentifierError, t1.create) assert_raises(exceptions.IdentifierError, t1.drop) - + def test_result(self): table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"}) table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"}) @@ -82,7 +82,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): (1, "data1"), (2, "data2"), ], repr(result) - + @testing.requires.offset def go(): r = s.limit(2).offset(1).execute() @@ -94,7 +94,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): (3, "data3"), ], repr(result) go() - + def test_table_alias_names(self): if testing.against('oracle'): self.assert_compile( @@ -113,7 +113,7 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): self.assert_compile( select([table1, ta]).select_from(table1.join(ta, table1.c.this_is_the_data_column==ta.c.this_is_the_data_column)).\ where(ta.c.this_is_the_data_column=='data3'), - + "SELECT some_large_named_table.this_is_the_primarykey_column, some_large_named_table.this_is_the_data_column, " "table_with_exactly_29_c_1.this_is_the_primarykey_column, table_with_exactly_29_c_1.this_is_the_data_column FROM " "some_large_named_table JOIN table_with_exactly_29_characs AS table_with_exactly_29_c_1 ON " @@ -121,17 +121,17 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): "WHERE table_with_exactly_29_c_1.this_is_the_data_column = :this_is_the_data_column_1", dialect=dialect ) - + table2.insert().execute( {"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"}, {"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"}, {"this_is_the_primarykey_column":3, "this_is_the_data_column":"data3"}, {"this_is_the_primarykey_column":4, "this_is_the_data_column":"data4"}, ) - + r = table2.alias().select().execute() assert r.fetchall() == [(x, "data%d" % x) for x in range(1, 5)] - + def test_colbinds(self): table1.insert().execute(**{"this_is_the_primarykey_column":1, "this_is_the_data_column":"data1"}) table1.insert().execute(**{"this_is_the_primarykey_column":2, "this_is_the_data_column":"data2"}) @@ -201,5 +201,5 @@ class LongLabelsTest(TestBase, AssertsCompiledSQL): self.assert_compile(x, "SELECT _1.this_is_the_primarykey_column AS _1, _1.this_is_the_data_column AS _2 FROM " "(SELECT some_large_named_table.this_is_the_primarykey_column AS _3, some_large_named_table.this_is_the_data_column AS _4 " "FROM some_large_named_table WHERE some_large_named_table.this_is_the_primarykey_column = :_1) AS _1", dialect=compile_dialect) - - + + diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index c5ef48154..75eac9ee8 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -32,7 +32,7 @@ class MetaDataTest(TestBase, ComparesTables): t2 = Table('t2', metadata, Column('x', Integer), schema='foo') t3 = Table('t2', MetaData(), Column('x', Integer)) t4 = Table('t1', MetaData(), Column('x', Integer), schema='foo') - + assert "t1" in metadata assert "foo.t2" in metadata assert "t2" not in metadata @@ -41,7 +41,7 @@ class MetaDataTest(TestBase, ComparesTables): assert t2 in metadata assert t3 not in metadata assert t4 not in metadata - + def test_uninitialized_column_copy(self): for col in [ Column('foo', String(), nullable=False), @@ -76,24 +76,24 @@ class MetaDataTest(TestBase, ComparesTables): cx = c1.copy() t = Table('foo%d' % i, m, cx) eq_(msgs, ['attach foo0.foo', 'attach foo1.foo', 'attach foo2.foo']) - + def test_schema_collection_add(self): metadata = MetaData() - + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') t2 = Table('t2', metadata, Column('x', Integer), schema='bar') t3 = Table('t3', metadata, Column('x', Integer)) - + eq_(metadata._schemas, set(['foo', 'bar'])) eq_(len(metadata.tables), 3) - + def test_schema_collection_remove(self): metadata = MetaData() - + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') t2 = Table('t2', metadata, Column('x', Integer), schema='bar') t3 = Table('t3', metadata, Column('x', Integer), schema='bar') - + metadata.remove(t3) eq_(metadata._schemas, set(['foo', 'bar'])) eq_(len(metadata.tables), 2) @@ -101,28 +101,28 @@ class MetaDataTest(TestBase, ComparesTables): metadata.remove(t1) eq_(metadata._schemas, set(['bar'])) eq_(len(metadata.tables), 1) - + def test_schema_collection_remove_all(self): metadata = MetaData() - + t1 = Table('t1', metadata, Column('x', Integer), schema='foo') t2 = Table('t2', metadata, Column('x', Integer), schema='bar') metadata.clear() eq_(metadata._schemas, set()) eq_(len(metadata.tables), 0) - + def test_metadata_tables_immutable(self): metadata = MetaData() - + t1 = Table('t1', metadata, Column('x', Integer)) assert 't1' in metadata.tables - + assert_raises( TypeError, lambda: metadata.tables.pop('t1') ) - + @testing.provide_metadata def test_dupe_tables(self): t1 = Table('table1', metadata, @@ -143,28 +143,28 @@ class MetaDataTest(TestBase, ComparesTables): "Table object.", go ) - + def test_fk_copy(self): c1 = Column('foo', Integer) c2 = Column('bar', Integer) m = MetaData() t1 = Table('t', m, c1, c2) - + kw = dict(onupdate="X", ondelete="Y", use_alter=True, name='f1', deferrable="Z", initially="Q", link_to_name=True) - + fk1 = ForeignKey(c1, **kw) fk2 = ForeignKeyConstraint((c1,), (c2,), **kw) - + t1.append_constraint(fk2) fk1c = fk1.copy() fk2c = fk2.copy() - + for k in kw: eq_(getattr(fk1c, k), kw[k]) eq_(getattr(fk2c, k), kw[k]) - + def test_fk_construct(self): c1 = Column('foo', Integer) c2 = Column('bar', Integer) @@ -172,7 +172,7 @@ class MetaDataTest(TestBase, ComparesTables): t1 = Table('t', m, c1, c2) fk1 = ForeignKeyConstraint(('foo', ), ('bar', ), table=t1) assert fk1 in t1.constraints - + @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely') def test_to_metadata(self): meta = MetaData() @@ -264,7 +264,7 @@ class MetaDataTest(TestBase, ComparesTables): assert not c.columns.contains_column(table.c.name) finally: meta.drop_all(testing.db) - + def test_tometadata_with_schema(self): meta = MetaData() @@ -314,7 +314,7 @@ class MetaDataTest(TestBase, ComparesTables): Column('data2', Integer), ) Index('multi',table.c.data1,table.c.data2), - + meta2 = MetaData() table_c = table.tometadata(meta2) @@ -322,7 +322,7 @@ class MetaDataTest(TestBase, ComparesTables): return [i.name,i.unique] + \ sorted(i.kwargs.items()) + \ i.columns.keys() - + eq_( sorted([_get_key(i) for i in table.indexes]), sorted([_get_key(i) for i in table_c.indexes]) @@ -330,7 +330,7 @@ class MetaDataTest(TestBase, ComparesTables): @emits_warning("Table '.+' already exists within the given MetaData") def test_tometadata_already_there(self): - + meta1 = MetaData() table1 = Table('mytable', meta1, Column('myid', Integer, primary_key=True), @@ -341,7 +341,7 @@ class MetaDataTest(TestBase, ComparesTables): ) meta3 = MetaData() - + table_c = table1.tometadata(meta2) table_d = table2.tometadata(meta2) @@ -384,7 +384,7 @@ class MetaDataTest(TestBase, ComparesTables): c = Table('c', meta, Column('foo', Integer)) d = Table('d', meta, Column('foo', Integer)) e = Table('e', meta, Column('foo', Integer)) - + e.add_is_dependent_on(c) a.add_is_dependent_on(b) b.add_is_dependent_on(d) @@ -394,7 +394,7 @@ class MetaDataTest(TestBase, ComparesTables): meta.sorted_tables, [d, b, a, c, e] ) - + def test_tometadata_strip_schema(self): meta = MetaData() @@ -433,7 +433,7 @@ class TableTest(TestBase, AssertsCompiledSQL): table1 = Table("temporary_table_1", MetaData(), Column("col1", Integer), prefixes = ["TEMPORARY"]) - + self.assert_compile( schema.CreateTable(table1), "CREATE TEMPORARY TABLE temporary_table_1 (col1 INTEGER)" @@ -480,8 +480,8 @@ class TableTest(TestBase, AssertsCompiledSQL): TypeError, assign ) - - + + class ColumnDefinitionTest(TestBase): """Test Column() construction.""" @@ -570,4 +570,3 @@ class ColumnOptionsTest(TestBase): c.info['bar'] = 'zip' assert c.info['bar'] == 'zip' -
\ No newline at end of file diff --git a/test/sql/test_query.py b/test/sql/test_query.py index 085845bc6..6a9055887 100644 --- a/test/sql/test_query.py +++ b/test/sql/test_query.py @@ -23,7 +23,7 @@ class QueryTest(TestBase): Column('address', String(30)), test_needs_acid=True ) - + users2 = Table('u2', metadata, Column('user_id', INT, primary_key = True), Column('user_name', VARCHAR(20)), @@ -47,7 +47,7 @@ class QueryTest(TestBase): def test_insert_heterogeneous_params(self): """test that executemany parameters are asserted to match the parameter set of the first.""" - + assert_raises_message(exc.InvalidRequestError, "A value is required for bind parameter 'user_name', in parameter group 2", users.insert().execute, @@ -87,10 +87,10 @@ class QueryTest(TestBase): comp = ins.compile(engine, column_keys=list(values)) if not set(values).issuperset(c.key for c in table.primary_key): assert comp.returning - + result = engine.execute(table.insert(), **values) ret = values.copy() - + for col, id in zip(table.primary_key, result.inserted_primary_key): ret[col.key] = id @@ -104,7 +104,7 @@ class QueryTest(TestBase): if testing.against('firebird', 'postgresql', 'oracle', 'mssql'): assert testing.db.dialect.implicit_returning - + if testing.db.dialect.implicit_returning: test_engines = [ engines.testing_engine(options={'implicit_returning':False}), @@ -112,7 +112,7 @@ class QueryTest(TestBase): ] else: test_engines = [testing.db] - + for engine in test_engines: metadata = MetaData() for supported, table, values, assertvalues in [ @@ -208,14 +208,14 @@ class QueryTest(TestBase): ] else: test_engines = [testing.db] - + for engine in test_engines: - + r = engine.execute(users.insert(), {'user_name':'jack'}, ) assert r.closed - + def test_row_iteration(self): users.insert().execute( {'user_id':7, 'user_name':'jack'}, @@ -245,25 +245,25 @@ class QueryTest(TestBase): @testing.fails_on('firebird', "kinterbasdb doesn't send full type information") def test_order_by_label(self): """test that a label within an ORDER BY works on each backend. - + This test should be modified to support [ticket:1068] when that ticket is implemented. For now, you need to put the actual string in the ORDER BY. - + """ users.insert().execute( {'user_id':7, 'user_name':'jack'}, {'user_id':8, 'user_name':'ed'}, {'user_id':9, 'user_name':'fred'}, ) - + concat = ("test: " + users.c.user_name).label('thedata') print select([concat]).order_by("thedata") eq_( select([concat]).order_by("thedata").execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] ) - + eq_( select([concat]).order_by("thedata").execute().fetchall(), [("test: ed",), ("test: fred",), ("test: jack",)] @@ -285,8 +285,8 @@ class QueryTest(TestBase): [("test: ed",), ("test: fred",), ("test: jack",)] ) go() - - + + def test_row_comparison(self): users.insert().execute(user_id = 7, user_name = 'jack') rp = users.select().execute().first() @@ -311,10 +311,10 @@ class QueryTest(TestBase): for pickle in False, True: for use_labels in False, True: result = users.select(use_labels=use_labels).order_by(users.c.user_id).execute().fetchall() - + if pickle: result = util.pickle.loads(util.pickle.dumps(result)) - + eq_( result, [(7, "jack"), (8, "ed"), (9, "fred")] @@ -325,27 +325,27 @@ class QueryTest(TestBase): else: eq_(result[0]['user_id'], 7) eq_(result[0].keys(), ["user_id", "user_name"]) - + eq_(result[0][0], 7) eq_(result[0][users.c.user_id], 7) eq_(result[0][users.c.user_name], 'jack') - + if use_labels: assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.user_id]) else: # test with a different table. name resolution is # causing 'user_id' to match when use_labels wasn't used. eq_(result[0][addresses.c.user_id], 7) - + assert_raises(exc.NoSuchColumnError, lambda: result[0]['fake key']) assert_raises(exc.NoSuchColumnError, lambda: result[0][addresses.c.address_id]) - + @testing.requires.boolean_col_expressions def test_or_and_as_columns(self): true, false = literal(True), literal(False) - + eq_(testing.db.execute(select([and_(true, false)])).scalar(), False) eq_(testing.db.execute(select([and_(true, true)])).scalar(), True) eq_(testing.db.execute(select([or_(true, false)])).scalar(), True) @@ -359,7 +359,7 @@ class QueryTest(TestBase): row = testing.db.execute(select([or_(true, false).label("x"), and_(true, false).label("y")])).first() assert row.x == True assert row.y == False - + def test_fetchmany(self): users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'ed') @@ -394,7 +394,7 @@ class QueryTest(TestBase): ), [(5,)]), ): eq_(expr.execute().fetchall(), result) - + @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text") @testing.fails_on("oracle", "neither % nor %% are accepted") @testing.fails_on("informix", "neither % nor %% are accepted") @@ -410,7 +410,7 @@ class QueryTest(TestBase): (text("select 'hello % world'"), "hello % world") ): eq_(testing.db.scalar(expr), result) - + def test_ilike(self): users.insert().execute( {'user_id':1, 'user_name':'one'}, @@ -642,7 +642,7 @@ class QueryTest(TestBase): self.assert_(r[0:1] == (1,)) self.assert_(r[1:] == (2, 'foo@bar.com')) self.assert_(r[:-1] == (1, 2)) - + def test_column_accessor(self): users.insert().execute(user_id=1, user_name='john') users.insert().execute(user_id=2, user_name='jack') @@ -651,11 +651,11 @@ class QueryTest(TestBase): r = users.select(users.c.user_id==2).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - + r = text("select * from query_users where user_id=2", bind=testing.db).execute().first() self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2) self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack') - + # test a little sqlite weirdness - with the UNION, # cols come back as "query_users.user_id" in cursor.description r = text("select query_users.user_id, query_users.user_name from query_users " @@ -682,14 +682,14 @@ class QueryTest(TestBase): users.insert(), {'user_id':1, 'user_name':'ed'} ) - + eq_(r.lastrowid, 1) - - + + def test_graceful_fetch_on_non_rows(self): """test that calling fetchone() etc. on a result that doesn't return rows fails gracefully. - + """ # these proxies don't work with no cursor.description present. @@ -709,7 +709,7 @@ class QueryTest(TestBase): getattr(result, meth), ) trans.rollback() - + def test_no_inserted_pk_on_non_insert(self): result = testing.db.execute("select * from query_users") assert_raises_message( @@ -717,7 +717,7 @@ class QueryTest(TestBase): r"Statement is not an insert\(\) expression construct.", getattr, result, 'inserted_primary_key' ) - + @testing.requires.returning def test_no_inserted_pk_on_returning(self): result = testing.db.execute(users.insert().returning(users.c.user_id, users.c.user_name)) @@ -726,7 +726,7 @@ class QueryTest(TestBase): r"Can't call inserted_primary_key when returning\(\) is used.", getattr, result, 'inserted_primary_key' ) - + def test_fetchone_til_end(self): result = testing.db.execute("select * from query_users") eq_(result.fetchone(), None) @@ -738,17 +738,17 @@ class QueryTest(TestBase): def test_result_case_sensitivity(self): """test name normalization for result sets.""" - + row = testing.db.execute( select([ literal_column("1").label("case_insensitive"), literal_column("2").label("CaseSensitive") ]) ).first() - + assert row.keys() == ["case_insensitive", "CaseSensitive"] - + def test_row_as_args(self): users.insert().execute(user_id=1, user_name='john') r = users.select(users.c.user_id==1).execute().first() @@ -761,12 +761,12 @@ class QueryTest(TestBase): r = users.select().execute() users2.insert().execute(list(r)) assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')] - + users2.delete().execute() r = users.select().execute() users2.insert().execute(*list(r)) assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')] - + def test_ambiguous_column(self): users.insert().execute(user_id=1, user_name='john') r = users.outerjoin(addresses).select().execute().first() @@ -782,7 +782,7 @@ class QueryTest(TestBase): "Ambiguous column name", lambda: r['user_id'] ) - + result = users.outerjoin(addresses).select().execute() result = base.BufferedColumnResultProxy(result.context) r = result.first() @@ -821,7 +821,7 @@ class QueryTest(TestBase): users.insert().execute(user_id=1, user_name='foo') r = users.select().execute().first() eq_(len(r), 2) - + r = testing.db.execute('select user_name, user_id from query_users').first() eq_(len(r), 2) r = testing.db.execute('select user_name from query_users').first() @@ -924,10 +924,10 @@ class QueryTest(TestBase): "uses sql-92 rules") def test_bind_in(self): """test calling IN against a bind parameter. - + this isn't allowed on several platforms since we generate ? = ?. - + """ users.insert().execute(user_id = 7, user_name = 'jack') users.insert().execute(user_id = 8, user_name = 'fred') @@ -940,7 +940,7 @@ class QueryTest(TestBase): assert len(r) == 3 r = s.execute(search_key=None).fetchall() assert len(r) == 0 - + @testing.emits_warning('.*empty sequence.*') @testing.fails_on('firebird', 'uses sql-92 bind rules') def test_literal_in(self): @@ -953,15 +953,15 @@ class QueryTest(TestBase): s = users.select(not_(literal("john").in_([]))) r = s.execute().fetchall() assert len(r) == 3 - - + + @testing.emits_warning('.*empty sequence.*') @testing.requires.boolean_col_expressions def test_in_filtering_advanced(self): """test the behavior of the in_() function when comparing against an empty collection, specifically that a proper boolean value is generated. - + """ users.insert().execute(user_id = 7, user_name = 'jack') @@ -980,11 +980,11 @@ class QueryTest(TestBase): class PercentSchemaNamesTest(TestBase): """tests using percent signs, spaces in table and column names. - + Doesn't pass for mysql, postgresql, but this is really a SQLAlchemy bug - we should be escaping out %% signs for this operation the same way we do for text() and column labels. - + """ @classmethod @@ -999,11 +999,11 @@ class PercentSchemaNamesTest(TestBase): def teardown(self): percent_table.delete().execute() - + @classmethod def teardown_class(cls): metadata.drop_all() - + def test_single_roundtrip(self): percent_table.insert().execute( {'percent%':5, 'spaces % more spaces':12}, @@ -1018,7 +1018,7 @@ class PercentSchemaNamesTest(TestBase): {'percent%':11, 'spaces % more spaces':9}, ) self._assert_table() - + @testing.crashes('mysql+mysqldb', 'MySQLdb handles executemany() inconsistently vs. execute()') def test_executemany_roundtrip(self): percent_table.insert().execute( @@ -1030,7 +1030,7 @@ class PercentSchemaNamesTest(TestBase): {'percent%':11, 'spaces % more spaces':9}, ) self._assert_table() - + def _assert_table(self): for table in (percent_table, percent_table.alias()): eq_( @@ -1073,9 +1073,9 @@ class PercentSchemaNamesTest(TestBase): (11, 15) ] ) - - - + + + class LimitTest(TestBase): @classmethod @@ -1106,7 +1106,7 @@ class LimitTest(TestBase): addresses.insert().execute(address_id=6, user_id=6, address='addr5') users.insert().execute(user_id=7, user_name='fido') addresses.insert().execute(address_id=7, user_id=7, address='addr5') - + @classmethod def teardown_class(cls): metadata.drop_all() @@ -1189,11 +1189,11 @@ class CompoundTest(TestBase): dict(col2="t3col2r2", col3="bbb", col4="aaa"), dict(col2="t3col2r3", col3="ccc", col4="bbb"), ]) - + @engines.close_first def teardown(self): pass - + @classmethod def teardown_class(cls): metadata.drop_all() @@ -1274,13 +1274,13 @@ class CompoundTest(TestBase): """like test_union_all, but breaks the sub-union into a subquery with an explicit column reference on the outside, more palatable to a wider variety of engines. - + """ u = union( select([t1.c.col3]), select([t1.c.col3]), ).alias() - + e = union_all( select([t1.c.col3]), select([u.c.col3]) @@ -1327,7 +1327,7 @@ class CompoundTest(TestBase): def test_except_style2(self): # same as style1, but add alias().select() to the except_(). # sqlite can handle it now. - + e = except_(union( select([t1.c.col3, t1.c.col4]), select([t2.c.col3, t2.c.col4]), @@ -1368,7 +1368,7 @@ class CompoundTest(TestBase): select([t3.c.col3], t3.c.col3 == 'ccc'), #ccc ).alias().select() ) - + eq_(e.execute().fetchall(), [('ccc',)]) eq_( e.alias().select().execute().fetchall(), @@ -1409,7 +1409,7 @@ class CompoundTest(TestBase): found = self._fetchall_sorted(u.execute()) eq_(found, wanted) - + @testing.requires.intersect def test_intersect_unions_3(self): u = intersect( @@ -1733,7 +1733,7 @@ class OperatorTest(TestBase): @classmethod def teardown_class(cls): metadata.drop_all() - + # TODO: seems like more tests warranted for this setup. def test_modulo(self): diff --git a/test/sql/test_quote.py b/test/sql/test_quote.py index e880388d7..2aa5086c6 100644 --- a/test/sql/test_quote.py +++ b/test/sql/test_quote.py @@ -96,7 +96,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL): '''SELECT 1 FROM (SELECT "foo"."t1"."col1" AS "col1" FROM '''\ '''"foo"."t1") AS anon WHERE anon."col1" = :col1_1''' ) - + metadata = MetaData() t1 = Table('TableOne', metadata, Column('ColumnOne', Integer, quote=False), quote=False, schema="FooBar", quote_schema=False) @@ -105,7 +105,7 @@ class QuoteTest(TestBase, AssertsCompiledSQL): self.assert_compile(t1.select().apply_labels(), "SELECT FooBar.TableOne.ColumnOne AS "\ "FooBar_TableOne_ColumnOne FROM FooBar.TableOne" # TODO: is this what we really want here ? what if table/schema - # *are* quoted? + # *are* quoted? ) a = t1.select().alias('anon') diff --git a/test/sql/test_returning.py b/test/sql/test_returning.py index 632a739f1..2a8e6fc2f 100644 --- a/test/sql/test_returning.py +++ b/test/sql/test_returning.py @@ -10,10 +10,10 @@ class ReturningTest(TestBase, AssertsExecutionResults): def setup(self): meta = MetaData(testing.db) global table, GoofyType - + class GoofyType(TypeDecorator): impl = String - + def process_bind_param(self, value, dialect): if value is None: return None @@ -23,7 +23,7 @@ class ReturningTest(TestBase, AssertsExecutionResults): if value is None: return None return value + "BAR" - + table = Table('tables', meta, Column('id', Integer, primary_key=True, test_needs_autoincrement=True), Column('persons', Integer), @@ -31,19 +31,19 @@ class ReturningTest(TestBase, AssertsExecutionResults): Column('goofy', GoofyType(50)) ) table.create(checkfirst=True) - + def teardown(self): table.drop() - + @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature') def test_column_targeting(self): result = table.insert().returning(table.c.id, table.c.full).execute({'persons': 1, 'full': False}) - + row = result.first() assert row[table.c.id] == row['id'] == 1 assert row[table.c.full] == row['full'] == False - + result = table.insert().values(persons=5, full=True, goofy="somegoofy").\ returning(table.c.persons, table.c.full, table.c.goofy).execute() row = result.first() @@ -52,7 +52,7 @@ class ReturningTest(TestBase, AssertsExecutionResults): eq_(row[table.c.goofy], row['goofy']) eq_(row['goofy'], "FOOsomegoofyBAR") - + @testing.fails_on('firebird', "fb can't handle returning x AS y") @testing.exclude('firebird', '<', (2, 0), '2.0+ feature') @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature') @@ -76,7 +76,7 @@ class ReturningTest(TestBase, AssertsExecutionResults): returning(table.c.persons + 18).execute() row = result.first() assert row[0] == 30 - + @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature') def test_update_returning(self): @@ -115,8 +115,8 @@ class ReturningTest(TestBase, AssertsExecutionResults): test_executemany() - - + + @testing.exclude('firebird', '<', (2, 1), '2.1+ feature') @testing.exclude('postgresql', '<', (8, 2), '8.2+ feature') @testing.fails_on_everything_except('postgresql', 'firebird') @@ -164,7 +164,7 @@ class SequenceReturningTest(TestBase): class KeyReturningTest(TestBase, AssertsExecutionResults): """test returning() works with columns that define 'key'.""" - + __unsupported_on__ = ('sqlite', 'mysql', 'maxdb', 'sybase', 'access') def setup(self): @@ -186,8 +186,8 @@ class KeyReturningTest(TestBase, AssertsExecutionResults): result = table.insert().returning(table.c.foo_id).execute(data='somedata') row = result.first() assert row[table.c.foo_id] == row['id'] == 1 - + result = table.select().execute().first() assert row[table.c.foo_id] == row['id'] == 1 - + diff --git a/test/sql/test_rowcount.py b/test/sql/test_rowcount.py index ed40f2801..fc74e8467 100644 --- a/test/sql/test_rowcount.py +++ b/test/sql/test_rowcount.py @@ -4,9 +4,9 @@ from test.lib import * class FoundRowsTest(TestBase, AssertsExecutionResults): """tests rowcount functionality""" - + __requires__ = ('sane_rowcount', ) - + @classmethod def setup_class(cls): global employees_table, metadata diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 77acf896b..243c56dcf 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -31,7 +31,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): def test_indirect_correspondence_on_labels(self): # this test depends upon 'distance' to # get the right result - + # same column three times s = select([table1.c.col1.label('c2'), table1.c.col1, @@ -48,7 +48,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): def test_direct_correspondence_on_labels(self): # this test depends on labels being part # of the proxy set to get the right result - + l1, l2 = table1.c.col1.label('foo'), table1.c.col1.label('bar') sel = select([l1, l2]) @@ -85,20 +85,20 @@ class SelectableTest(TestBase, AssertsExecutionResults): j2 = jjj.alias('foo') assert j2.corresponding_column(table1.c.col1) \ is j2.c.table1_col1 - + def test_against_cloned_non_table(self): # test that corresponding column digs across # clone boundaries with anonymous labeled elements col = func.count().label('foo') sel = select([col]) - + sel2 = visitors.ReplacingCloningVisitor().traverse(sel) assert sel2.corresponding_column(col) is sel2.c.foo sel3 = visitors.ReplacingCloningVisitor().traverse(sel2) assert sel3.corresponding_column(col) is sel3.c.foo - + def test_select_on_table(self): sel = select([table1, table2], use_labels=True) @@ -153,22 +153,22 @@ class SelectableTest(TestBase, AssertsExecutionResults): def test_union_precedence(self): # conflicting column correspondence should be resolved based on # the order of the select()s in the union - + s1 = select([table1.c.col1, table1.c.col2]) s2 = select([table1.c.col2, table1.c.col1]) s3 = select([table1.c.col3, table1.c.colx]) s4 = select([table1.c.colx, table1.c.col3]) - + u1 = union(s1, s2) assert u1.corresponding_column(table1.c.col1) is u1.c.col1 assert u1.corresponding_column(table1.c.col2) is u1.c.col2 - + u1 = union(s1, s2, s3, s4) assert u1.corresponding_column(table1.c.col1) is u1.c.col1 assert u1.corresponding_column(table1.c.col2) is u1.c.col2 assert u1.corresponding_column(table1.c.colx) is u1.c.col2 assert u1.corresponding_column(table1.c.col3) is u1.c.col1 - + def test_singular_union(self): u = union(select([table1.c.col1, table1.c.col2, @@ -254,13 +254,13 @@ class SelectableTest(TestBase, AssertsExecutionResults): j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) - + def test_labeled_select_correspoinding(self): l1 = select([func.max(table1.c.col1)]).label('foo') s = select([l1]) eq_(s.corresponding_column(l1), s.c.foo) - + s = select([table1.c.col1, l1]) eq_(s.corresponding_column(l1), s.c.foo) @@ -300,7 +300,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): s = select([t2, t3], use_labels=True) assert_raises(exc.NoReferencedTableError, s.join, t1) - + def test_join_condition(self): m = MetaData() @@ -326,7 +326,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): ]: assert expected.compare(sql_util.join_condition(left, right, a_subset=a_subset)) - + # these are ambiguous, or have no joins for left, right, a_subset in [ (t1t2, t3, None), @@ -339,7 +339,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): sql_util.join_condition, left, right, a_subset=a_subset ) - + als = t2t3.alias() # test join's behavior, including natural for left, right, expected in [ @@ -370,7 +370,7 @@ class SelectableTest(TestBase, AssertsExecutionResults): "Perhaps you meant to convert the right " "side to a subquery using alias\(\)\?", t1t2.join, t2t3.select(use_labels=True)) - + class PrimaryKeyTest(TestBase, AssertsExecutionResults): @@ -488,7 +488,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): t3 = Table('t3', meta, Column('t3id', Integer, ForeignKey('t2.t2id'), primary_key=True), Column('t3data', String(30))) - + eq_(util.column_set(sql_util.reduce_columns([ t1.c.t1id, t1.c.t1data, @@ -498,7 +498,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): t3.c.t3data, ])), util.column_set([t1.c.t1id, t1.c.t1data, t2.c.t2data, t3.c.t3data])) - + def test_reduce_selectable(self): metadata = MetaData() @@ -514,7 +514,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): eq_(util.column_set(sql_util.reduce_columns(list(s.c), s)), util.column_set([s.c.engineer_id, s.c.engineer_name, s.c.manager_id])) - + def test_reduce_aliased_join(self): metadata = MetaData() @@ -543,7 +543,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): eq_(util.column_set(sql_util.reduce_columns([pjoin.c.people_person_id, pjoin.c.engineers_person_id, pjoin.c.managers_person_id])), util.column_set([pjoin.c.people_person_id])) - + def test_reduce_aliased_union(self): metadata = MetaData() @@ -567,7 +567,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): item_join.c.dummy, item_join.c.child_name])), util.column_set([item_join.c.id, item_join.c.dummy, item_join.c.child_name])) - + def test_reduce_aliased_union_2(self): metadata = MetaData() @@ -597,7 +597,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): select_from( page_table.join(magazine_page_table). join(classified_page_table)), - + select([ page_table.c.id, magazine_page_table.c.page_id, @@ -622,7 +622,7 @@ class ReduceTest(TestBase, AssertsExecutionResults): cast(null(), Integer).label('magazine_page_id') ]). select_from(page_table.join(magazine_page_table)), - + select([ page_table.c.id, magazine_page_table.c.page_id, @@ -684,7 +684,7 @@ class AnnotationsTest(TestBase): def __init__(self): Column.__init__(self, 'foo', Integer) _constructor = Column - + t1 = Table('t1', MetaData(), MyColumn()) s1 = t1.select() assert isinstance(t1.c.foo, MyColumn) @@ -695,14 +695,14 @@ class AnnotationsTest(TestBase): assert isinstance(s2.c.foo, Column) annot_2 = s1._annotate({}) assert isinstance(annot_2.c.foo, Column) - + def test_annotated_corresponding_column(self): table1 = table('table1', column("col1")) - + s1 = select([table1.c.col1]) t1 = s1._annotate({}) t2 = s1 - + # t1 needs to share the same _make_proxy() columns as t2, even # though it's annotated. otherwise paths will diverge once they # are corresponded against "inner" below. @@ -723,12 +723,12 @@ class AnnotationsTest(TestBase): def test_annotated_visit(self): table1 = table('table1', column("col1"), column("col2")) - + bin = table1.c.col1 == bindparam('foo', value=None) assert str(bin) == "table1.col1 = :foo" def visit_binary(b): b.right = table1.c.col2 - + b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary}) assert str(b2) == "table1.col1 = table1.col2" @@ -739,13 +739,13 @@ class AnnotationsTest(TestBase): def visit_binary(b): b.left = bindparam('bar') - + b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary}) assert str(b4) == ":bar = table1.col2" b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary}) assert str(b5) == ":bar = table1.col2" - + def test_annotate_expressions(self): table1 = table('table1', column('col1'), column('col2')) @@ -760,10 +760,10 @@ class AnnotationsTest(TestBase): eq_(str(sql_util._deep_annotate(expr, {})), expected) eq_(str(sql_util._deep_annotate(expr, {}, exclude=[table1.c.col1])), expected) - + def test_deannotate(self): table1 = table('table1', column("col1"), column("col2")) - + bin = table1.c.col1 == bindparam('foo', value=None) b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True}) @@ -772,7 +772,7 @@ class AnnotationsTest(TestBase): for elem in (b2._annotations, b2.left._annotations): assert '_orm_adapt' in elem - + for elem in b3._annotations, b3.left._annotations, \ b4._annotations, b4.left._annotations: assert elem == {} @@ -781,4 +781,4 @@ class AnnotationsTest(TestBase): assert b3.left is not b2.left is not bin.left assert b4.left is bin.left # since column is immutable assert b4.right is not bin.right is not b2.right is not b3.right - + diff --git a/test/sql/test_types.py b/test/sql/test_types.py index 700f7b7de..1665e35c8 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -22,11 +22,11 @@ class AdaptTest(TestBase): for d in dialects.__all__ if not d.startswith('_') ] - + def _all_dialects(self): return [d.base.dialect() for d in self._all_dialect_modules()] - + def _all_types(self): def types_for_mod(mod): for key in dir(mod): @@ -34,24 +34,24 @@ class AdaptTest(TestBase): if not isinstance(typ, type) or not issubclass(typ, types.TypeEngine): continue yield typ - + for typ in types_for_mod(types): yield typ for dialect in self._all_dialect_modules(): for typ in types_for_mod(dialect): yield typ - + def test_uppercase_rendering(self): """Test that uppercase types from types.py always render as their type. - + As of SQLA 0.6, using an uppercase type means you want specifically that type. If the database in use doesn't support that DDL, it (the DB backend) should raise an error - it means you should be using a lowercased (genericized) type. - + """ - + for dialect in self._all_dialects(): for type_, expected in ( (FLOAT, "FLOAT"), @@ -77,29 +77,29 @@ class AdaptTest(TestBase): compiled = types.to_instance(type_).\ compile(dialect=dialect) - + assert compiled in expected, \ "%r matches none of %r for dialect %s" % \ (compiled, expected, dialect.name) - + assert str(types.to_instance(type_)) in expected, \ "default str() of type %r not expected, %r" % \ (type_, expected) - + @testing.uses_deprecated() def test_adapt_method(self): """ensure all types have a working adapt() method, - which creates a distinct copy. - + which creates a distinct copy. + The distinct copy ensures that when we cache the adapted() form of a type against the original in a weak key dictionary, a cycle is not formed. - + This test doesn't test type-specific arguments of adapt() beyond their defaults. - + """ - + for typ in self._all_types(): if typ in (types.TypeDecorator, types.TypeEngine): continue @@ -117,8 +117,8 @@ class AdaptTest(TestBase): if k == 'impl': continue eq_(getattr(t2, k), t1.__dict__[k]) - - + + class TypeAffinityTest(TestBase): def test_type_affinity(self): for type_, affin in [ @@ -128,7 +128,7 @@ class TypeAffinityTest(TestBase): (LargeBinary(), types._Binary) ]: eq_(type_._type_affinity, affin) - + for t1, t2, comp in [ (Integer(), SmallInteger(), True), (Integer(), String(), False), @@ -169,7 +169,7 @@ class PickleMetadataTest(TestBase): Table('foo', meta, column_type) ct = loads(dumps(column_type)) mt = loads(dumps(meta)) - + class UserDefinedTest(TestBase, AssertsCompiledSQL): """tests user-defined types.""" @@ -210,37 +210,37 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL): ]: for dialect_ in (dialects.postgresql, dialects.mssql, dialects.mysql): dialect_ = dialect_.dialect() - + raw_impl = types.to_instance(impl_, **kw) - + class MyType(types.TypeDecorator): impl = impl_ - + dec_type = MyType(**kw) - + eq_(dec_type.impl.__class__, raw_impl.__class__) - + raw_dialect_impl = raw_impl.dialect_impl(dialect_) dec_dialect_impl = dec_type.dialect_impl(dialect_) eq_(dec_dialect_impl.__class__, MyType) eq_(raw_dialect_impl.__class__ , dec_dialect_impl.impl.__class__) - + self.assert_compile( MyType(**kw), exp, dialect=dialect_ ) - + def test_user_defined_typedec_impl(self): class MyType(types.TypeDecorator): impl = Float - + def load_dialect_impl(self, dialect): if dialect.name == 'sqlite': return String(50) else: return super(MyType, self).load_dialect_impl(dialect) - + sl = dialects.sqlite.dialect() pg = dialects.postgresql.dialect() t = MyType() @@ -254,35 +254,35 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL): t.dialect_impl(dialect=pg).impl.__class__, Float().dialect_impl(pg).__class__ ) - + @testing.provide_metadata def test_type_coerce(self): """test ad-hoc usage of custom types with type_coerce().""" - + class MyType(types.TypeDecorator): impl = String def process_bind_param(self, value, dialect): return value[0:-8] - + def process_result_value(self, value, dialect): return value + "BIND_OUT" - + t = Table('t', metadata, Column('data', String(50))) metadata.create_all() - + t.insert().values(data=type_coerce('d1BIND_OUT',MyType)).execute() eq_( select([type_coerce(t.c.data, MyType)]).execute().fetchall(), [('d1BIND_OUT', )] ) - + eq_( select([t.c.data, type_coerce(t.c.data, MyType)]).execute().fetchall(), [('d1', 'd1BIND_OUT')] ) - + eq_( select([t.c.data, type_coerce(t.c.data, MyType)]).\ where(type_coerce(t.c.data, MyType) == 'd1BIND_OUT').\ @@ -310,7 +310,7 @@ class UserDefinedTest(TestBase, AssertsCompiledSQL): execute().fetchall(), [] ) - + @classmethod def setup_class(cls): global users, metadata @@ -432,7 +432,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): Column('unicode_text', UnicodeText), ) metadata.create_all() - + @classmethod def teardown_class(cls): metadata.drop_all() @@ -440,26 +440,26 @@ class UnicodeTest(TestBase, AssertsExecutionResults): @engines.close_first def teardown(self): unicode_table.delete().execute() - + def test_native_unicode(self): """assert expected values for 'native unicode' mode""" - + if \ (testing.against('mssql+pyodbc') and not testing.db.dialect.freetds): assert testing.db.dialect.returns_unicode_strings == 'conditional' return - + if testing.against('mssql+pymssql'): assert testing.db.dialect.returns_unicode_strings == ('charset' in testing.db.url.query) return - + assert testing.db.dialect.returns_unicode_strings == \ ((testing.db.name, testing.db.driver) in \ ( ('postgresql','psycopg2'), ('postgresql','pypostgresql'), ('postgresql','pg8000'), - ('postgresql','zxjdbc'), + ('postgresql','zxjdbc'), ('mysql','oursql'), ('mysql','zxjdbc'), ('mysql','mysqlconnector'), @@ -471,14 +471,14 @@ class UnicodeTest(TestBase, AssertsExecutionResults): (testing.db.name, testing.db.driver, testing.db.dialect.returns_unicode_strings) - + def test_round_trip(self): unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, "\ u"quand une drôle de petit voix m’a réveillé. Elle "\ u"disait: « S’il vous plaît… dessine-moi un mouton! »" - + unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) - + x = unicode_table.select().execute().first() assert isinstance(x['unicode_varchar'], unicode) assert isinstance(x['unicode_text'], unicode) @@ -488,7 +488,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): def test_round_trip_executemany(self): # cx_oracle was producing different behavior for cursor.executemany() # vs. cursor.execute() - + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\ u"une drôle de petit voix m’a réveillé. "\ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »" @@ -512,12 +512,12 @@ class UnicodeTest(TestBase, AssertsExecutionResults): u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »" unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) - + x = union( select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar]) ).execute().first() - + assert isinstance(x['unicode_varchar'], unicode) eq_(x['unicode_varchar'], unicodedata) @@ -529,13 +529,13 @@ class UnicodeTest(TestBase, AssertsExecutionResults): def test_unicode_warnings(self): """test the warnings raised when SQLA must coerce unicode binds, *and* is using the Unicode type. - + """ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\ u"une drôle de petit voix m’a réveillé. "\ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »" - + # using Unicode explicly - warning should be emitted u = Unicode() uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect) @@ -557,14 +557,14 @@ class UnicodeTest(TestBase, AssertsExecutionResults): assert_raises(exc.SAWarning, uni, 'x') assert isinstance(uni(unicodedata), str) # end Py2K - + eq_(uni(unicodedata), unicodedata.encode('utf-8')) - + # using convert unicode at engine level - # this should not be raising a warning unicode_engine = engines.utf8_engine(options={'convert_unicode':True,}) unicode_engine.dialect.supports_unicode_binds = False - + s = String() uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect) # this is not the unicode type - no warning @@ -575,36 +575,36 @@ class UnicodeTest(TestBase, AssertsExecutionResults): uni('x') assert isinstance(uni(unicodedata), str) # end Py2K - + eq_(uni(unicodedata), unicodedata.encode('utf-8')) - + @testing.fails_if( lambda: testing.db_spec("postgresql+pg8000")(testing.db) and util.py3k, "pg8000 appropriately does not accept 'bytes' for a VARCHAR column." ) def test_ignoring_unicode_error(self): """checks String(unicode_error='ignore') is passed to underlying codec.""" - + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\ u"une drôle de petit voix m’a réveillé. "\ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »" - + asciidata = unicodedata.encode('ascii', 'ignore') - + m = MetaData() table = Table('unicode_err_table', m, Column('sort', Integer), Column('plain_varchar_no_coding_error', \ String(248, convert_unicode='force', unicode_error='ignore')) ) - + m2 = MetaData() utf8_table = Table('unicode_err_table', m2, Column('sort', Integer), Column('plain_varchar_no_coding_error', \ String(248, convert_unicode=True)) ) - + engine = engines.testing_engine(options={'encoding':'ascii'}) m.create_all(engine) try: @@ -619,7 +619,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): # switch to utf-8 engine.dialect.encoding = 'utf-8' from binascii import hexlify - + # the row that we put in was stored as hexlified ascii row = engine.execute(utf8_table.select()).first() x = row['plain_varchar_no_coding_error'] @@ -629,7 +629,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): a = hexlify(x) b = hexlify(asciidata) eq_(a, b) - + # insert another row which will be stored with # utf-8 only chars engine.execute( @@ -649,7 +649,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): ascii_row = result.fetchone() utf8_row = result.fetchone() result.close() - + x = ascii_row['plain_varchar_no_coding_error'] # on python3 "x" comes back as string (i.e. unicode), # hexlify requires bytes @@ -671,7 +671,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults): else: a = hexlify(x) eq_(a, b) - + finally: m.drop_all(engine) @@ -692,11 +692,11 @@ class EnumTest(TestBase): ) metadata.create_all() - + def teardown(self): enum_table.delete().execute() non_native_enum_table.delete().execute() - + @classmethod def teardown_class(cls): metadata.drop_all() @@ -713,7 +713,7 @@ class EnumTest(TestBase): {'id':2, 'someenum':'two'}, {'id':3, 'someenum':'one'}, ]) - + eq_( enum_table.select().order_by(enum_table.c.id).execute().fetchall(), [ @@ -739,7 +739,7 @@ class EnumTest(TestBase): (3, 'one'), ] ) - + def test_adapt(self): from sqlalchemy.dialects.postgresql import ENUM e1 = Enum('one','two','three', native_enum=False) @@ -749,7 +749,7 @@ class EnumTest(TestBase): e1 = Enum('one','two','three', name='foo', schema='bar') eq_(e1.adapt(ENUM).name, 'foo') eq_(e1.adapt(ENUM).schema, 'bar') - + @testing.fails_on('mysql+mysqldb', "MySQL seems to issue a 'data truncated' warning.") def test_constraint(self): assert_raises(exc.DBAPIError, @@ -764,7 +764,7 @@ class EnumTest(TestBase): non_native_enum_table.insert().execute, {'id':4, 'someenum':'four'} ) - + class BinaryTest(TestBase, AssertsExecutionResults): __excluded_on__ = ( ('mysql', '<', (4, 1, 1)), # screwy varbinary types @@ -786,7 +786,7 @@ class BinaryTest(TestBase, AssertsExecutionResults): if value: value.stuff = 'this is the right stuff' return value - + metadata = MetaData(testing.db) binary_table = Table('binary_table', metadata, Column('primary_id', Integer, primary_key=True, test_needs_autoincrement=True), @@ -855,15 +855,15 @@ class BinaryTest(TestBase, AssertsExecutionResults): 'data, not really known how to make this work') def test_comparison(self): """test that type coercion occurs on comparison for binary""" - + expr = binary_table.c.data == 'foo' assert isinstance(expr.right.type, LargeBinary) - + data = os.urandom(32) binary_table.insert().execute(data=data) eq_(binary_table.select().where(binary_table.c.data==data).alias().count().scalar(), 1) - - + + def load_stream(self, name): f = os.path.join(os.path.dirname(__file__), "..", name) return open(f, mode='rb').read() @@ -886,16 +886,16 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): return process def adapt_operator(self, op): return {operators.add:operators.sub, operators.sub:operators.add}.get(op, op) - + class MyTypeDec(types.TypeDecorator): impl = String - + def process_bind_param(self, value, dialect): return "BIND_IN" + str(value) def process_result_value(self, value, dialect): return value + "BIND_OUT" - + meta = MetaData(testing.db) test_table = Table('test', meta, Column('id', Integer, primary_key=True), @@ -951,14 +951,14 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): expr = test_table.c.bvalue == bindparam("somevalue") eq_(expr.right.type._type_affinity, String) - + eq_( testing.db.execute(test_table.select().where(expr), {"somevalue":"foo"}).fetchall(), [(1, 'somedata', datetime.date(2007, 10, 15), 25, 'BIND_INfooBIND_OUT')] ) - + def test_literal_adapt(self): # literals get typed based on the types dictionary, unless # compatible with the left side type @@ -974,8 +974,8 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): expr = column('foo', CHAR) == "asdf" eq_(expr.right.type.__class__, CHAR) - - + + @testing.fails_on('firebird', 'Data type unknown on the parameter') def test_operator_adapt(self): """test type-based overloading of operators""" @@ -996,7 +996,7 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): assert testing.db.execute(select([expr.label('foo')])).scalar() == 21 expr = test_table.c.avalue + literal(40, type_=MyCustomType) - + # + operator converted to - # value is calculated as: (250 - (40 * 10)) / 10 == -15 assert testing.db.execute(select([expr.label('foo')])).scalar() == -15 @@ -1007,10 +1007,10 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): def test_typedec_operator_adapt(self): expr = test_table.c.bvalue + "hi" - + assert expr.type.__class__ is MyTypeDec assert expr.right.type.__class__ is MyTypeDec - + eq_( testing.db.execute(select([expr.label('foo')])).scalar(), "BIND_INfooBIND_INhiBIND_OUT" @@ -1019,7 +1019,7 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): def test_typedec_righthand_coercion(self): class MyTypeDec(types.TypeDecorator): impl = String - + def process_bind_param(self, value, dialect): return "BIND_IN" + str(value) @@ -1028,34 +1028,34 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): tab = table('test', column('bvalue', MyTypeDec)) expr = tab.c.bvalue + 6 - + self.assert_compile( expr, "test.bvalue || :bvalue_1", use_default_dialect=True ) - + assert expr.type.__class__ is MyTypeDec eq_( testing.db.execute(select([expr.label('foo')])).scalar(), "BIND_INfooBIND_IN6BIND_OUT" ) - - + + def test_bind_typing(self): from sqlalchemy.sql import column - + class MyFoobarType(types.UserDefinedType): pass - + class Foo(object): pass - + # unknown type + integer, right hand bind # is an Integer expr = column("foo", MyFoobarType) + 5 assert expr.right.type._type_affinity is types.Integer - + # untyped bind - it gets assigned MyFoobarType expr = column("foo", MyFoobarType) + bindparam("foo") assert expr.right.type._type_affinity is MyFoobarType @@ -1067,30 +1067,30 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): # coerces to the left expr = column("foo", MyFoobarType) + Foo() assert expr.right.type._type_affinity is MyFoobarType - + # including for non-commutative ops expr = column("foo", MyFoobarType) - Foo() assert expr.right.type._type_affinity is MyFoobarType expr = column("foo", MyFoobarType) - datetime.date(2010, 8, 25) assert expr.right.type._type_affinity is types.Date - + def test_date_coercion(self): from sqlalchemy.sql import column - + expr = column('bar', types.NULLTYPE) - column('foo', types.TIMESTAMP) eq_(expr.type._type_affinity, types.NullType) - + expr = func.sysdate() - column('foo', types.TIMESTAMP) eq_(expr.type._type_affinity, types.Interval) expr = func.current_date() - column('foo', types.TIMESTAMP) eq_(expr.type._type_affinity, types.Interval) - + def test_numerics_coercion(self): from sqlalchemy.sql import column import operator - + for op in ( operator.add, operator.mul, @@ -1114,15 +1114,15 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): str(column('a', types.NullType()) + column('b', types.NullType())), "a + b" ) - + def test_expression_typing(self): expr = column('bar', Integer) - 3 - + eq_(expr.type._type_affinity, Integer) expr = bindparam('bar') + bindparam('foo') eq_(expr.type, types.NULLTYPE) - + def test_distinct(self): s = select([distinct(test_table.c.avalue)]) eq_(testing.db.execute(s).scalar(), 25) @@ -1132,12 +1132,12 @@ class ExpressionTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL): assert distinct(test_table.c.data).type == test_table.c.data.type assert test_table.c.data.distinct().type == test_table.c.data.type - + class CompileTest(TestBase, AssertsCompiledSQL): def test_default_compile(self): """test that the base dialect of the type object is used for default compilation. - + """ for type_, expected in ( (String(), "VARCHAR"), @@ -1207,8 +1207,8 @@ class DateTest(TestBase, AssertsExecutionResults): datetime.time(23, 59, 59, time_micro)), (10, 'colber', None, None, None), ] - - + + fnames = ['user_id', 'user_name', 'user_datetime', 'user_date', 'user_time'] @@ -1300,10 +1300,10 @@ class NumericTest(TestBase): def setup(self): global metadata metadata = MetaData(testing.db) - + def teardown(self): metadata.drop_all() - + @testing.emits_warning(r".*does \*not\* support Decimal objects natively") def _do_test(self, type_, input_, output, filter_ = None): t = Table('t', metadata, Column('x', type_)) @@ -1318,7 +1318,7 @@ class NumericTest(TestBase): #print result #print output eq_(result, output) - + def test_numeric_as_decimal(self): self._do_test( Numeric(precision=8, scale=4), @@ -1354,7 +1354,7 @@ class NumericTest(TestBase): [15.7563], filter_ = lambda n:n is not None and round(n, 5) or None ) - + @testing.fails_on('mssql+pymssql', 'FIXME: improve pymssql dec handling') def test_precision_decimal(self): numbers = set([ @@ -1362,7 +1362,7 @@ class NumericTest(TestBase): decimal.Decimal("0.004354"), decimal.Decimal("900.0"), ]) - + self._do_test( Numeric(precision=18, scale=12), numbers, @@ -1372,12 +1372,12 @@ class NumericTest(TestBase): @testing.fails_on('mssql+pymssql', 'FIXME: improve pymssql dec handling') def test_enotation_decimal(self): """test exceedingly small decimals. - + Decimal reports values with E notation when the exponent is greater than 6. - + """ - + numbers = set([ decimal.Decimal('1E-2'), decimal.Decimal('1E-3'), @@ -1397,7 +1397,7 @@ class NumericTest(TestBase): numbers, numbers ) - + @testing.fails_on("sybase+pyodbc", "Don't know how do get these values through FreeTDS + Sybase") @testing.fails_on("firebird", "Precision must be from 1 to 18") @@ -1417,7 +1417,7 @@ class NumericTest(TestBase): numbers, numbers ) - + @testing.fails_on('sqlite', 'TODO') @testing.fails_on('postgresql+pg8000', 'TODO') @testing.fails_on("firebird", "Precision must be from 1 to 18") @@ -1434,11 +1434,11 @@ class NumericTest(TestBase): numbers, numbers ) - + class NumericRawSQLTest(TestBase): """Test what DBAPIs and dialects return without any typing information supplied at the SQLA level. - + """ def _fixture(self, metadata, type, data): t = Table('t', metadata, @@ -1446,7 +1446,7 @@ class NumericRawSQLTest(TestBase): ) metadata.create_all() t.insert().execute(val=data) - + @testing.fails_on('sqlite', "Doesn't provide Decimal results natively") @testing.provide_metadata def test_decimal_fp(self): @@ -1475,16 +1475,16 @@ class NumericRawSQLTest(TestBase): t = self._fixture(metadata, Float, 46.583) val = testing.db.execute("select val from t").scalar() assert isinstance(val, float) - + # some DBAPIs have unusual float handling if testing.against('oracle+cx_oracle', 'mysql+oursql'): eq_(round_decimal(val, 3), 46.583) else: eq_(val, 46.583) - - - - + + + + class IntervalTest(TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): @@ -1529,8 +1529,8 @@ class IntervalTest(TestBase, AssertsExecutionResults): eq_(row['native_interval'], None) eq_(row['native_interval_args'], None) eq_(row['non_native_interval'], None) - - + + class BooleanTest(TestBase, AssertsExecutionResults): @classmethod def setup_class(cls): @@ -1542,14 +1542,14 @@ class BooleanTest(TestBase, AssertsExecutionResults): Column('unconstrained_value', Boolean(create_constraint=False)), ) bool_table.create() - + @classmethod def teardown_class(cls): bool_table.drop() - + def teardown(self): bool_table.delete().execute() - + def test_boolean(self): bool_table.insert().execute(id=1, value=True) bool_table.insert().execute(id=2, value=False) @@ -1573,11 +1573,11 @@ class BooleanTest(TestBase, AssertsExecutionResults): eq_(res3, [(1, True), (2, False), (3, True), (4, True), (5, True), (6, None)]) - + # ensure we're getting True/False, not just ints assert res3[0][1] is True assert res3[1][1] is False - + @testing.fails_on('mysql', "The CHECK clause is parsed but ignored by all storage engines.") @testing.fails_on('mssql', @@ -1592,11 +1592,11 @@ class BooleanTest(TestBase, AssertsExecutionResults): def test_unconstrained(self): testing.db.execute( "insert into booltest (id, unconstrained_value) values (1, 5)") - + class PickleTest(TestBase): def test_eq_comparison(self): p1 = PickleType() - + for obj in ( {'1':'2'}, pickleable.Bar(5, 6), @@ -1608,7 +1608,7 @@ class PickleTest(TestBase): p1.compare_values, pickleable.BrokenComparable('foo'), pickleable.BrokenComparable('foo')) - + def test_nonmutable_comparison(self): p1 = PickleType() @@ -1618,7 +1618,7 @@ class PickleTest(TestBase): pickleable.OldSchool(10, 11) ): assert p1.compare_values(p1.copy_value(obj), obj) - + class CallableTest(TestBase): @classmethod def setup_class(cls): diff --git a/test/sql/test_unicode.py b/test/sql/test_unicode.py index 2eda4ffa8..d6757caf1 100644 --- a/test/sql/test_unicode.py +++ b/test/sql/test_unicode.py @@ -122,13 +122,13 @@ class EscapesDefaultsTest(testing.TestBase): try: engine = metadata.bind - + # reset the identifier preparer, so that we can force it to cache # a unicode identifier engine.dialect.identifier_preparer = engine.dialect.preparer(engine.dialect) select([column(u'special_col')]).select_from(t1).execute().close() assert isinstance(engine.dialect.identifier_preparer.format_sequence(Sequence('special_col')), unicode) - + # now execute, run the sequence. it should run in u"Special_col.nextid" or similar as # a unicode object; cx_oracle asserts that this is None or a String (postgresql lets it pass thru). # ensure that executioncontext._exec_default() is encoding. |
