diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-07-18 17:40:58 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-07-18 17:40:58 -0400 |
commit | bb5f4392a4ecbcbaf4e34886a65a8bba42e227d5 (patch) | |
tree | 06e392471bc5a7dd866975530333d5a9e74f0757 /test/sql/test_compiler.py | |
parent | 0eb53b2e7936d2b0a17077a922ce1d97f102e38a (diff) | |
download | sqlalchemy-bb5f4392a4ecbcbaf4e34886a65a8bba42e227d5.tar.gz |
- update the flake8 rules again
- apply autopep8 + manual fixes to most of test/sql/
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r-- | test/sql/test_compiler.py | 1831 |
1 files changed, 968 insertions, 863 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 301cf149c..2b2083bf7 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -16,7 +16,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ func, not_, cast, text, tuple_, exists, update, bindparam,\ literal, and_, null, type_coerce, alias, or_, literal_column,\ - Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\ + Float, TIMESTAMP, Numeric, Date, Text, union, except_,\ intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ over, subquery, case, true import decimal @@ -26,15 +26,15 @@ from sqlalchemy.sql import table, column, label from sqlalchemy.sql.expression import ClauseList, _literal_as_text, HasPrefixes from sqlalchemy.engine import default from sqlalchemy.dialects import mysql, mssql, postgresql, oracle, \ - sqlite, sybase + sqlite, sybase from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql import compiler table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String), -) + column('myid', Integer), + column('name', String), + column('description', String), + ) table2 = table( 'myothertable', @@ -69,25 +69,25 @@ table5 = Table( ) users = table('users', - column('user_id'), - column('user_name'), - column('password'), -) + column('user_id'), + column('user_name'), + column('password'), + ) addresses = table('addresses', - column('address_id'), - column('user_id'), - column('street'), - column('city'), - column('state'), - column('zip') -) + column('address_id'), + column('user_id'), + column('street'), + column('city'), + column('state'), + column('zip') + ) keyed = Table('keyed', metadata, - Column('x', Integer, key='colx'), - Column('y', Integer, key='coly'), - Column('z', Integer), -) + Column('x', Integer, key='colx'), + Column('y', Integer, key='coly'), + Column('z', Integer), + ) class SelectTest(fixtures.TestBase, AssertsCompiledSQL): @@ -111,39 +111,44 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'columns; use this object directly within a ' 'column-level expression.', lambda: hasattr( - select([table1.c.myid]).as_scalar().self_group(), - 'columns')) + select([table1.c.myid]).as_scalar().self_group(), + 'columns')) assert_raises_message( exc.InvalidRequestError, 'Scalar Select expression has no ' 'columns; use this object directly within a ' 'column-level expression.', lambda: hasattr(select([table1.c.myid]).as_scalar(), - 'columns')) + 'columns')) else: assert not hasattr( - select([table1.c.myid]).as_scalar().self_group(), - 'columns') + select([table1.c.myid]).as_scalar().self_group(), + 'columns') assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns') def test_prefix_constructor(self): class Pref(HasPrefixes): + def _generate(self): return self assert_raises(exc.ArgumentError, - Pref().prefix_with, - "some prefix", not_a_dialect=True - ) + Pref().prefix_with, + "some prefix", not_a_dialect=True + ) def test_table_select(self): self.assert_compile(table1.select(), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable") - self.assert_compile(select([table1, table2]), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable") + self.assert_compile( + select( + [ + table1, + table2]), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername FROM mytable, " + "myothertable") def test_invalid_col_argument(self): assert_raises(exc.ArgumentError, select, table1) @@ -221,11 +226,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_limit_offset(self): for lim, offset, exp, params in [ (5, 10, "LIMIT :param_1 OFFSET :param_2", - {'param_1':5, 'param_2':10}), - (None, 10, "LIMIT -1 OFFSET :param_1", {'param_1':10}), - (5, None, "LIMIT :param_1", {'param_1':5}), + {'param_1': 5, 'param_2': 10}), + (None, 10, "LIMIT -1 OFFSET :param_1", {'param_1': 10}), + (5, None, "LIMIT :param_1", {'param_1': 5}), (0, 0, "LIMIT :param_1 OFFSET :param_2", - {'param_1':0, 'param_2':0}), + {'param_1': 0, 'param_2': 0}), ]: self.assert_compile( select([1]).limit(lim).offset(offset), @@ -233,19 +238,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): checkparams=params ) - - def test_select_precol_compile_ordering(self): s1 = select([column('x')]).select_from('a').limit(5).as_scalar() s2 = select([s1]).limit(10) class MyCompiler(compiler.SQLCompiler): + def get_select_precolumns(self, select): result = "" if select._limit: - result += "FIRST %s " % self.process(literal(select._limit)) + result += "FIRST %s " % self.process( + literal( + select._limit)) if select._offset: - result += "SKIP %s " % self.process(literal(select._offset)) + result += "SKIP %s " % self.process( + literal( + select._offset)) return result def limit_clause(self, select): @@ -262,7 +270,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect=dialect ) - def test_from_subquery(self): """tests placing select statements in the column clause of another select, for the @@ -272,12 +279,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select( [s], - s.c.myid == 7 - ), - "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, " - "mytable.name AS name, mytable.description AS description " - "FROM mytable " - "WHERE mytable.name = :name_1) WHERE myid = :myid_1") + s.c.myid == 7), + "SELECT myid, name, description FROM " + "(SELECT mytable.myid AS myid, " + "mytable.name AS name, mytable.description AS description " + "FROM mytable " + "WHERE mytable.name = :name_1) WHERE myid = :myid_1") sq = select([table1]) self.assert_compile( @@ -307,17 +314,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ).alias('sq') sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS "\ - "mytable_name, mytable.description AS mytable_description, "\ - "myothertable.otherid AS myothertable_otherid, "\ - "myothertable.othername AS myothertable_othername FROM "\ - "mytable, myothertable WHERE mytable.myid = :myid_1 AND "\ - "myothertable.otherid = mytable.myid" + "mytable_name, mytable.description AS mytable_description, "\ + "myothertable.otherid AS myothertable_otherid, "\ + "myothertable.othername AS myothertable_othername FROM "\ + "mytable, myothertable WHERE mytable.myid = :myid_1 AND "\ + "myothertable.otherid = mytable.myid" self.assert_compile( - sq.select(), - "SELECT sq.mytable_myid, sq.mytable_name, " - "sq.mytable_description, sq.myothertable_otherid, " - "sq.myothertable_othername FROM (%s) AS sq" % sqstring) + sq.select(), + "SELECT sq.mytable_myid, sq.mytable_name, " + "sq.mytable_description, sq.myothertable_otherid, " + "sq.myothertable_othername FROM (%s) AS sq" % sqstring) sq2 = select( [sq], @@ -325,21 +332,21 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ).alias('sq2') self.assert_compile( - sq2.select(), - "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, " - "sq2.sq_mytable_description, sq2.sq_myothertable_otherid, " - "sq2.sq_myothertable_othername FROM " - "(SELECT sq.mytable_myid AS " - "sq_mytable_myid, sq.mytable_name AS sq_mytable_name, " - "sq.mytable_description AS sq_mytable_description, " - "sq.myothertable_otherid AS sq_myothertable_otherid, " - "sq.myothertable_othername AS sq_myothertable_othername " - "FROM (%s) AS sq) AS sq2" % sqstring) + sq2.select(), + "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, " + "sq2.sq_mytable_description, sq2.sq_myothertable_otherid, " + "sq2.sq_myothertable_othername FROM " + "(SELECT sq.mytable_myid AS " + "sq_mytable_myid, sq.mytable_name AS sq_mytable_name, " + "sq.mytable_description AS sq_mytable_description, " + "sq.myothertable_otherid AS sq_myothertable_otherid, " + "sq.myothertable_othername AS sq_myothertable_othername " + "FROM (%s) AS sq) AS sq2" % sqstring) def test_select_from_clauselist(self): self.assert_compile( select([ClauseList(column('a'), column('b'))] - ).select_from('sometable'), + ).select_from('sometable'), 'SELECT a, b FROM sometable' ) @@ -435,8 +442,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # using alternate keys. a, b, c = Column('a', Integer, key='b'), \ - Column('b', Integer), \ - Column('c', Integer, key='a') + Column('b', Integer), \ + Column('c', Integer, key='a') self.assert_compile( select([a, b, c, a, b, c]), "SELECT a, b, c", dialect=default.DefaultDialect() @@ -445,13 +452,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select([bindparam('a'), bindparam('b'), bindparam('c')]), "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3", - dialect=default.DefaultDialect(paramstyle='named') + dialect=default.DefaultDialect(paramstyle='named') ) self.assert_compile( select([bindparam('a'), bindparam('b'), bindparam('c')]), "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3", - dialect=default.DefaultDialect(paramstyle='qmark'), + dialect=default.DefaultDialect(paramstyle='qmark'), ) self.assert_compile( @@ -490,90 +497,103 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): s2 = s1.alias() s3 = select([s2], use_labels=True) self.assert_compile(s3, - "SELECT anon_1.x AS anon_1_x, " - "anon_1.y AS anon_1_y, " - "anon_1.z AS anon_1_z FROM " - "(SELECT keyed.x AS x, keyed.y " - "AS y, keyed.z AS z FROM keyed) AS anon_1") + "SELECT anon_1.x AS anon_1_x, " + "anon_1.y AS anon_1_y, " + "anon_1.z AS anon_1_z FROM " + "(SELECT keyed.x AS x, keyed.y " + "AS y, keyed.z AS z FROM keyed) AS anon_1") s4 = s3.alias() s5 = select([s4], use_labels=True) self.assert_compile(s5, - "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, " - "anon_1.anon_2_y AS anon_1_anon_2_y, " - "anon_1.anon_2_z AS anon_1_anon_2_z " - "FROM (SELECT anon_2.x AS anon_2_x, " - "anon_2.y AS anon_2_y, " - "anon_2.z AS anon_2_z FROM " - "(SELECT keyed.x AS x, keyed.y AS y, keyed.z " - "AS z FROM keyed) AS anon_2) AS anon_1" - ) + "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, " + "anon_1.anon_2_y AS anon_1_anon_2_y, " + "anon_1.anon_2_z AS anon_1_anon_2_z " + "FROM (SELECT anon_2.x AS anon_2_x, " + "anon_2.y AS anon_2_y, " + "anon_2.z AS anon_2_z FROM " + "(SELECT keyed.x AS x, keyed.y AS y, keyed.z " + "AS z FROM keyed) AS anon_2) AS anon_1" + ) def test_exists(self): s = select([table1.c.myid]).where(table1.c.myid == 5) self.assert_compile(exists(s), - "EXISTS (SELECT mytable.myid FROM mytable " - "WHERE mytable.myid = :myid_1)" - ) + "EXISTS (SELECT mytable.myid FROM mytable " + "WHERE mytable.myid = :myid_1)" + ) self.assert_compile(exists(s.as_scalar()), - "EXISTS (SELECT mytable.myid FROM mytable " - "WHERE mytable.myid = :myid_1)" - ) + "EXISTS (SELECT mytable.myid FROM mytable " + "WHERE mytable.myid = :myid_1)" + ) self.assert_compile(exists([table1.c.myid], table1.c.myid - == 5).select(), + == 5).select(), 'SELECT EXISTS (SELECT mytable.myid FROM ' 'mytable WHERE mytable.myid = :myid_1)', params={'mytable_myid': 5}) self.assert_compile(select([table1, exists([1], - from_obj=table2)]), + from_obj=table2)]), 'SELECT mytable.myid, mytable.name, ' 'mytable.description, EXISTS (SELECT 1 ' 'FROM myothertable) FROM mytable', params={}) - self.assert_compile(select([table1, exists([1], - from_obj=table2).label('foo')]), + self.assert_compile(select([table1, + exists([1], + from_obj=table2).label('foo')]), 'SELECT mytable.myid, mytable.name, ' 'mytable.description, EXISTS (SELECT 1 ' 'FROM myothertable) AS foo FROM mytable', params={}) - self.assert_compile(table1.select(exists().where(table2.c.otherid - == table1.c.myid).correlate(table1)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') - self.assert_compile(table1.select(exists().where(table2.c.otherid - == table1.c.myid).correlate(table1)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') - self.assert_compile(table1.select(exists().where(table2.c.otherid - == table1.c.myid).correlate(table1) - ).replace_selectable(table2, - table2.alias()), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable AS ' - 'myothertable_1 WHERE myothertable_1.otheri' - 'd = mytable.myid)') - self.assert_compile(table1.select(exists().where(table2.c.otherid - == table1.c.myid).correlate(table1)).select_from( - table1.join(table2, - table1.c.myid - == table2.c.otherid)).replace_selectable(table2, - table2.alias()), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable JOIN ' - 'myothertable AS myothertable_1 ON ' - 'mytable.myid = myothertable_1.otherid ' - 'WHERE EXISTS (SELECT * FROM myothertable ' - 'AS myothertable_1 WHERE ' - 'myothertable_1.otherid = mytable.myid)') + self.assert_compile( + table1.select( + exists().where( + table2.c.otherid == table1.c.myid).correlate(table1)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') + self.assert_compile( + table1.select( + exists().where( + table2.c.otherid == table1.c.myid).correlate(table1)), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable WHERE ' + 'myothertable.otherid = mytable.myid)') + self.assert_compile( + table1.select( + exists().where( + table2.c.otherid == table1.c.myid).correlate(table1) + ).replace_selectable( + table2, + table2.alias()), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'EXISTS (SELECT * FROM myothertable AS ' + 'myothertable_1 WHERE myothertable_1.otheri' + 'd = mytable.myid)') + self.assert_compile( + table1.select( + exists().where( + table2.c.otherid == table1.c.myid).correlate(table1)). + select_from( + table1.join( + table2, + table1.c.myid == table2.c.otherid)). + replace_selectable( + table2, + table2.alias()), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable JOIN ' + 'myothertable AS myothertable_1 ON ' + 'mytable.myid = myothertable_1.otherid ' + 'WHERE EXISTS (SELECT * FROM myothertable ' + 'AS myothertable_1 WHERE ' + 'myothertable_1.otherid = mytable.myid)') self.assert_compile( select([ @@ -588,7 +608,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "myothertable.otherid = :otherid_2)) AS anon_1" ) - def test_where_subquery(self): s = select([addresses.c.street], addresses.c.user_id == users.c.user_id, correlate=True).alias('s') @@ -600,31 +619,36 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "(SELECT addresses.street AS street FROM " "addresses, users WHERE addresses.user_id = " "users.user_id) AS s") - self.assert_compile(table1.select(table1.c.myid - == select([table1.c.myid], table1.c.name - == 'jack')), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'mytable.myid = (SELECT mytable.myid FROM ' - 'mytable WHERE mytable.name = :name_1)') - self.assert_compile(table1.select(table1.c.myid - == select([table2.c.otherid], table1.c.name - == table2.c.othername)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'mytable.myid = (SELECT ' - 'myothertable.otherid FROM myothertable ' - 'WHERE mytable.name = myothertable.othernam' - 'e)') + self.assert_compile(table1.select( + table1.c.myid == select( + [table1.c.myid], + table1.c.name == 'jack')), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'mytable.myid = (SELECT mytable.myid FROM ' + 'mytable WHERE mytable.name = :name_1)') + self.assert_compile( + table1.select( + table1.c.myid == select( + [table2.c.otherid], + table1.c.name == table2.c.othername + ) + ), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable WHERE ' + 'mytable.myid = (SELECT ' + 'myothertable.otherid FROM myothertable ' + 'WHERE mytable.name = myothertable.othernam' + 'e)') self.assert_compile(table1.select(exists([1], table2.c.otherid - == table1.c.myid)), + == table1.c.myid)), 'SELECT mytable.myid, mytable.name, ' 'mytable.description FROM mytable WHERE ' 'EXISTS (SELECT 1 FROM myothertable WHERE ' 'myothertable.otherid = mytable.myid)') talias = table1.alias('ta') s = subquery('sq2', [talias], exists([1], table2.c.otherid - == talias.c.myid)) + == talias.c.myid)) self.assert_compile(select([s, table1]), 'SELECT sq2.myid, sq2.name, ' 'sq2.description, mytable.myid, ' @@ -635,7 +659,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'myothertable WHERE myothertable.otherid = ' 'ta.myid)) AS sq2, mytable') - # test constructing the outer query via append_column(), which # occurs in the ORM's Query object @@ -648,18 +671,22 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'EXISTS (SELECT 1 FROM myothertable WHERE ' 'myothertable.otherid = mytable.myid)') - def test_orderby_subquery(self): - self.assert_compile(table1.select(order_by=[select([table2.c.otherid], - table1.c.myid == table2.c.otherid)]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable ORDER BY ' - '(SELECT myothertable.otherid FROM ' - 'myothertable WHERE mytable.myid = ' - 'myothertable.otherid)') + self.assert_compile( + table1.select( + order_by=[ + select( + [ + table2.c.otherid], + table1.c.myid == table2.c.otherid)]), + 'SELECT mytable.myid, mytable.name, ' + 'mytable.description FROM mytable ORDER BY ' + '(SELECT myothertable.otherid FROM ' + 'myothertable WHERE mytable.myid = ' + 'myothertable.otherid)') self.assert_compile(table1.select(order_by=[ desc(select([table2.c.otherid], - table1.c.myid == table2.c.otherid))]), + table1.c.myid == table2.c.otherid))]), 'SELECT mytable.myid, mytable.name, ' 'mytable.description FROM mytable ORDER BY ' '(SELECT myothertable.otherid FROM ' @@ -706,12 +733,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): s = select([table1.c.myid]).alias() self.assert_compile(select([table1.c.myid]).where(table1.c.myid - == s), + == s), 'SELECT mytable.myid FROM mytable WHERE ' 'mytable.myid = (SELECT mytable.myid FROM ' 'mytable)') self.assert_compile(select([table1.c.myid]).where(s - > table1.c.myid), + > table1.c.myid), 'SELECT mytable.myid FROM mytable WHERE ' 'mytable.myid < (SELECT mytable.myid FROM ' 'mytable)') @@ -728,14 +755,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'SELECT (SELECT mytable.myid FROM mytable) ' '- :param_1 AS anon_1') self.assert_compile(select([select([table1.c.name]).as_scalar() - + literal('x')]), + + literal('x')]), 'SELECT (SELECT mytable.name FROM mytable) ' '|| :param_1 AS anon_1') self.assert_compile(select([s > literal(8)]), 'SELECT (SELECT mytable.myid FROM mytable) ' '> :param_1 AS anon_1') self.assert_compile(select([select([table1.c.name]).label('foo' - )]), + )]), 'SELECT (SELECT mytable.name FROM mytable) ' 'AS foo') @@ -757,25 +784,25 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): 'object directly within a column-level expression.' zips = table('zips', - column('zipcode'), - column('latitude'), - column('longitude'), - ) + column('zipcode'), + column('latitude'), + column('longitude'), + ) places = table('places', - column('id'), - column('nm') - ) + column('id'), + column('nm') + ) zip = '12345' qlat = select([zips.c.latitude], zips.c.zipcode == zip).\ - correlate(None).as_scalar() + correlate(None).as_scalar() qlng = select([zips.c.longitude], zips.c.zipcode == zip).\ - correlate(None).as_scalar() + correlate(None).as_scalar() q = select([places.c.id, places.c.nm, zips.c.zipcode, - func.latlondist(qlat, qlng).label('dist')], - zips.c.zipcode == zip, - order_by=['dist', places.c.nm] - ) + func.latlondist(qlat, qlng).label('dist')], + zips.c.zipcode == zip, + order_by=['dist', places.c.nm] + ) self.assert_compile(q, 'SELECT places.id, places.nm, ' @@ -789,11 +816,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): zalias = zips.alias('main_zip') qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).\ - as_scalar() + as_scalar() qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).\ - as_scalar() + as_scalar() q = select([places.c.id, places.c.nm, zalias.c.zipcode, - func.latlondist(qlat, qlng).label('dist')], + func.latlondist(qlat, qlng).label('dist')], order_by=['dist', places.c.nm]) self.assert_compile(q, 'SELECT places.id, places.nm, ' @@ -827,9 +854,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_label_comparison_two(self): self.assert_compile( - label('bar', column('foo', type_=String)) + 'foo', - 'foo || :param_1') - + label('bar', column('foo', type_=String)) + 'foo', + 'foo || :param_1') def test_order_by_labels_enabled(self): lab1 = (table1.c.myid + 12).label('foo') @@ -837,11 +863,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect = default.DefaultDialect() self.assert_compile(select([lab1, lab2]).order_by(lab1, desc(lab2)), - "SELECT mytable.myid + :myid_1 AS foo, " - "somefunc(mytable.name) AS bar FROM mytable " - "ORDER BY foo, bar DESC", - dialect=dialect - ) + "SELECT mytable.myid + :myid_1 AS foo, " + "somefunc(mytable.name) AS bar FROM mytable " + "ORDER BY foo, bar DESC", + dialect=dialect + ) # the function embedded label renders as the function self.assert_compile( @@ -854,11 +880,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # binary expressions render as the expression without labels self.assert_compile(select([lab1, lab2]).order_by(lab1 + "test"), - "SELECT mytable.myid + :myid_1 AS foo, " - "somefunc(mytable.name) AS bar FROM mytable " - "ORDER BY mytable.myid + :myid_1 + :param_1", - dialect=dialect - ) + "SELECT mytable.myid + :myid_1 AS foo, " + "somefunc(mytable.name) AS bar FROM mytable " + "ORDER BY mytable.myid + :myid_1 + :param_1", + dialect=dialect + ) # labels within functions in the columns clause render # with the expression @@ -868,8 +894,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "foo(mytable.myid + :myid_1) AS foo_1 FROM mytable " "ORDER BY foo, foo(mytable.myid + :myid_1)", dialect=dialect - ) - + ) lx = (table1.c.myid + table1.c.myid).label('lx') ly = (func.lower(table1.c.name) + table1.c.description).label('ly') @@ -880,19 +905,24 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "lower(mytable.name) || mytable.description AS ly " "FROM mytable ORDER BY lx, ly DESC", dialect=dialect - ) + ) def test_order_by_labels_disabled(self): lab1 = (table1.c.myid + 12).label('foo') lab2 = func.somefunc(table1.c.name).label('bar') dialect = default.DefaultDialect() dialect.supports_simple_order_by_label = False - self.assert_compile(select([lab1, lab2]).order_by(lab1, desc(lab2)), + self.assert_compile( + select( + [ + lab1, + lab2]).order_by( + lab1, + desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid + :myid_1, somefunc(mytable.name) DESC", - dialect=dialect - ) + dialect=dialect) self.assert_compile( select([lab1, lab2]).order_by(func.hoho(lab1), desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " @@ -905,7 +935,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_conjunctions(self): a, b, c = 'a', 'b', 'c' x = and_(a, b, c) - assert isinstance(x.type, Boolean) + assert isinstance(x.type, Boolean) assert str(x) == 'a AND b AND c' self.assert_compile( select([x.label('foo')]), @@ -914,8 +944,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( and_(table1.c.myid == 12, table1.c.name == 'asdf', - table2.c.othername == 'foo', "sysdate() = today()"), - "mytable.myid = :myid_1 AND mytable.name = :name_1 "\ + table2.c.othername == 'foo', "sysdate() = today()"), + "mytable.myid = :myid_1 AND mytable.name = :name_1 " "AND myothertable.othername = " ":othername_1 AND sysdate() = today()" ) @@ -928,11 +958,11 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "sysdate() = today()", ), 'mytable.myid = :myid_1 AND (myothertable.othername = ' - ':othername_1 OR myothertable.othername = :othername_2 OR ' - 'myothertable.otherid = :otherid_1) AND sysdate() = ' - 'today()', + ':othername_1 OR myothertable.othername = :othername_2 OR ' + 'myothertable.otherid = :otherid_1) AND sysdate() = ' + 'today()', checkparams={'othername_1': 'asdf', 'othername_2': 'foo', - 'otherid_1': 9, 'myid_1': 12} + 'otherid_1': 9, 'myid_1': 12} ) # test a generator @@ -954,17 +984,28 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select([t]).where(and_(t.c.x == 5, - or_(and_(or_(t.c.x == 7))))), + or_(and_(or_(t.c.x == 7))))), "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2" ) self.assert_compile( select([t]).where(and_(or_(t.c.x == 12, - and_(or_(t.c.x == 8))))), + and_(or_(t.c.x == 8))))), "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" ) self.assert_compile( - select([t]).where(and_(or_(or_(t.c.x == 12), - and_(or_(), or_(and_(t.c.x == 8)), and_())))), + select([t]). + where( + and_( + or_( + or_(t.c.x == 12), + and_( + or_(), + or_(and_(t.c.x == 8)), + and_() + ) + ) + ) + ), "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" ) @@ -1014,7 +1055,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" ) - def test_where_empty(self): self.assert_compile( select([table1.c.myid]).where(and_()), @@ -1028,7 +1068,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_multiple_col_binds(self): self.assert_compile( select(["*"], or_(table1.c.myid == 12, table1.c.myid == 'asdf', - table1.c.myid == 'foo')), + table1.c.myid == 'foo')), "SELECT * FROM mytable WHERE mytable.myid = :myid_1 " "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3" ) @@ -1044,7 +1084,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table2.select(order_by=[ - table2.c.otherid, table2.c.othername.desc().nullslast()]), + table2.c.otherid, table2.c.othername.desc().nullslast()]), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC NULLS LAST" @@ -1052,8 +1092,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table2.select(order_by=[ - table2.c.otherid.nullslast(), - table2.c.othername.desc().nullsfirst()]), + table2.c.otherid.nullslast(), + table2.c.othername.desc().nullsfirst()]), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS LAST, " "myothertable.othername DESC NULLS FIRST" @@ -1061,7 +1101,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table2.select(order_by=[table2.c.otherid.nullsfirst(), - table2.c.othername.desc()]), + table2.c.othername.desc()]), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " "myothertable.othername DESC" @@ -1069,7 +1109,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table2.select(order_by=[table2.c.otherid.nullsfirst(), - table2.c.othername.desc().nullslast()]), + table2.c.othername.desc().nullslast()]), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " "myothertable.othername DESC NULLS LAST" @@ -1078,7 +1118,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_orderby_groupby(self): self.assert_compile( table2.select(order_by=[table2.c.otherid, - asc(table2.c.othername)]), + asc(table2.c.othername)]), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername ASC" @@ -1094,8 +1134,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # generative order_by self.assert_compile( - table2.select().order_by(table2.c.otherid).\ - order_by(table2.c.othername.desc()), + table2.select().order_by(table2.c.otherid). + order_by(table2.c.othername.desc()), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " "myothertable.othername DESC" @@ -1103,16 +1143,16 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( table2.select().order_by(table2.c.otherid). - order_by(table2.c.othername.desc() - ).order_by(None), + order_by(table2.c.othername.desc() + ).order_by(None), "SELECT myothertable.otherid, myothertable.othername " "FROM myothertable" ) self.assert_compile( select( - [table2.c.othername, func.count(table2.c.otherid)], - group_by=[table2.c.othername]), + [table2.c.othername, func.count(table2.c.otherid)], + group_by=[table2.c.othername]), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable GROUP BY myothertable.othername" @@ -1121,7 +1161,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # generative group by self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)]). - group_by(table2.c.othername), + group_by(table2.c.othername), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable GROUP BY myothertable.othername" @@ -1129,7 +1169,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)]). - group_by(table2.c.othername).group_by(None), + group_by(table2.c.othername).group_by(None), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable" @@ -1137,8 +1177,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select([table2.c.othername, func.count(table2.c.otherid)], - group_by=[table2.c.othername], - order_by=[table2.c.othername]), + group_by=[table2.c.othername], + order_by=[table2.c.othername]), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable " @@ -1163,7 +1203,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): table1.select, table1.c.myid == 7, for_update='unknown_mode' ) - def test_alias(self): # test the alias for a table1. column names stay the same, # table name "changes" to "foo". @@ -1188,9 +1227,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # also, only use one column from the second table and all columns # from the first table1. q = select( - [table1, table2.c.otherid], - table1.c.myid == table2.c.otherid, use_labels=True - ) + [table1, table2.c.otherid], + table1.c.myid == table2.c.otherid, use_labels=True + ) # make an alias of the "selectable". column names # stay the same (i.e. the labels), table name "changes" to "t2view". @@ -1207,20 +1246,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "t2view.mytable_description AS t2view_mytable_description, " "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM " "(SELECT mytable.myid AS mytable_myid, " - "mytable.name AS mytable_name, " + "mytable.name AS mytable_name, " "mytable.description AS mytable_description, " - "myothertable.otherid AS " + "myothertable.otherid AS " "myothertable_otherid FROM mytable, myothertable " - "WHERE mytable.myid = " + "WHERE mytable.myid = " "myothertable.otherid) AS t2view " - "WHERE t2view.mytable_myid = :mytable_myid_1" + "WHERE t2view.mytable_myid = :mytable_myid_1" ) - def test_prefix(self): self.assert_compile( - table1.select().prefix_with("SQL_CALC_FOUND_ROWS").\ - prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), + table1.select().prefix_with("SQL_CALC_FOUND_ROWS"). + prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING " "mytable.myid, mytable.name, mytable.description FROM mytable" ) @@ -1228,9 +1266,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_prefix_dialect_specific(self): self.assert_compile( table1.select().prefix_with("SQL_CALC_FOUND_ROWS", - dialect='sqlite').\ - prefix_with("SQL_SOME_WEIRD_MYSQL_THING", - dialect='mysql'), + dialect='sqlite'). + prefix_with("SQL_SOME_WEIRD_MYSQL_THING", + dialect='mysql'), "SELECT SQL_SOME_WEIRD_MYSQL_THING " "mytable.myid, mytable.name, mytable.description FROM mytable", dialect=mysql.dialect() @@ -1242,6 +1280,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): SQL in the columns clause.""" dialect = default.DefaultDialect() + class Compiler(dialect.statement_compiler): ansi_bind_rules = True dialect.statement_compiler = Compiler @@ -1291,67 +1330,73 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.CompileError, - "Bind parameter 'foo' without a renderable value not allowed here.", - bindparam("foo").in_([]).compile, dialect=dialect - ) - + "Bind parameter 'foo' without a " + "renderable value not allowed here.", + bindparam("foo").in_( + []).compile, + dialect=dialect) def test_literal(self): self.assert_compile(select([literal('foo')]), - "SELECT :param_1 AS anon_1") + "SELECT :param_1 AS anon_1") - self.assert_compile(select([literal("foo") + literal("bar")], - from_obj=[table1]), + self.assert_compile( + select( + [ + literal("foo") + + literal("bar")], + from_obj=[table1]), "SELECT :param_1 || :param_2 AS anon_1 FROM mytable") def test_calculated_columns(self): value_tbl = table('values', - column('id', Integer), - column('val1', Float), - column('val2', Float), - ) + column('id', Integer), + column('val1', Float), + column('val2', Float), + ) self.assert_compile( - select([value_tbl.c.id, (value_tbl.c.val2 - - value_tbl.c.val1) / value_tbl.c.val1]), - "SELECT values.id, (values.val2 - values.val1) " - "/ values.val1 AS anon_1 FROM values" - ) + select([value_tbl.c.id, (value_tbl.c.val2 - + value_tbl.c.val1) / value_tbl.c.val1]), + "SELECT values.id, (values.val2 - values.val1) " + "/ values.val1 AS anon_1 FROM values" + ) self.assert_compile( - select([value_tbl.c.id], (value_tbl.c.val2 - - value_tbl.c.val1) / value_tbl.c.val1 > 2.0), - "SELECT values.id FROM values WHERE " - "(values.val2 - values.val1) / values.val1 > :param_1" - ) + select([ + value_tbl.c.id], + (value_tbl.c.val2 - value_tbl.c.val1) / + value_tbl.c.val1 > 2.0), + "SELECT values.id FROM values WHERE " + "(values.val2 - values.val1) / values.val1 > :param_1" + ) self.assert_compile( - select([value_tbl.c.id], value_tbl.c.val1 / - (value_tbl.c.val2 - value_tbl.c.val1) / - value_tbl.c.val1 > 2.0), - "SELECT values.id FROM values WHERE " - "(values.val1 / (values.val2 - values.val1)) " - "/ values.val1 > :param_1" - ) + select([value_tbl.c.id], value_tbl.c.val1 / + (value_tbl.c.val2 - value_tbl.c.val1) / + value_tbl.c.val1 > 2.0), + "SELECT values.id FROM values WHERE " + "(values.val1 / (values.val2 - values.val1)) " + "/ values.val1 > :param_1" + ) def test_percent_chars(self): t = table("table%name", - column("percent%"), - column("%(oneofthese)s"), - column("spaces % more spaces"), - ) + column("percent%"), + column("%(oneofthese)s"), + column("spaces % more spaces"), + ) self.assert_compile( t.select(use_labels=True), - '''SELECT "table%name"."percent%" AS "table%name_percent%", '''\ - '''"table%name"."%(oneofthese)s" AS '''\ - '''"table%name_%(oneofthese)s", '''\ - '''"table%name"."spaces % more spaces" AS '''\ - '''"table%name_spaces % '''\ + '''SELECT "table%name"."percent%" AS "table%name_percent%", ''' + '''"table%name"."%(oneofthese)s" AS ''' + '''"table%name_%(oneofthese)s", ''' + '''"table%name"."spaces % more spaces" AS ''' + '''"table%name_spaces % ''' '''more spaces" FROM "table%name"''' ) - def test_joins(self): self.assert_compile( join(table2, table1, table1.c.myid == table2.c.otherid).select(), @@ -1362,17 +1407,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select( - [table1], + [table1], from_obj=[join(table1, table2, table1.c.myid - == table2.c.otherid)] + == table2.c.otherid)] ), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable JOIN myothertable ON mytable.myid = myothertable.otherid") self.assert_compile( select( [join(join(table1, table2, table1.c.myid == table2.c.otherid), - table3, table1.c.myid == table3.c.userid)] + table3, table1.c.myid == table3.c.userid)] ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " @@ -1385,7 +1430,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( join(users, addresses, users.c.user_id == - addresses.c.user_id).select(), + addresses.c.user_id).select(), "SELECT users.user_id, users.user_name, users.password, " "addresses.address_id, addresses.user_id, addresses.street, " "addresses.city, addresses.state, addresses.zip " @@ -1394,58 +1439,57 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ) self.assert_compile( - select([table1, table2, table3], + select([table1, table2, table3], - from_obj=[join(table1, table2, - table1.c.myid == table2.c.otherid). - outerjoin(table3, - table1.c.myid == table3.c.userid)] - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername, " - "thirdtable.userid," - " thirdtable.otherstuff FROM mytable " - "JOIN myothertable ON mytable.myid " - "= myothertable.otherid LEFT OUTER JOIN thirdtable " - "ON mytable.myid =" - " thirdtable.userid" - ) + from_obj=[join(table1, table2, + table1.c.myid == table2.c.otherid). + outerjoin(table3, + table1.c.myid == table3.c.userid)] + ), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername, " + "thirdtable.userid," + " thirdtable.otherstuff FROM mytable " + "JOIN myothertable ON mytable.myid " + "= myothertable.otherid LEFT OUTER JOIN thirdtable " + "ON mytable.myid =" + " thirdtable.userid" + ) self.assert_compile( - select([table1, table2, table3], - from_obj=[outerjoin(table1, - join(table2, table3, table2.c.otherid - == table3.c.userid), - table1.c.myid == table2.c.otherid)] - ), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername, " - "thirdtable.userid," - " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN " - "(myothertable " - "JOIN thirdtable ON myothertable.otherid = " - "thirdtable.userid) ON " - "mytable.myid = myothertable.otherid" - ) + select([table1, table2, table3], + from_obj=[outerjoin(table1, + join(table2, table3, table2.c.otherid + == table3.c.userid), + table1.c.myid == table2.c.otherid)] + ), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername, " + "thirdtable.userid," + " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN " + "(myothertable " + "JOIN thirdtable ON myothertable.otherid = " + "thirdtable.userid) ON " + "mytable.myid = myothertable.otherid" + ) query = select( - [table1, table2], - or_( - table1.c.name == 'fred', - table1.c.myid == 10, - table2.c.othername != 'jack', - "EXISTS (select yay from foo where boo = lar)" - ), - from_obj=[outerjoin(table1, table2, + [table1, table2], + or_( + table1.c.name == 'fred', + table1.c.myid == 10, + table2.c.othername != 'jack', + "EXISTS (select yay from foo where boo = lar)" + ), + from_obj=[outerjoin(table1, table2, table1.c.myid == table2.c.otherid)] - ) - self.assert_compile(query, - "SELECT mytable.myid, mytable.name, mytable.description, " + ) + self.assert_compile( + query, "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername " "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = " "myothertable.otherid WHERE mytable.name = :name_1 OR " "mytable.myid = :myid_1 OR myothertable.othername != :othername_1 " - "OR EXISTS (select yay from foo where boo = lar)", - ) + "OR EXISTS (select yay from foo where boo = lar)", ) def test_compound_selects(self): assert_raises_message( @@ -1457,42 +1501,42 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ) x = union( - select([table1], table1.c.myid == 5), - select([table1], table1.c.myid == 12), - order_by=[table1.c.myid], + select([table1], table1.c.myid == 5), + select([table1], table1.c.myid == 12), + order_by=[table1.c.myid], ) - self.assert_compile(x, - "SELECT mytable.myid, mytable.name, " - "mytable.description " - "FROM mytable WHERE " - "mytable.myid = :myid_1 UNION " - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_2 " - "ORDER BY mytable.myid") + self.assert_compile( + x, "SELECT mytable.myid, mytable.name, " + "mytable.description " + "FROM mytable WHERE " + "mytable.myid = :myid_1 UNION " + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable WHERE mytable.myid = :myid_2 " + "ORDER BY mytable.myid") x = union( - select([table1]), - select([table1]) + select([table1]), + select([table1]) ) x = union(x, select([table1])) - self.assert_compile(x, - "(SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable UNION SELECT mytable.myid, mytable.name, " - "mytable.description FROM mytable) UNION SELECT mytable.myid," - " mytable.name, mytable.description FROM mytable") + self.assert_compile( + x, "(SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable UNION SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable) UNION SELECT mytable.myid," + " mytable.name, mytable.description FROM mytable") u1 = union( select([table1.c.myid, table1.c.name]), select([table2]), select([table3]) ) - self.assert_compile(u1, - "SELECT mytable.myid, mytable.name " - "FROM mytable UNION SELECT myothertable.otherid, " - "myothertable.othername FROM myothertable " - "UNION SELECT thirdtable.userid, thirdtable.otherstuff " - "FROM thirdtable") + self.assert_compile( + u1, "SELECT mytable.myid, mytable.name " + "FROM mytable UNION SELECT myothertable.otherid, " + "myothertable.othername FROM myothertable " + "UNION SELECT thirdtable.userid, thirdtable.otherstuff " + "FROM thirdtable") assert u1.corresponding_column(table2.c.otherid) is u1.c.myid @@ -1514,9 +1558,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( union( select([table1.c.myid, table1.c.name, - func.max(table1.c.description)], - table1.c.name == 'name2', - group_by=[table1.c.myid, table1.c.name]), + func.max(table1.c.description)], + table1.c.name == 'name2', + group_by=[table1.c.myid, table1.c.name]), table1.select(table1.c.name == 'name1') ), "SELECT mytable.myid, mytable.name, " @@ -1532,8 +1576,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): union( select([literal(100).label('value')]), select([literal(200).label('value')]) - ), - "SELECT :param_1 AS value UNION SELECT :param_2 AS value" + ), + "SELECT :param_1 AS value UNION SELECT :param_2 AS value" ) self.assert_compile( @@ -1550,20 +1594,21 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT thirdtable.userid FROM thirdtable)" ) - s = select([column('foo'), column('bar')]) # ORDER BY's even though not supported by # all DB's, are rendered if requested - self.assert_compile(union(s.order_by("foo"), s.order_by("bar")), - "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar" - ) + self.assert_compile( + union( + s.order_by("foo"), + s.order_by("bar")), + "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar") # self_group() is honored self.assert_compile( union(s.order_by("foo").self_group(), - s.order_by("bar").limit(10).self_group()), + s.order_by("bar").limit(10).self_group()), "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, " - "bar ORDER BY bar LIMIT :param_1)", + "bar ORDER BY bar LIMIT :param_1)", {'param_1': 10} ) @@ -1588,7 +1633,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): union(s, union(s, union(s, s))), "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat " "UNION (SELECT foo, bar FROM bat " - "UNION SELECT foo, bar FROM bat))" + "UNION SELECT foo, bar FROM bat))" ) self.assert_compile( @@ -1601,14 +1646,14 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): select([union(s, s).alias()]), 'SELECT anon_1.foo, anon_1.bar FROM ' '(SELECT foo, bar FROM bat UNION ' - 'SELECT foo, bar FROM bat) AS anon_1' + 'SELECT foo, bar FROM bat) AS anon_1' ) self.assert_compile( select([except_(s, s).alias()]), 'SELECT anon_1.foo, anon_1.bar FROM ' '(SELECT foo, bar FROM bat EXCEPT ' - 'SELECT foo, bar FROM bat) AS anon_1' + 'SELECT foo, bar FROM bat) AS anon_1' ) # this query sqlite specifically chokes on @@ -1638,7 +1683,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat EXCEPT " - "SELECT foo, bar FROM bat) AS anon_1 " + "SELECT foo, bar FROM bat) AS anon_1 " "UNION SELECT foo, bar FROM bat" ) @@ -1657,7 +1702,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "UNION (SELECT foo, bar FROM bat " "UNION SELECT foo, bar FROM bat)") - self.assert_compile( union( intersect(s, s), @@ -1665,137 +1709,141 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ), "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) " "UNION (SELECT foo, bar FROM bat INTERSECT " - "SELECT foo, bar FROM bat)" + "SELECT foo, bar FROM bat)" ) def test_binds(self): for ( - stmt, - expected_named_stmt, - expected_positional_stmt, - expected_default_params_dict, - expected_default_params_list, - test_param_dict, - expected_test_params_dict, - expected_test_params_list - ) in [ - ( - select( - [table1, table2], - and_( - table1.c.myid == table2.c.otherid, - table1.c.name == bindparam('mytablename') - )), - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable WHERE mytable.myid = myothertable.otherid " - "AND mytable.name = :mytablename", - "SELECT mytable.myid, mytable.name, mytable.description, " - "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable WHERE mytable.myid = myothertable.otherid AND " - "mytable.name = ?", - {'mytablename':None}, [None], - {'mytablename':5}, {'mytablename':5}, [5] - ), - ( - select([table1], or_(table1.c.myid == bindparam('myid'), - table2.c.otherid == bindparam('myid'))), - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable, myothertable WHERE mytable.myid = :myid " - "OR myothertable.otherid = :myid", - "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable, myothertable WHERE mytable.myid = ? " - "OR myothertable.otherid = ?", - {'myid': None}, [None, None], - {'myid': 5}, {'myid': 5}, [5, 5] - ), - ( - text("SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = :myid OR " - "myothertable.otherid = :myid"), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = :myid OR " - "myothertable.otherid = :myid", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = ? OR " - "myothertable.otherid = ?", - {'myid':None}, [None, None], - {'myid': 5}, {'myid': 5}, [5, 5] - ), - ( - select([table1], or_(table1.c.myid == - bindparam('myid', unique=True), - table2.c.otherid == - bindparam('myid', unique=True))), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid_1 OR myothertable.otherid = :myid_2", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = ? " - "OR myothertable.otherid = ?", - {'myid_1':None, 'myid_2':None}, [None, None], - {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] - ), - ( + stmt, + expected_named_stmt, + expected_positional_stmt, + expected_default_params_dict, + expected_default_params_list, + test_param_dict, + expected_test_params_dict, + expected_test_params_list + ) in [ + ( + select( + [table1, table2], + and_( + table1.c.myid == table2.c.otherid, + table1.c.name == bindparam('mytablename') + )), + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername FROM mytable, " + "myothertable WHERE mytable.myid = myothertable.otherid " + "AND mytable.name = :mytablename", + "SELECT mytable.myid, mytable.name, mytable.description, " + "myothertable.otherid, myothertable.othername FROM mytable, " + "myothertable WHERE mytable.myid = myothertable.otherid AND " + "mytable.name = ?", + {'mytablename': None}, [None], + {'mytablename': 5}, {'mytablename': 5}, [5] + ), + ( + select([table1], or_(table1.c.myid == bindparam('myid'), + table2.c.otherid == bindparam('myid'))), + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, myothertable WHERE mytable.myid = :myid " + "OR myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description " + "FROM mytable, myothertable WHERE mytable.myid = ? " + "OR myothertable.otherid = ?", + {'myid': None}, [None, None], + {'myid': 5}, {'myid': 5}, [5, 5] + ), + ( + text("SELECT mytable.myid, mytable.name, " + "mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = :myid OR " + "myothertable.otherid = :myid"), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = :myid OR " + "myothertable.otherid = :myid", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = ? OR " + "myothertable.otherid = ?", + {'myid': None}, [None, None], + {'myid': 5}, {'myid': 5}, [5, 5] + ), + ( + select([table1], or_(table1.c.myid == + bindparam('myid', unique=True), + table2.c.otherid == + bindparam('myid', unique=True))), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = ? " + "OR myothertable.otherid = ?", + {'myid_1': None, 'myid_2': None}, [None, None], + {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] + ), + ( bindparam('test', type_=String, required=False) + text("'hi'"), ":test || 'hi'", "? || 'hi'", - {'test':None}, [None], - {}, {'test':None}, [None] - ), - ( + {'test': None}, [None], + {}, {'test': None}, [None] + ), + ( # testing select.params() here - bindparam() objects # must get required flag set to False - select([table1], or_(table1.c.myid == bindparam('myid'), - table2.c.otherid == bindparam('myotherid'))).\ - params({'myid':8, 'myotherid':7}), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid OR myothertable.otherid = :myotherid", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - "? OR myothertable.otherid = ?", - {'myid': 8, 'myotherid': 7}, [8, 7], - {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7] - ), - ( - select([table1], or_(table1.c.myid == - bindparam('myid', value=7, unique=True), - table2.c.otherid == - bindparam('myid', value=8, unique=True))), - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - ":myid_1 OR myothertable.otherid = :myid_2", - "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = " - "? OR myothertable.otherid = ?", - {'myid_1': 7, 'myid_2': 8}, [7, 8], - {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] - ), - ]: - - self.assert_compile(stmt, expected_named_stmt, - params=expected_default_params_dict) - self.assert_compile(stmt, expected_positional_stmt, - dialect=sqlite.dialect()) - nonpositional = stmt.compile() - positional = stmt.compile(dialect=sqlite.dialect()) - pp = positional.params - eq_([pp[k] for k in positional.positiontup], - expected_default_params_list) - - eq_(nonpositional.construct_params(test_param_dict), - expected_test_params_dict) - pp = positional.construct_params(test_param_dict) - eq_( - [pp[k] for k in positional.positiontup], - expected_test_params_list - ) + select( + [table1], + or_( + table1.c.myid == bindparam('myid'), + table2.c.otherid == bindparam('myotherid') + )).params({'myid': 8, 'myotherid': 7}), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid OR myothertable.otherid = :myotherid", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + "? OR myothertable.otherid = ?", + {'myid': 8, 'myotherid': 7}, [8, 7], + {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7] + ), + ( + select([table1], or_(table1.c.myid == + bindparam('myid', value=7, unique=True), + table2.c.otherid == + bindparam('myid', value=8, unique=True))), + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + ":myid_1 OR myothertable.otherid = :myid_2", + "SELECT mytable.myid, mytable.name, mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = " + "? OR myothertable.otherid = ?", + {'myid_1': 7, 'myid_2': 8}, [7, 8], + {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] + ), + ]: + + self.assert_compile(stmt, expected_named_stmt, + params=expected_default_params_dict) + self.assert_compile(stmt, expected_positional_stmt, + dialect=sqlite.dialect()) + nonpositional = stmt.compile() + positional = stmt.compile(dialect=sqlite.dialect()) + pp = positional.params + eq_([pp[k] for k in positional.positiontup], + expected_default_params_list) + + eq_(nonpositional.construct_params(test_param_dict), + expected_test_params_dict) + pp = positional.construct_params(test_param_dict) + eq_( + [pp[k] for k in positional.positiontup], + expected_test_params_list + ) # check that params() doesn't modify original statement s = select([table1], or_(table1.c.myid == bindparam('myid'), - table2.c.otherid == - bindparam('myotherid'))) + table2.c.otherid == + bindparam('myotherid'))) s2 = s.params({'myid': 8, 'myotherid': 7}) s3 = s2.params({'myid': 9}) assert s.compile().params == {'myid': None, 'myotherid': None} @@ -1805,9 +1853,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # test using same 'unique' param object twice in one compile s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar() s2 = select([table1, s], table1.c.myid == s) - self.assert_compile(s2, - "SELECT mytable.myid, mytable.name, mytable.description, " - "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "\ + self.assert_compile( + s2, "SELECT mytable.myid, mytable.name, mytable.description, " + "(SELECT mytable.myid FROM mytable WHERE mytable.myid = " ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = " "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)") positional = s2.compile(dialect=sqlite.dialect()) @@ -1817,18 +1865,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # check that conflicts with "unique" params are caught s = select([table1], or_(table1.c.myid == 7, - table1.c.myid == bindparam('myid_1'))) + table1.c.myid == bindparam('myid_1'))) assert_raises_message(exc.CompileError, - "conflicts with unique bind parameter " - "of the same name", - str, s) + "conflicts with unique bind parameter " + "of the same name", + str, s) s = select([table1], or_(table1.c.myid == 7, table1.c.myid == 8, - table1.c.myid == bindparam('myid_1'))) + table1.c.myid == bindparam('myid_1'))) assert_raises_message(exc.CompileError, - "conflicts with unique bind parameter " - "of the same name", - str, s) + "conflicts with unique bind parameter " + "of the same name", + str, s) def _test_binds_no_hash_collision(self): """test that construct_params doesn't corrupt dict @@ -1845,7 +1893,6 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp))) eq_(len(set(pp.values())), total_params) - def test_bind_as_col(self): t = table('foo', column('id')) @@ -1863,74 +1910,76 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ) def test_bind_params_missing(self): - assert_raises_message(exc.InvalidRequestError, + assert_raises_message( + exc.InvalidRequestError, r"A value is required for bind parameter 'x'", - select([table1]).where( - and_( - table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True) - ) - ).compile().construct_params, + select( + [table1]).where( + and_( + table1.c.myid == bindparam("x", required=True), + table1.c.name == bindparam("y", required=True) + ) + ).compile().construct_params, params=dict(y=5) ) - assert_raises_message(exc.InvalidRequestError, + assert_raises_message( + exc.InvalidRequestError, r"A value is required for bind parameter 'x'", - select([table1]).where( - table1.c.myid == bindparam("x", required=True) - ).compile().construct_params - ) + select( + [table1]).where( + table1.c.myid == bindparam( + "x", + required=True)).compile().construct_params) - assert_raises_message(exc.InvalidRequestError, + assert_raises_message( + exc.InvalidRequestError, r"A value is required for bind parameter 'x', " - "in parameter group 2", - select([table1]).where( - and_( - table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True) - ) - ).compile().construct_params, - params=dict(y=5), - _group_number=2 - ) + "in parameter group 2", + select( + [table1]).where( + and_( + table1.c.myid == bindparam("x", required=True), + table1.c.name == bindparam("y", required=True) + ) + ).compile().construct_params, + params=dict(y=5), _group_number=2) - assert_raises_message(exc.InvalidRequestError, + assert_raises_message( + exc.InvalidRequestError, r"A value is required for bind parameter 'x', " - "in parameter group 2", - select([table1]).where( - table1.c.myid == bindparam("x", required=True) - ).compile().construct_params, - _group_number=2 - ) - - - + "in parameter group 2", + select( + [table1]).where( + table1.c.myid == bindparam( + "x", + required=True)).compile().construct_params, + _group_number=2) def test_tuple(self): self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( - [(1, 'foo'), (5, 'bar')]), + [(1, 'foo'), (5, 'bar')]), "(mytable.myid, mytable.name) IN " "((:param_1, :param_2), (:param_3, :param_4))" ) self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( - [tuple_(table2.c.otherid, table2.c.othername)] - ), + [tuple_(table2.c.otherid, table2.c.othername)] + ), "(mytable.myid, mytable.name) IN " "((myothertable.otherid, myothertable.othername))" ) self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( - select([table2.c.otherid, table2.c.othername]) - ), + select([table2.c.otherid, table2.c.othername]) + ), "(mytable.myid, mytable.name) IN (SELECT " "myothertable.otherid, myothertable.othername FROM myothertable)" ) - def test_cast(self): tbl = table('casttest', column('id', Integer), @@ -1941,47 +1990,54 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def check_results(dialect, expected_results, literal): eq_(len(expected_results), 5, - 'Incorrect number of expected results') + 'Incorrect number of expected results') eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), - 'CAST(casttest.v1 AS %s)' % expected_results[0]) + 'CAST(casttest.v1 AS %s)' % expected_results[0]) eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), - 'CAST(casttest.v1 AS %s)' % expected_results[1]) + 'CAST(casttest.v1 AS %s)' % expected_results[1]) eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), - 'CAST(casttest.ts AS %s)' % expected_results[2]) + 'CAST(casttest.ts AS %s)' % expected_results[2]) eq_(str(cast(1234, Text).compile(dialect=dialect)), - 'CAST(%s AS %s)' % (literal, expected_results[3])) + 'CAST(%s AS %s)' % (literal, expected_results[3])) eq_(str(cast('test', String(20)).compile(dialect=dialect)), - 'CAST(%s AS %s)' % (literal, expected_results[4])) + 'CAST(%s AS %s)' % (literal, expected_results[4])) # fixme: shoving all of this dialect-specific stuff in one test # is now officialy completely ridiculous AND non-obviously omits # coverage on other dialects. - sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(dialect=dialect) + sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile( + dialect=dialect) if isinstance(dialect, type(mysql.dialect())): eq_(str(sel), - "SELECT casttest.id, casttest.v1, casttest.v2, casttest.ts, " - "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest") + "SELECT casttest.id, casttest.v1, casttest.v2, " + "casttest.ts, " + "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest") else: eq_(str(sel), - "SELECT casttest.id, casttest.v1, casttest.v2, " - "casttest.ts, CAST(casttest.v1 AS NUMERIC) AS " - "anon_1 \nFROM casttest") + "SELECT casttest.id, casttest.v1, casttest.v2, " + "casttest.ts, CAST(casttest.v1 AS NUMERIC) AS " + "anon_1 \nFROM casttest") # first test with PostgreSQL engine - check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', - 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s') + check_results( + postgresql.dialect(), [ + 'NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], + '%(param_1)s') # then the Oracle engine - check_results(oracle.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', - 'DATE', 'CLOB', 'VARCHAR2(20 CHAR)'], ':param_1') + check_results( + oracle.dialect(), [ + 'NUMERIC', 'NUMERIC(12, 9)', 'DATE', + 'CLOB', 'VARCHAR2(20 CHAR)'], + ':param_1') # then the sqlite engine check_results(sqlite.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', - 'DATE', 'TEXT', 'VARCHAR(20)'], '?') + 'DATE', 'TEXT', 'VARCHAR(20)'], '?') # then the MySQL engine check_results(mysql.dialect(), ['DECIMAL', 'DECIMAL(12, 9)', - 'DATE', 'CHAR', 'CHAR(20)'], '%s') + 'DATE', 'CHAR', 'CHAR(20)'], '%s') self.assert_compile(cast(text('NULL'), Integer), 'CAST(NULL AS INTEGER)', @@ -2108,25 +2164,23 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT x + foo() OVER () AS anon_1" ) - def test_date_between(self): import datetime table = Table('dt', metadata, - Column('date', Date)) + Column('date', Date)) self.assert_compile( table.select(table.c.date.between(datetime.date(2006, 6, 1), - datetime.date(2006, 6, 5))), + datetime.date(2006, 6, 5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={'date_1': datetime.date(2006, 6, 1), - 'date_2': datetime.date(2006, 6, 5)}) + 'date_2': datetime.date(2006, 6, 5)}) self.assert_compile( table.select(sql.between(table.c.date, datetime.date(2006, 6, 1), - datetime.date(2006, 6, 5))), + datetime.date(2006, 6, 5))), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", checkparams={'date_1': datetime.date(2006, 6, 1), - 'date_2': datetime.date(2006, 6, 5)}) - + 'date_2': datetime.date(2006, 6, 5)}) def test_delayed_col_naming(self): my_str = Column(String) @@ -2179,8 +2233,8 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): f1 = func.hoho(table1.c.name) s1 = select([table1.c.myid, table1.c.myid.label('foobar'), - f1, - func.lala(table1.c.name).label('gg')]) + f1, + func.lala(table1.c.name).label('gg')]) eq_( list(s1.c.keys()), @@ -2196,12 +2250,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): cast(table1.c.name, Numeric), literal('x'), ) - for col, key, expr, label in ( + for col, key, expr, lbl in ( (table1.c.name, 'name', 'mytable.name', None), (exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'), (exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'), (exprs[2], str(exprs[2]), - 'CAST(mytable.name AS NUMERIC)', 'anon_1'), + 'CAST(mytable.name AS NUMERIC)', 'anon_1'), (t1.c.col1, 'col1', 'mytable.col1', None), (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', ''), @@ -2215,26 +2269,27 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): s1 = select([col], from_obj=t) assert list(s1.c.keys()) == [key], list(s1.c.keys()) - if label: - self.assert_compile(s1, - "SELECT %s AS %s FROM mytable" % (expr, label)) + if lbl: + self.assert_compile( + s1, "SELECT %s AS %s FROM mytable" % + (expr, lbl)) else: self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,)) s1 = select([s1]) - if label: - self.assert_compile(s1, - "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % - (label, expr, label)) + if lbl: + self.assert_compile( + s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % + (lbl, expr, lbl)) elif col.table is not None: # sqlite rule labels subquery columns - self.assert_compile(s1, - "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % - (key, expr, key)) + self.assert_compile( + s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % + (key, expr, key)) else: self.assert_compile(s1, - "SELECT %s FROM (SELECT %s FROM mytable)" % - (expr, expr)) + "SELECT %s FROM (SELECT %s FROM mytable)" % + (expr, expr)) def test_hints(self): s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s") @@ -2248,88 +2303,90 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): subs4 = select([ table1, table2 - ]).select_from(table1.join(table2, table1.c.myid == table2.c.otherid)).\ + ]).select_from( + table1.join(table2, table1.c.myid == table2.c.otherid)).\ with_hint(table1, 'hint1') s4 = select([table3]).select_from( - table3.join( - subs4, - subs4.c.othername == table3.c.otherstuff - ) - ).\ - with_hint(table3, 'hint3') - + table3.join( + subs4, + subs4.c.othername == table3.c.otherstuff + ) + ).\ + with_hint(table3, 'hint3') t1 = table('QuotedName', column('col1')) s6 = select([t1.c.col1]).where(t1.c.col1 > 10).\ - with_hint(t1, '%(name)s idx1') + with_hint(t1, '%(name)s idx1') a2 = t1.alias('SomeName') s7 = select([a2.c.col1]).where(a2.c.col1 > 10).\ - with_hint(a2, '%(name)s idx1') + with_hint(a2, '%(name)s idx1') mysql_d, oracle_d, sybase_d = \ - mysql.dialect(), \ - oracle.dialect(), \ - sybase.dialect() + mysql.dialect(), \ + oracle.dialect(), \ + sybase.dialect() for stmt, dialect, expected in [ - (s, mysql_d, - "SELECT mytable.myid FROM mytable test hint mytable"), - (s, oracle_d, - "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"), - (s, sybase_d, - "SELECT mytable.myid FROM mytable test hint mytable"), - (s2, mysql_d, - "SELECT mytable.myid FROM mytable"), - (s2, oracle_d, - "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"), - (s2, sybase_d, - "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"), - (s3, mysql_d, - "SELECT mytable_1.myid FROM mytable AS mytable_1 " - "index(mytable_1 hint)"), - (s3, oracle_d, - "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM " - "mytable mytable_1"), - (s3, sybase_d, - "SELECT mytable_1.myid FROM mytable AS mytable_1 " - "index(mytable_1 hint)"), - (s4, mysql_d, - "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable " - "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, " - "mytable.description, myothertable.otherid, " - "myothertable.othername FROM mytable hint1 INNER " - "JOIN myothertable ON mytable.myid = myothertable.otherid) " - "ON othername = thirdtable.otherstuff"), - (s4, sybase_d, - "SELECT thirdtable.userid, thirdtable.otherstuff FROM thirdtable " - "hint3 JOIN (SELECT mytable.myid, mytable.name, " - "mytable.description, myothertable.otherid, " - "myothertable.othername FROM mytable hint1 " - "JOIN myothertable ON mytable.myid = myothertable.otherid) " - "ON othername = thirdtable.otherstuff"), - (s4, oracle_d, - "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff " - "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid," - " mytable.name, mytable.description, myothertable.otherid," - " myothertable.othername FROM mytable JOIN myothertable ON" - " mytable.myid = myothertable.otherid) ON othername =" - " thirdtable.otherstuff"), -# TODO: figure out dictionary ordering solution here -# (s5, oracle_d, -# "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, " -# "thirdtable.otherstuff " -# "FROM thirdtable JOIN (SELECT mytable.myid," -# " mytable.name, mytable.description, myothertable.otherid," -# " myothertable.othername FROM mytable JOIN myothertable ON" -# " mytable.myid = myothertable.otherid) ON othername =" -# " thirdtable.otherstuff"), - (s6, oracle_d, + (s, mysql_d, + "SELECT mytable.myid FROM mytable test hint mytable"), + (s, oracle_d, + "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"), + (s, sybase_d, + "SELECT mytable.myid FROM mytable test hint mytable"), + (s2, mysql_d, + "SELECT mytable.myid FROM mytable"), + (s2, oracle_d, + "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"), + (s2, sybase_d, + "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"), + (s3, mysql_d, + "SELECT mytable_1.myid FROM mytable AS mytable_1 " + "index(mytable_1 hint)"), + (s3, oracle_d, + "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM " + "mytable mytable_1"), + (s3, sybase_d, + "SELECT mytable_1.myid FROM mytable AS mytable_1 " + "index(mytable_1 hint)"), + (s4, mysql_d, + "SELECT thirdtable.userid, thirdtable.otherstuff " + "FROM thirdtable " + "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, " + "mytable.description, myothertable.otherid, " + "myothertable.othername FROM mytable hint1 INNER " + "JOIN myothertable ON mytable.myid = myothertable.otherid) " + "ON othername = thirdtable.otherstuff"), + (s4, sybase_d, + "SELECT thirdtable.userid, thirdtable.otherstuff " + "FROM thirdtable " + "hint3 JOIN (SELECT mytable.myid, mytable.name, " + "mytable.description, myothertable.otherid, " + "myothertable.othername FROM mytable hint1 " + "JOIN myothertable ON mytable.myid = myothertable.otherid) " + "ON othername = thirdtable.otherstuff"), + (s4, oracle_d, + "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff " + "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid," + " mytable.name, mytable.description, myothertable.otherid," + " myothertable.othername FROM mytable JOIN myothertable ON" + " mytable.myid = myothertable.otherid) ON othername =" + " thirdtable.otherstuff"), + # TODO: figure out dictionary ordering solution here + # (s5, oracle_d, + # "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, " + # "thirdtable.otherstuff " + # "FROM thirdtable JOIN (SELECT mytable.myid," + # " mytable.name, mytable.description, myothertable.otherid," + # " myothertable.othername FROM mytable JOIN myothertable ON" + # " mytable.myid = myothertable.otherid) ON othername =" + # " thirdtable.otherstuff"), + (s6, oracle_d, """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""), - (s7, oracle_d, - """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """ - """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""), + (s7, oracle_d, + """SELECT /*+ SomeName idx1 */ "SomeName".col1 FROM """ + """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""), ]: self.assert_compile( stmt, @@ -2345,13 +2402,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): def test_literal_as_text_nonstring_raise(self): assert_raises(exc.ArgumentError, - and_, ("a",), ("b",) - ) + and_, ("a",), ("b",) + ) class UnsupportedTest(fixtures.TestBase): + def test_unsupported_element_str_visit_name(self): from sqlalchemy.sql.expression import ClauseElement + class SomeElement(ClauseElement): __visit_name__ = 'some_element' @@ -2364,7 +2423,9 @@ class UnsupportedTest(fixtures.TestBase): def test_unsupported_element_meth_visit_name(self): from sqlalchemy.sql.expression import ClauseElement + class SomeElement(ClauseElement): + @classmethod def __visit_name__(cls): return "some_element" @@ -2378,6 +2439,7 @@ class UnsupportedTest(fixtures.TestBase): def test_unsupported_operator(self): from sqlalchemy.sql.expression import BinaryExpression + def myop(x, y): pass binary = BinaryExpression(column("foo"), column("bar"), myop) @@ -2394,6 +2456,7 @@ class KwargPropagationTest(fixtures.TestBase): @classmethod def setup_class(cls): from sqlalchemy.sql.expression import ColumnClause, TableClause + class CatchCol(ColumnClause): pass @@ -2417,15 +2480,15 @@ class KwargPropagationTest(fixtures.TestBase): def _do_test(self, element): d = default.DefaultDialect() d.statement_compiler(d, element, - compile_kwargs={"canary": True}) + compile_kwargs={"canary": True}) def test_binary(self): self._do_test(self.column == 5) def test_select(self): s = select([self.column]).select_from(self.table).\ - where(self.column == self.criterion).\ - order_by(self.column) + where(self.column == self.criterion).\ + order_by(self.column) self._do_test(s) def test_case(self): @@ -2440,77 +2503,81 @@ class KwargPropagationTest(fixtures.TestBase): class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' - def test_correlated_update(self): # test against a straight text subquery - u = update(table1, values={ - table1.c.name: - text("(select name from mytable where id=mytable.id)")}) - self.assert_compile(u, - "UPDATE mytable SET name=(select name from mytable " - "where id=mytable.id)") + u = update( + table1, + values={ + table1.c.name: + text("(select name from mytable where id=mytable.id)") + } + ) + self.assert_compile( + u, + "UPDATE mytable SET name=(select name from mytable " + "where id=mytable.id)") mt = table1.alias() u = update(table1, values={ - table1.c.name: - select([mt.c.name], mt.c.myid == table1.c.myid) - }) - self.assert_compile(u, - "UPDATE mytable SET name=(SELECT mytable_1.name FROM " - "mytable AS mytable_1 WHERE " - "mytable_1.myid = mytable.myid)") + table1.c.name: + select([mt.c.name], mt.c.myid == table1.c.myid) + }) + self.assert_compile( + u, "UPDATE mytable SET name=(SELECT mytable_1.name FROM " + "mytable AS mytable_1 WHERE " + "mytable_1.myid = mytable.myid)") # test against a regular constructed subquery s = select([table2], table2.c.otherid == table1.c.myid) u = update(table1, table1.c.name == 'jack', values={table1.c.name: s}) - self.assert_compile(u, - "UPDATE mytable SET name=(SELECT myothertable.otherid, " - "myothertable.othername FROM myothertable WHERE " - "myothertable.otherid = mytable.myid) " - "WHERE mytable.name = :name_1") + self.assert_compile( + u, "UPDATE mytable SET name=(SELECT myothertable.otherid, " + "myothertable.othername FROM myothertable WHERE " + "myothertable.otherid = mytable.myid) " + "WHERE mytable.name = :name_1") # test a non-correlated WHERE clause s = select([table2.c.othername], table2.c.otherid == 7) u = update(table1, table1.c.name == s) self.assert_compile(u, - "UPDATE mytable SET myid=:myid, name=:name, " - "description=:description WHERE mytable.name = " - "(SELECT myothertable.othername FROM myothertable " - "WHERE myothertable.otherid = :otherid_1)") + "UPDATE mytable SET myid=:myid, name=:name, " + "description=:description WHERE mytable.name = " + "(SELECT myothertable.othername FROM myothertable " + "WHERE myothertable.otherid = :otherid_1)") # test one that is actually correlated... s = select([table2.c.othername], table2.c.otherid == table1.c.myid) u = table1.update(table1.c.name == s) self.assert_compile(u, - "UPDATE mytable SET myid=:myid, name=:name, " - "description=:description WHERE mytable.name = " - "(SELECT myothertable.othername FROM myothertable " - "WHERE myothertable.otherid = mytable.myid)") + "UPDATE mytable SET myid=:myid, name=:name, " + "description=:description WHERE mytable.name = " + "(SELECT myothertable.othername FROM myothertable " + "WHERE myothertable.otherid = mytable.myid)") # test correlated FROM implicit in WHERE and SET clauses u = table1.update().values(name=table2.c.othername)\ .where(table2.c.otherid == table1.c.myid) - self.assert_compile(u, - "UPDATE mytable SET name=myothertable.othername " - "FROM myothertable WHERE myothertable.otherid = mytable.myid") + self.assert_compile( + u, "UPDATE mytable SET name=myothertable.othername " + "FROM myothertable WHERE myothertable.otherid = mytable.myid") u = table1.update().values(name='foo')\ .where(table2.c.otherid == table1.c.myid) - self.assert_compile(u, - "UPDATE mytable SET name=:name " - "FROM myothertable WHERE myothertable.otherid = mytable.myid") + self.assert_compile( + u, "UPDATE mytable SET name=:name " + "FROM myothertable WHERE myothertable.otherid = mytable.myid") self.assert_compile(u, - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable WHERE " - "myothertable.otherid = mytable.myid", - dialect=mssql.dialect()) + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable WHERE " + "myothertable.otherid = mytable.myid", + dialect=mssql.dialect()) self.assert_compile(u.where(table2.c.othername == mt.c.name), - "UPDATE mytable SET name=:name " - "FROM mytable, myothertable, mytable AS mytable_1 " - "WHERE myothertable.otherid = mytable.myid " - "AND myothertable.othername = mytable_1.name", - dialect=mssql.dialect()) + "UPDATE mytable SET name=:name " + "FROM mytable, myothertable, mytable AS mytable_1 " + "WHERE myothertable.otherid = mytable.myid " + "AND myothertable.othername = mytable_1.name", + dialect=mssql.dialect()) def test_binds_that_match_columns(self): """test bind params named after column names @@ -2527,29 +2594,44 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises(exc.CompileError, u.values(x=7).compile) self.assert_compile(u.values(y=7), - "UPDATE foo SET y=:y WHERE foo.x = :x") + "UPDATE foo SET y=:y WHERE foo.x = :x") assert_raises(exc.CompileError, - u.values(x=7).compile, column_keys=['x', 'y']) + u.values(x=7).compile, column_keys=['x', 'y']) assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y']) - self.assert_compile(u.values(x=3 + bindparam('x')), - "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x") + self.assert_compile( + u.values( + x=3 + + bindparam('x')), + "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x") - self.assert_compile(u.values(x=3 + bindparam('x')), - "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x", - params={'x': 1}) + self.assert_compile( + u.values( + x=3 + + bindparam('x')), + "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x", + params={ + 'x': 1}) - self.assert_compile(u.values(x=3 + bindparam('x')), - "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x", - params={'x': 1, 'y': 2}) + self.assert_compile( + u.values( + x=3 + + bindparam('x')), + "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x", + params={ + 'x': 1, + 'y': 2}) i = t.insert().values(x=3 + bindparam('x')) self.assert_compile(i, - "INSERT INTO foo (x) VALUES ((:param_1 + :x))") - self.assert_compile(i, - "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)", - params={'x': 1, 'y': 2}) + "INSERT INTO foo (x) VALUES ((:param_1 + :x))") + self.assert_compile( + i, + "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)", + params={ + 'x': 1, + 'y': 2}) i = t.insert().values(x=bindparam('y')) self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)") @@ -2562,15 +2644,23 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): i = t.insert().values(x=3 + bindparam('x2')) self.assert_compile(i, - "INSERT INTO foo (x) VALUES ((:param_1 + :x2))") - self.assert_compile(i, - "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={}) - self.assert_compile(i, - "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", - params={'x': 1, 'y': 2}) - self.assert_compile(i, - "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", - params={'x2': 1, 'y': 2}) + "INSERT INTO foo (x) VALUES ((:param_1 + :x2))") + self.assert_compile( + i, + "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", + params={}) + self.assert_compile( + i, + "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", + params={ + 'x': 1, + 'y': 2}) + self.assert_compile( + i, + "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)", + params={ + 'x2': 1, + 'y': 2}) def test_unconsumed_names(self): t = table("t", column("x"), column("y")) @@ -2590,7 +2680,7 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): exc.CompileError, "Unconsumed column names: j", t.update().values(x=5, j=7).values({t2.c.z: 5}). - where(t.c.x == t2.c.q).compile, + where(t.c.x == t2.c.q).compile, ) # bindparam names don't get counted @@ -2615,7 +2705,6 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): column_keys=['j'] ) - def test_labels_no_collision(self): t = table('foo', column('id'), column('foo_id')) @@ -2630,12 +2719,14 @@ class CRUDTest(fixtures.TestBase, AssertsCompiledSQL): "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1" ) + class DDLTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' def _illegal_type_fixture(self): class MyType(types.TypeEngine): pass + @compiles(MyType) def compile(element, compiler, **kw): raise exc.CompileError("Couldn't compile type") @@ -2644,8 +2735,8 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): def test_reraise_of_column_spec_issue(self): MyType = self._illegal_type_fixture() t1 = Table('t', MetaData(), - Column('x', MyType()) - ) + Column('x', MyType()) + ) assert_raises_message( exc.CompileError, r"\(in table 't', column 'x'\): Couldn't compile type", @@ -2655,8 +2746,8 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): def test_reraise_of_column_spec_issue_unicode(self): MyType = self._illegal_type_fixture() t1 = Table('t', MetaData(), - Column(u('méil'), MyType()) - ) + Column(u('méil'), MyType()) + ) assert_raises_message( exc.CompileError, u(r"\(in table 't', column 'méil'\): Couldn't compile type"), @@ -2666,8 +2757,8 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): def test_system_flag(self): m = MetaData() t = Table('t', m, Column('x', Integer), - Column('y', Integer, system=True), - Column('z', Integer)) + Column('y', Integer, system=True), + Column('z', Integer)) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (x INTEGER, z INTEGER)" @@ -2686,58 +2777,65 @@ class InlineDefaultTest(fixtures.TestBase, AssertsCompiledSQL): def test_insert(self): m = MetaData() foo = Table('foo', m, - Column('id', Integer)) + Column('id', Integer)) t = Table('test', m, - Column('col1', Integer, default=func.foo(1)), - Column('col2', Integer, default=select( - [func.coalesce(func.max(foo.c.id))])), - ) + Column('col1', Integer, default=func.foo(1)), + Column('col2', Integer, default=select( + [func.coalesce(func.max(foo.c.id))])), + ) - self.assert_compile(t.insert(inline=True, values={}), - "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), " - "(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM " - "foo))") + self.assert_compile( + t.insert( + inline=True, values={}), + "INSERT INTO test (col1, col2) VALUES (foo(:foo_1), " + "(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM " + "foo))") def test_update(self): m = MetaData() foo = Table('foo', m, - Column('id', Integer)) + Column('id', Integer)) t = Table('test', m, - Column('col1', Integer, onupdate=func.foo(1)), - Column('col2', Integer, onupdate=select( - [func.coalesce(func.max(foo.c.id))])), - Column('col3', String(30)) - ) + Column('col1', Integer, onupdate=func.foo(1)), + Column('col2', Integer, onupdate=select( + [func.coalesce(func.max(foo.c.id))])), + Column('col3', String(30)) + ) self.assert_compile(t.update(inline=True, values={'col3': 'foo'}), - "UPDATE test SET col1=foo(:foo_1), col2=(SELECT " - "coalesce(max(foo.id)) AS coalesce_1 FROM foo), " - "col3=:col3") + "UPDATE test SET col1=foo(:foo_1), col2=(SELECT " + "coalesce(max(foo.id)) AS coalesce_1 FROM foo), " + "col3=:col3") + class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = 'default' def test_select(self): self.assert_compile(table4.select(), - "SELECT remote_owner.remotetable.rem_id, " - "remote_owner.remotetable.datatype_id," - " remote_owner.remotetable.value " - "FROM remote_owner.remotetable") - - self.assert_compile(table4.select(and_(table4.c.datatype_id == 7, - table4.c.value == 'hi')), - "SELECT remote_owner.remotetable.rem_id, " - "remote_owner.remotetable.datatype_id," - " remote_owner.remotetable.value " - "FROM remote_owner.remotetable WHERE " - "remote_owner.remotetable.datatype_id = :datatype_id_1 AND" - " remote_owner.remotetable.value = :value_1") + "SELECT remote_owner.remotetable.rem_id, " + "remote_owner.remotetable.datatype_id," + " remote_owner.remotetable.value " + "FROM remote_owner.remotetable") + + self.assert_compile( + table4.select( + and_( + table4.c.datatype_id == 7, + table4.c.value == 'hi')), + "SELECT remote_owner.remotetable.rem_id, " + "remote_owner.remotetable.datatype_id," + " remote_owner.remotetable.value " + "FROM remote_owner.remotetable WHERE " + "remote_owner.remotetable.datatype_id = :datatype_id_1 AND" + " remote_owner.remotetable.value = :value_1") s = table4.select(and_(table4.c.datatype_id == 7, - table4.c.value == 'hi'), use_labels=True) - self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS" + table4.c.value == 'hi'), use_labels=True) + self.assert_compile( + s, "SELECT remote_owner.remotetable.rem_id AS" " remote_owner_remotetable_rem_id, " "remote_owner.remotetable.datatype_id AS" " remote_owner_remotetable_datatype_id, " @@ -2749,22 +2847,22 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): # multi-part schema name self.assert_compile(table5.select(), - 'SELECT "dbo.remote_owner".remotetable.rem_id, ' - '"dbo.remote_owner".remotetable.datatype_id, ' - '"dbo.remote_owner".remotetable.value ' - 'FROM "dbo.remote_owner".remotetable' - ) + 'SELECT "dbo.remote_owner".remotetable.rem_id, ' + '"dbo.remote_owner".remotetable.datatype_id, ' + '"dbo.remote_owner".remotetable.value ' + 'FROM "dbo.remote_owner".remotetable' + ) # multi-part schema name labels - convert '.' to '_' self.assert_compile(table5.select(use_labels=True), - 'SELECT "dbo.remote_owner".remotetable.rem_id AS' - ' dbo_remote_owner_remotetable_rem_id, ' - '"dbo.remote_owner".remotetable.datatype_id' - ' AS dbo_remote_owner_remotetable_datatype_id,' - ' "dbo.remote_owner".remotetable.value AS ' - 'dbo_remote_owner_remotetable_value FROM' - ' "dbo.remote_owner".remotetable' - ) + 'SELECT "dbo.remote_owner".remotetable.rem_id AS' + ' dbo_remote_owner_remotetable_rem_id, ' + '"dbo.remote_owner".remotetable.datatype_id' + ' AS dbo_remote_owner_remotetable_datatype_id,' + ' "dbo.remote_owner".remotetable.value AS ' + 'dbo_remote_owner_remotetable_value FROM' + ' "dbo.remote_owner".remotetable' + ) def test_alias(self): a = alias(table4, 'remtable') @@ -2776,17 +2874,16 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): def test_update(self): self.assert_compile( - table4.update(table4.c.value == 'test', - values={table4.c.datatype_id: 12}), - "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id " - "WHERE remote_owner.remotetable.value = :value_1") + table4.update(table4.c.value == 'test', + values={table4.c.datatype_id: 12}), + "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id " + "WHERE remote_owner.remotetable.value = :value_1") def test_insert(self): self.assert_compile(table4.insert(values=(2, 5, 'test')), - "INSERT INTO remote_owner.remotetable " - "(rem_id, datatype_id, value) VALUES " - "(:rem_id, :datatype_id, :value)") - + "INSERT INTO remote_owner.remotetable " + "(rem_id, datatype_id, value) VALUES " + "(:rem_id, :datatype_id, :value)") class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): @@ -2794,7 +2891,7 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): def test_dont_overcorrelate(self): self.assert_compile(select([table1], from_obj=[table1, - table1.select()]), + table1.select()]), "SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable, (SELECT " "mytable.myid AS myid, mytable.name AS " @@ -2808,188 +2905,191 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): def _assert_where_correlated(self, stmt): self.assert_compile( - stmt, - "SELECT t2.a FROM t2 WHERE t2.a = " - "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + stmt, + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") def _assert_where_all_correlated(self, stmt): self.assert_compile( - stmt, - "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = " - "(SELECT t1.a WHERE t1.a = t2.a)") + stmt, + "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = " + "(SELECT t1.a WHERE t1.a = t2.a)") # note there's no more "backwards" correlation after # we've done #2746 - #def _assert_where_backwards_correlated(self, stmt): + # def _assert_where_backwards_correlated(self, stmt): # self.assert_compile( # stmt, # "SELECT t2.a FROM t2 WHERE t2.a = " # "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)") - #def _assert_column_backwards_correlated(self, stmt): + # def _assert_column_backwards_correlated(self, stmt): # self.assert_compile(stmt, # "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) " # "AS anon_1 FROM t2") def _assert_column_correlated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) " - "AS anon_1 FROM t2") + self.assert_compile( + stmt, + "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) " + "AS anon_1 FROM t2") def _assert_column_all_correlated(self, stmt): - self.assert_compile(stmt, - "SELECT t1.a, t2.a, " - "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2") - + self.assert_compile( + stmt, + "SELECT t1.a, t2.a, " + "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2") def _assert_having_correlated(self, stmt): self.assert_compile(stmt, - "SELECT t2.a FROM t2 HAVING t2.a = " - "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") def _assert_from_uncorrelated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a, anon_1.a FROM t2, " - "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + self.assert_compile( + stmt, + "SELECT t2.a, anon_1.a FROM t2, " + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") def _assert_from_all_uncorrelated(self, stmt): - self.assert_compile(stmt, - "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, " - "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + self.assert_compile( + stmt, + "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, " + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") def _assert_where_uncorrelated(self, stmt): self.assert_compile(stmt, - "SELECT t2.a FROM t2 WHERE t2.a = " - "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") def _assert_column_uncorrelated(self, stmt): self.assert_compile(stmt, - "SELECT t2.a, (SELECT t1.a FROM t1, t2 " - "WHERE t1.a = t2.a) AS anon_1 FROM t2") + "SELECT t2.a, (SELECT t1.a FROM t1, t2 " + "WHERE t1.a = t2.a) AS anon_1 FROM t2") def _assert_having_uncorrelated(self, stmt): self.assert_compile(stmt, - "SELECT t2.a FROM t2 HAVING t2.a = " - "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") def _assert_where_single_full_correlated(self, stmt): self.assert_compile(stmt, - "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)") + "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)") def test_correlate_semiauto_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( - select([t2]).where(t2.c.a == s1.correlate(t2))) + select([t2]).where(t2.c.a == s1.correlate(t2))) def test_correlate_semiauto_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( - select([t2, s1.correlate(t2).as_scalar()])) + select([t2, s1.correlate(t2).as_scalar()])) def test_correlate_semiauto_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate(t2).alias()])) + select([t2, s1.correlate(t2).alias()])) def test_correlate_semiauto_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( - select([t2]).having(t2.c.a == s1.correlate(t2))) + select([t2]).having(t2.c.a == s1.correlate(t2))) def test_correlate_except_inclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( - select([t2]).where(t2.c.a == s1.correlate_except(t1))) + select([t2]).where(t2.c.a == s1.correlate_except(t1))) def test_correlate_except_exclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( - select([t2]).where(t2.c.a == s1.correlate_except(t2))) + select([t2]).where(t2.c.a == s1.correlate_except(t2))) def test_correlate_except_inclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( - select([t2, s1.correlate_except(t1).as_scalar()])) + select([t2, s1.correlate_except(t1).as_scalar()])) def test_correlate_except_exclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( - select([t2, s1.correlate_except(t2).as_scalar()])) + select([t2, s1.correlate_except(t2).as_scalar()])) def test_correlate_except_inclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate_except(t1).alias()])) + select([t2, s1.correlate_except(t1).alias()])) def test_correlate_except_exclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate_except(t2).alias()])) + select([t2, s1.correlate_except(t2).alias()])) def test_correlate_except_none(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( - select([t1, t2]).where(t2.c.a == s1.correlate_except(None))) + select([t1, t2]).where(t2.c.a == s1.correlate_except(None))) def test_correlate_except_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( - select([t2]).having(t2.c.a == s1.correlate_except(t1))) + select([t2]).having(t2.c.a == s1.correlate_except(t1))) def test_correlate_auto_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( - select([t2]).where(t2.c.a == s1)) + select([t2]).where(t2.c.a == s1)) def test_correlate_auto_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( - select([t2, s1.as_scalar()])) + select([t2, s1.as_scalar()])) def test_correlate_auto_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.alias()])) + select([t2, s1.alias()])) def test_correlate_auto_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( - select([t2]).having(t2.c.a == s1)) + select([t2]).having(t2.c.a == s1)) def test_correlate_disabled_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( - select([t2]).where(t2.c.a == s1.correlate(None))) + select([t2]).where(t2.c.a == s1.correlate(None))) def test_correlate_disabled_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( - select([t2, s1.correlate(None).as_scalar()])) + select([t2, s1.correlate(None).as_scalar()])) def test_correlate_disabled_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate(None).alias()])) + select([t2, s1.correlate(None).alias()])) def test_correlate_disabled_having(self): t1, t2, s1 = self._fixture() self._assert_having_uncorrelated( - select([t2]).having(t2.c.a == s1.correlate(None))) + select([t2]).having(t2.c.a == s1.correlate(None))) def test_correlate_all_where(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( - select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2))) + select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2))) def test_correlate_all_column(self): t1, t2, s1 = self._fixture() self._assert_column_all_correlated( - select([t1, t2, s1.correlate(t1, t2).as_scalar()])) + select([t1, t2, s1.correlate(t1, t2).as_scalar()])) def test_correlate_all_from(self): t1, t2, s1 = self._fixture() self._assert_from_all_uncorrelated( - select([t1, t2, s1.correlate(t1, t2).alias()])) + select([t1, t2, s1.correlate(t1, t2).alias()])) def test_correlate_where_all_unintentional(self): t1, t2, s1 = self._fixture() @@ -3012,8 +3112,8 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): s = select([t1.c.a]) s2 = select([t1]).where(t1.c.a == s) self.assert_compile(s2, - "SELECT t1.a FROM t1 WHERE t1.a = " - "(SELECT t1.a FROM t1)") + "SELECT t1.a FROM t1 WHERE t1.a = " + "(SELECT t1.a FROM t1)") def test_correlate_semiauto_where_singlefrom(self): t1, t2, s1 = self._fixture() @@ -3035,7 +3135,7 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): # new as of #2668 t1, t2, s1 = self._fixture() self.assert_compile(s1.correlate(t1, t2), - "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a") + "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a") def test_correlate_except_froms(self): # new as of #2748 @@ -3047,23 +3147,24 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): s2 = select([func.foo(s.c.b)]).as_scalar() s3 = select([t1], order_by=s2) - self.assert_compile(s3, - "SELECT t1.a FROM t1 ORDER BY " + self.assert_compile( + s3, "SELECT t1.a FROM t1 ORDER BY " "(SELECT foo(s.b) AS foo_1 FROM " - "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)" - ) + "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)") def test_multilevel_froms_correlation(self): # new as of #2748 p = table('parent', column('id')) c = table('child', column('id'), column('parent_id'), column('pos')) - s = c.select().where(c.c.parent_id == p.c.id).order_by(c.c.pos).limit(1) + s = c.select().where( + c.c.parent_id == p.c.id).order_by( + c.c.pos).limit(1) s = s.correlate(p) s = exists().select_from(s).where(s.c.id == 1) s = select([p]).where(s) - self.assert_compile(s, - "SELECT parent.id FROM parent WHERE EXISTS (SELECT * " + self.assert_compile( + s, "SELECT parent.id FROM parent WHERE EXISTS (SELECT * " "FROM (SELECT child.id AS id, child.parent_id AS parent_id, " "child.pos AS pos FROM child WHERE child.parent_id = parent.id " "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)") @@ -3077,7 +3178,8 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): s = select([t1]).where(t1.c.x == t2.c.y).\ where(t2.c.y == t3.c.z).correlate_except(t1) - self.assert_compile(s, + self.assert_compile( + s, "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z") def test_multilevel_implicit_correlation_disabled(self): @@ -3092,13 +3194,13 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): s3 = select([t1]).where(t1.c.x == s2.as_scalar()) self.assert_compile(s3, - "SELECT t1.x FROM t1 " - "WHERE t1.x = (SELECT t3.z " - "FROM t3 " - "WHERE t3.z = (SELECT t1.x " - "FROM t1, t2 " - "WHERE t1.x = t2.y))" - ) + "SELECT t1.x FROM t1 " + "WHERE t1.x = (SELECT t3.z " + "FROM t3 " + "WHERE t3.z = (SELECT t1.x " + "FROM t1, t2 " + "WHERE t1.x = t2.y))" + ) def test_from_implicit_correlation_disabled(self): # test that implicit correlation with immediate and @@ -3112,10 +3214,11 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): s3 = select([t1, s2]) self.assert_compile(s3, - "SELECT t1.x, y, x FROM t1, " - "(SELECT t2.y AS y, x FROM t2, " - "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))" - ) + "SELECT t1.x, y, x FROM t1, " + "(SELECT t2.y AS y, x FROM t2, " + "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))" + ) + class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = default.DefaultDialect(supports_native_boolean=True) @@ -3123,7 +3226,7 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): def _fixture(self): m = MetaData() return Table('foo', m, - Column('id', Integer)) + Column('id', Integer)) bool_table = table('t', column('x', Boolean)) @@ -3194,17 +3297,19 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): class ResultMapTest(fixtures.TestBase): + """test the behavior of the 'entry stack' and the determination when the result_map needs to be populated. """ + def test_compound_populates(self): t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer)) stmt = select([t]).union(select([t])) comp = stmt.compile() eq_( comp.result_map, - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), + {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), 'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)} ) @@ -3215,7 +3320,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp.result_map, - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)} + {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)} ) def test_compound_only_top_populates(self): @@ -3224,7 +3329,7 @@ class ResultMapTest(fixtures.TestBase): comp = stmt.compile() eq_( comp.result_map, - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}, + {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}, ) def test_label_plus_element(self): @@ -3236,22 +3341,22 @@ class ResultMapTest(fixtures.TestBase): tc_anon_label = comp.result_map['a_1'][1][0] eq_( comp.result_map, - { + { 'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), 'bar': ('bar', (l1, 'bar'), l1.type), 'a_1': ('%%(%d a)s' % id(tc), (tc_anon_label, 'a_1'), tc.type), - }, + }, ) def test_label_conflict_union(self): t1 = Table('t1', MetaData(), Column('a', Integer), - Column('b', Integer)) + Column('b', Integer)) t2 = Table('t2', MetaData(), Column('t1_a', Integer)) union = select([t2]).union(select([t2])).alias() t1_alias = t1.alias() stmt = select([t1, t1_alias]).select_from( - t1.join(union, t1.c.a == union.c.t1_a)).apply_labels() + t1.join(union, t1.c.a == union.c.t1_a)).apply_labels() comp = stmt.compile() eq_( set(comp.result_map), |