diff options
Diffstat (limited to 'test/sql/test_compiler.py')
| -rw-r--r-- | test/sql/test_compiler.py | 3542 |
1 files changed, 1974 insertions, 1568 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index f543b8677..f3305743a 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -10,147 +10,201 @@ styling and coherent test organization. """ -from sqlalchemy.testing import eq_, is_, assert_raises, \ - assert_raises_message, eq_ignore_whitespace +from sqlalchemy.testing import ( + eq_, + is_, + assert_raises, + assert_raises_message, + eq_ignore_whitespace, +) from sqlalchemy import testing from sqlalchemy.testing import fixtures, AssertsCompiledSQL -from sqlalchemy import Integer, String, MetaData, Table, Column, select, \ - func, not_, cast, text, tuple_, exists, update, bindparam,\ - literal, and_, null, type_coerce, alias, or_, literal_column,\ - Float, TIMESTAMP, Numeric, Date, Text, union, except_,\ - intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\ - over, subquery, case, true, CheckConstraint, Sequence +from sqlalchemy import ( + Integer, + String, + MetaData, + Table, + Column, + select, + func, + not_, + cast, + text, + tuple_, + exists, + update, + bindparam, + literal, + and_, + null, + type_coerce, + alias, + or_, + literal_column, + Float, + TIMESTAMP, + Numeric, + Date, + Text, + union, + except_, + intersect, + union_all, + Boolean, + distinct, + join, + outerjoin, + asc, + desc, + over, + subquery, + case, + true, + CheckConstraint, + Sequence, +) import decimal from sqlalchemy.util import u from sqlalchemy import exc, sql, util, types, schema from sqlalchemy.sql import table, column, label from sqlalchemy.sql.expression import ClauseList, _literal_as_text, HasPrefixes from sqlalchemy.engine import default -from sqlalchemy.dialects import mysql, mssql, postgresql, oracle, \ - sqlite, sybase +from sqlalchemy.dialects import ( + mysql, + mssql, + postgresql, + oracle, + sqlite, + sybase, +) from sqlalchemy.dialects.postgresql.base import PGCompiler, PGDialect from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql import compiler -table1 = table('mytable', - column('myid', Integer), - column('name', String), - column('description', String), - ) +table1 = table( + "mytable", + column("myid", Integer), + column("name", String), + column("description", String), +) table2 = table( - 'myothertable', - column('otherid', Integer), - column('othername', String), + "myothertable", column("otherid", Integer), column("othername", String) ) table3 = table( - 'thirdtable', - column('userid', Integer), - column('otherstuff', String), + "thirdtable", column("userid", Integer), column("otherstuff", String) ) metadata = MetaData() # table with a schema table4 = Table( - 'remotetable', metadata, - Column('rem_id', Integer, primary_key=True), - Column('datatype_id', Integer), - Column('value', String(20)), - schema='remote_owner' + "remotetable", + metadata, + Column("rem_id", Integer, primary_key=True), + Column("datatype_id", Integer), + Column("value", String(20)), + schema="remote_owner", ) # table with a 'multipart' schema table5 = Table( - 'remotetable', metadata, - Column('rem_id', Integer, primary_key=True), - Column('datatype_id', Integer), - Column('value', String(20)), - schema='dbo.remote_owner' + "remotetable", + metadata, + Column("rem_id", Integer, primary_key=True), + Column("datatype_id", Integer), + Column("value", String(20)), + schema="dbo.remote_owner", ) -users = table('users', - column('user_id'), - column('user_name'), - column('password'), - ) +users = table( + "users", column("user_id"), column("user_name"), column("password") +) -addresses = table('addresses', - column('address_id'), - column('user_id'), - column('street'), - column('city'), - column('state'), - column('zip') - ) +addresses = table( + "addresses", + column("address_id"), + column("user_id"), + column("street"), + column("city"), + column("state"), + column("zip"), +) -keyed = Table('keyed', metadata, - Column('x', Integer, key='colx'), - Column('y', Integer, key='coly'), - Column('z', Integer), - ) +keyed = Table( + "keyed", + metadata, + Column("x", Integer, key="colx"), + Column("y", Integer, key="coly"), + Column("z", Integer), +) class SelectTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_attribute_sanity(self): - assert hasattr(table1, 'c') - assert hasattr(table1.select(), 'c') - assert not hasattr(table1.c.myid.self_group(), 'columns') - assert hasattr(table1.select().self_group(), 'columns') - assert not hasattr(table1.c.myid, 'columns') - assert not hasattr(table1.c.myid, 'c') - assert not hasattr(table1.select().c.myid, 'c') - assert not hasattr(table1.select().c.myid, 'columns') - assert not hasattr(table1.alias().c.myid, 'columns') - assert not hasattr(table1.alias().c.myid, 'c') + assert hasattr(table1, "c") + assert hasattr(table1.select(), "c") + assert not hasattr(table1.c.myid.self_group(), "columns") + assert hasattr(table1.select().self_group(), "columns") + assert not hasattr(table1.c.myid, "columns") + assert not hasattr(table1.c.myid, "c") + assert not hasattr(table1.select().c.myid, "c") + assert not hasattr(table1.select().c.myid, "columns") + assert not hasattr(table1.alias().c.myid, "columns") + assert not hasattr(table1.alias().c.myid, "c") if util.compat.py32: assert_raises_message( exc.InvalidRequestError, - 'Scalar Select expression has no ' - 'columns; use this object directly within a ' - 'column-level expression.', + "Scalar Select expression has no " + "columns; use this object directly within a " + "column-level expression.", lambda: hasattr( - select([table1.c.myid]).as_scalar().self_group(), - 'columns')) + select([table1.c.myid]).as_scalar().self_group(), "columns" + ), + ) assert_raises_message( exc.InvalidRequestError, - 'Scalar Select expression has no ' - 'columns; use this object directly within a ' - 'column-level expression.', - lambda: hasattr(select([table1.c.myid]).as_scalar(), - 'columns')) + "Scalar Select expression has no " + "columns; use this object directly within a " + "column-level expression.", + lambda: hasattr( + select([table1.c.myid]).as_scalar(), "columns" + ), + ) else: assert not hasattr( - select([table1.c.myid]).as_scalar().self_group(), - 'columns') - assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns') + select([table1.c.myid]).as_scalar().self_group(), "columns" + ) + assert not hasattr(select([table1.c.myid]).as_scalar(), "columns") def test_prefix_constructor(self): class Pref(HasPrefixes): - def _generate(self): return self - assert_raises(exc.ArgumentError, - Pref().prefix_with, - "some prefix", not_a_dialect=True - ) + + assert_raises( + exc.ArgumentError, + Pref().prefix_with, + "some prefix", + not_a_dialect=True, + ) def test_table_select(self): - self.assert_compile(table1.select(), - "SELECT mytable.myid, mytable.name, " - "mytable.description FROM mytable") + self.assert_compile( + table1.select(), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable", + ) self.assert_compile( - select( - [ - table1, - table2]), + select([table1, table2]), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " - "myothertable") + "myothertable", + ) def test_invalid_col_argument(self): assert_raises(exc.ArgumentError, select, table1) @@ -178,13 +232,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): exp1 = literal_column("Q") exp2 = literal_column("Y") self.assert_compile( - select([1]).limit(exp1).offset(exp2), - "SELECT 1 LIMIT Q OFFSET Y" + select([1]).limit(exp1).offset(exp2), "SELECT 1 LIMIT Q OFFSET Y" ) self.assert_compile( - select([1]).limit(bindparam('x')).offset(bindparam('y')), - "SELECT 1 LIMIT :x OFFSET :y" + select([1]).limit(bindparam("x")).offset(bindparam("y")), + "SELECT 1 LIMIT :x OFFSET :y", ) def test_limit_offset_no_int_coercion_two(self): @@ -196,14 +249,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): exc.CompileError, "This SELECT structure does not use a simple integer " "value for limit", - getattr, sel, "_limit" + getattr, + sel, + "_limit", ) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for offset", - getattr, sel, "_offset" + getattr, + sel, + "_offset", ) def test_limit_offset_no_int_coercion_three(self): @@ -215,37 +272,47 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): exc.CompileError, "This SELECT structure does not use a simple integer " "value for limit", - getattr, sel, "_limit" + getattr, + sel, + "_limit", ) assert_raises_message( exc.CompileError, "This SELECT structure does not use a simple integer " "value for offset", - getattr, sel, "_offset" + getattr, + sel, + "_offset", ) def test_limit_offset(self): for lim, offset, exp, params in [ - (5, 10, "LIMIT :param_1 OFFSET :param_2", - {'param_1': 5, 'param_2': 10}), - (None, 10, "LIMIT -1 OFFSET :param_1", {'param_1': 10}), - (5, None, "LIMIT :param_1", {'param_1': 5}), - (0, 0, "LIMIT :param_1 OFFSET :param_2", - {'param_1': 0, 'param_2': 0}), + ( + 5, + 10, + "LIMIT :param_1 OFFSET :param_2", + {"param_1": 5, "param_2": 10}, + ), + (None, 10, "LIMIT -1 OFFSET :param_1", {"param_1": 10}), + (5, None, "LIMIT :param_1", {"param_1": 5}), + ( + 0, + 0, + "LIMIT :param_1 OFFSET :param_2", + {"param_1": 0, "param_2": 0}, + ), ]: self.assert_compile( select([1]).limit(lim).offset(offset), "SELECT 1 " + exp, - checkparams=params + checkparams=params, ) def test_limit_offset_select_literal_binds(self): stmt = select([1]).limit(5).offset(6) self.assert_compile( - stmt, - "SELECT 1 LIMIT 5 OFFSET 6", - literal_binds=True + stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True ) def test_limit_offset_compound_select_literal_binds(self): @@ -253,25 +320,24 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6", - literal_binds=True + literal_binds=True, ) def test_select_precol_compile_ordering(self): - s1 = select([column('x')]).select_from(text('a')).limit(5).as_scalar() + s1 = select([column("x")]).select_from(text("a")).limit(5).as_scalar() s2 = select([s1]).limit(10) class MyCompiler(compiler.SQLCompiler): - def get_select_precolumns(self, select, **kw): result = "" if select._limit: result += "FIRST %s " % self.process( - literal( - select._limit), **kw) + literal(select._limit), **kw + ) if select._offset: result += "SKIP %s " % self.process( - literal( - select._offset), **kw) + literal(select._offset), **kw + ) return result def limit_clause(self, select, **kw): @@ -279,13 +345,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): dialect = default.DefaultDialect() dialect.statement_compiler = MyCompiler - dialect.paramstyle = 'qmark' + dialect.paramstyle = "qmark" dialect.positional = True self.assert_compile( s2, "SELECT FIRST ? (SELECT FIRST ? x FROM a) AS anon_1", checkpositional=(10, 5), - dialect=dialect + dialect=dialect, ) def test_from_subquery(self): @@ -293,16 +359,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): another select, for the purposes of selecting from the exported columns of that select.""" - s = select([table1], table1.c.name == 'jack') + s = select([table1], table1.c.name == "jack") self.assert_compile( - select( - [s], - s.c.myid == 7), + select([s], s.c.myid == 7), "SELECT myid, name, description FROM " "(SELECT mytable.myid AS myid, " "mytable.name AS name, mytable.description AS description " "FROM mytable " - "WHERE mytable.name = :name_1) WHERE myid = :myid_1") + "WHERE mytable.name = :name_1) WHERE myid = :myid_1", + ) sq = select([table1]) self.assert_compile( @@ -310,44 +375,42 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT myid, name, description FROM " "(SELECT mytable.myid AS myid, " "mytable.name AS name, mytable.description " - "AS description FROM mytable)" + "AS description FROM mytable)", ) - sq = select( - [table1], - ).alias('sq') + sq = select([table1]).alias("sq") self.assert_compile( sq.select(sq.c.myid == 7), "SELECT sq.myid, sq.name, sq.description FROM " "(SELECT mytable.myid AS myid, mytable.name AS name, " "mytable.description AS description FROM mytable) AS sq " - "WHERE sq.myid = :myid_1" + "WHERE sq.myid = :myid_1", ) sq = select( [table1, table2], and_(table1.c.myid == 7, table2.c.otherid == table1.c.myid), - use_labels=True - ).alias('sq') - - sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS "\ - "mytable_name, mytable.description AS mytable_description, "\ - "myothertable.otherid AS myothertable_otherid, "\ - "myothertable.othername AS myothertable_othername FROM "\ - "mytable, myothertable WHERE mytable.myid = :myid_1 AND "\ + use_labels=True, + ).alias("sq") + + sqstring = ( + "SELECT mytable.myid AS mytable_myid, mytable.name AS " + "mytable_name, mytable.description AS mytable_description, " + "myothertable.otherid AS myothertable_otherid, " + "myothertable.othername AS myothertable_othername FROM " + "mytable, myothertable WHERE mytable.myid = :myid_1 AND " "myothertable.otherid = mytable.myid" + ) self.assert_compile( sq.select(), "SELECT sq.mytable_myid, sq.mytable_name, " "sq.mytable_description, sq.myothertable_otherid, " - "sq.myothertable_othername FROM (%s) AS sq" % sqstring) + "sq.myothertable_othername FROM (%s) AS sq" % sqstring, + ) - sq2 = select( - [sq], - use_labels=True - ).alias('sq2') + sq2 = select([sq], use_labels=True).alias("sq2") self.assert_compile( sq2.select(), @@ -359,53 +422,53 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "sq.mytable_description AS sq_mytable_description, " "sq.myothertable_otherid AS sq_myothertable_otherid, " "sq.myothertable_othername AS sq_myothertable_othername " - "FROM (%s) AS sq) AS sq2" % sqstring) + "FROM (%s) AS sq) AS sq2" % sqstring, + ) def test_select_from_clauselist(self): self.assert_compile( - select([ClauseList(column('a'), column('b'))] - ).select_from(text('sometable')), - 'SELECT a, b FROM sometable' + select([ClauseList(column("a"), column("b"))]).select_from( + text("sometable") + ), + "SELECT a, b FROM sometable", ) def test_use_labels(self): self.assert_compile( select([table1.c.myid == 5], use_labels=True), - "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable" + "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable", ) self.assert_compile( - select([func.foo()], use_labels=True), - "SELECT foo() AS foo_1" + select([func.foo()], use_labels=True), "SELECT foo() AS foo_1" ) # this is native_boolean=False for default dialect self.assert_compile( select([not_(True)], use_labels=True), - "SELECT :param_1 = 0 AS anon_1" + "SELECT :param_1 = 0 AS anon_1", ) self.assert_compile( select([cast("data", Integer)], use_labels=True), - "SELECT CAST(:param_1 AS INTEGER) AS anon_1" + "SELECT CAST(:param_1 AS INTEGER) AS anon_1", ) self.assert_compile( - select([func.sum( - func.lala(table1.c.myid).label('foo')).label('bar')]), - "SELECT sum(lala(mytable.myid)) AS bar FROM mytable" + select( + [func.sum(func.lala(table1.c.myid).label("foo")).label("bar")] + ), + "SELECT sum(lala(mytable.myid)) AS bar FROM mytable", ) self.assert_compile( - select([keyed]), - "SELECT keyed.x, keyed.y" - ", keyed.z FROM keyed" + select([keyed]), "SELECT keyed.x, keyed.y" ", keyed.z FROM keyed" ) self.assert_compile( select([keyed]).apply_labels(), "SELECT keyed.x AS keyed_x, keyed.y AS " - "keyed_y, keyed.z AS keyed_z FROM keyed" + "keyed_y, keyed.z AS keyed_z FROM keyed", ) def test_paramstyles(self): @@ -414,40 +477,40 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "select ?, ?, ? from sometable", - dialect=default.DefaultDialect(paramstyle='qmark') + dialect=default.DefaultDialect(paramstyle="qmark"), ) self.assert_compile( stmt, "select :foo, :bar, :bat from sometable", - dialect=default.DefaultDialect(paramstyle='named') + dialect=default.DefaultDialect(paramstyle="named"), ) self.assert_compile( stmt, "select %s, %s, %s from sometable", - dialect=default.DefaultDialect(paramstyle='format') + dialect=default.DefaultDialect(paramstyle="format"), ) self.assert_compile( stmt, "select :1, :2, :3 from sometable", - dialect=default.DefaultDialect(paramstyle='numeric') + dialect=default.DefaultDialect(paramstyle="numeric"), ) self.assert_compile( stmt, "select %(foo)s, %(bar)s, %(bat)s from sometable", - dialect=default.DefaultDialect(paramstyle='pyformat') + dialect=default.DefaultDialect(paramstyle="pyformat"), ) def test_anon_param_name_on_keys(self): self.assert_compile( keyed.insert(), "INSERT INTO keyed (x, y, z) VALUES (%(colx)s, %(coly)s, %(z)s)", - dialect=default.DefaultDialect(paramstyle='pyformat') + dialect=default.DefaultDialect(paramstyle="pyformat"), ) self.assert_compile( keyed.c.coly == 5, "keyed.y = %(coly_1)s", - checkparams={'coly_1': 5}, - dialect=default.DefaultDialect(paramstyle='pyformat') + checkparams={"coly_1": 5}, + dialect=default.DefaultDialect(paramstyle="pyformat"), ) def test_dupe_columns(self): @@ -455,51 +518,54 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): element identity, not rendered result.""" self.assert_compile( - select([column('a'), column('a'), column('a')]), - "SELECT a, a, a", dialect=default.DefaultDialect() + select([column("a"), column("a"), column("a")]), + "SELECT a, a, a", + dialect=default.DefaultDialect(), ) - c = column('a') + c = column("a") self.assert_compile( - select([c, c, c]), - "SELECT a", dialect=default.DefaultDialect() + select([c, c, c]), "SELECT a", dialect=default.DefaultDialect() ) - a, b = column('a'), column('b') + a, b = column("a"), column("b") self.assert_compile( select([a, b, b, b, a, a]), - "SELECT a, b", dialect=default.DefaultDialect() + "SELECT a, b", + dialect=default.DefaultDialect(), ) # using alternate keys. - a, b, c = Column('a', Integer, key='b'), \ - Column('b', Integer), \ - Column('c', Integer, key='a') + a, b, c = ( + Column("a", Integer, key="b"), + Column("b", Integer), + Column("c", Integer, key="a"), + ) self.assert_compile( select([a, b, c, a, b, c]), - "SELECT a, b, c", dialect=default.DefaultDialect() + "SELECT a, b, c", + dialect=default.DefaultDialect(), ) self.assert_compile( - select([bindparam('a'), bindparam('b'), bindparam('c')]), + select([bindparam("a"), bindparam("b"), bindparam("c")]), "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3", - dialect=default.DefaultDialect(paramstyle='named') + dialect=default.DefaultDialect(paramstyle="named"), ) self.assert_compile( - select([bindparam('a'), bindparam('b'), bindparam('c')]), + select([bindparam("a"), bindparam("b"), bindparam("c")]), "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3", - dialect=default.DefaultDialect(paramstyle='qmark'), + dialect=default.DefaultDialect(paramstyle="qmark"), ) self.assert_compile( - select([column("a"), column("a"), column("a")]), - "SELECT a, a, a" + select([column("a"), column("a"), column("a")]), "SELECT a, a, a" ) - s = select([bindparam('a'), bindparam('b'), bindparam('c')]) - s = s.compile(dialect=default.DefaultDialect(paramstyle='qmark')) - eq_(s.positiontup, ['a', 'b', 'c']) + s = select([bindparam("a"), bindparam("b"), bindparam("c")]) + s = s.compile(dialect=default.DefaultDialect(paramstyle="qmark")) + eq_(s.positiontup, ["a", "b", "c"]) def test_nested_label_targeting(self): """test nested anonymous label generation. @@ -510,234 +576,275 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): s3 = select([s2], use_labels=True) s4 = s3.alias() s5 = select([s4], use_labels=True) - self.assert_compile(s5, - 'SELECT anon_1.anon_2_myid AS ' - 'anon_1_anon_2_myid, anon_1.anon_2_name AS ' - 'anon_1_anon_2_name, anon_1.anon_2_descript' - 'ion AS anon_1_anon_2_description FROM ' - '(SELECT anon_2.myid AS anon_2_myid, ' - 'anon_2.name AS anon_2_name, ' - 'anon_2.description AS anon_2_description ' - 'FROM (SELECT mytable.myid AS myid, ' - 'mytable.name AS name, mytable.description ' - 'AS description FROM mytable) AS anon_2) ' - 'AS anon_1') + self.assert_compile( + s5, + "SELECT anon_1.anon_2_myid AS " + "anon_1_anon_2_myid, anon_1.anon_2_name AS " + "anon_1_anon_2_name, anon_1.anon_2_descript" + "ion AS anon_1_anon_2_description FROM " + "(SELECT anon_2.myid AS anon_2_myid, " + "anon_2.name AS anon_2_name, " + "anon_2.description AS anon_2_description " + "FROM (SELECT mytable.myid AS myid, " + "mytable.name AS name, mytable.description " + "AS description FROM mytable) AS anon_2) " + "AS anon_1", + ) def test_nested_label_targeting_keyed(self): s1 = keyed.select() s2 = s1.alias() s3 = select([s2], use_labels=True) - self.assert_compile(s3, - "SELECT anon_1.x AS anon_1_x, " - "anon_1.y AS anon_1_y, " - "anon_1.z AS anon_1_z FROM " - "(SELECT keyed.x AS x, keyed.y " - "AS y, keyed.z AS z FROM keyed) AS anon_1") + self.assert_compile( + s3, + "SELECT anon_1.x AS anon_1_x, " + "anon_1.y AS anon_1_y, " + "anon_1.z AS anon_1_z FROM " + "(SELECT keyed.x AS x, keyed.y " + "AS y, keyed.z AS z FROM keyed) AS anon_1", + ) s4 = s3.alias() s5 = select([s4], use_labels=True) - self.assert_compile(s5, - "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, " - "anon_1.anon_2_y AS anon_1_anon_2_y, " - "anon_1.anon_2_z AS anon_1_anon_2_z " - "FROM (SELECT anon_2.x AS anon_2_x, " - "anon_2.y AS anon_2_y, " - "anon_2.z AS anon_2_z FROM " - "(SELECT keyed.x AS x, keyed.y AS y, keyed.z " - "AS z FROM keyed) AS anon_2) AS anon_1" - ) + self.assert_compile( + s5, + "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, " + "anon_1.anon_2_y AS anon_1_anon_2_y, " + "anon_1.anon_2_z AS anon_1_anon_2_z " + "FROM (SELECT anon_2.x AS anon_2_x, " + "anon_2.y AS anon_2_y, " + "anon_2.z AS anon_2_z FROM " + "(SELECT keyed.x AS x, keyed.y AS y, keyed.z " + "AS z FROM keyed) AS anon_2) AS anon_1", + ) def test_exists(self): s = select([table1.c.myid]).where(table1.c.myid == 5) - self.assert_compile(exists(s), - "EXISTS (SELECT mytable.myid FROM mytable " - "WHERE mytable.myid = :myid_1)" - ) - - self.assert_compile(exists(s.as_scalar()), - "EXISTS (SELECT mytable.myid FROM mytable " - "WHERE mytable.myid = :myid_1)" - ) - - self.assert_compile(exists([table1.c.myid], table1.c.myid - == 5).select(), - 'SELECT EXISTS (SELECT mytable.myid FROM ' - 'mytable WHERE mytable.myid = :myid_1) AS anon_1', - params={'mytable_myid': 5}) - self.assert_compile(select([table1, exists([1], - from_obj=table2)]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, EXISTS (SELECT 1 ' - 'FROM myothertable) AS anon_1 FROM mytable', - params={}) - self.assert_compile(select([table1, - exists([1], - from_obj=table2).label('foo')]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, EXISTS (SELECT 1 ' - 'FROM myothertable) AS foo FROM mytable', - params={}) + self.assert_compile( + exists(s), + "EXISTS (SELECT mytable.myid FROM mytable " + "WHERE mytable.myid = :myid_1)", + ) + + self.assert_compile( + exists(s.as_scalar()), + "EXISTS (SELECT mytable.myid FROM mytable " + "WHERE mytable.myid = :myid_1)", + ) + + self.assert_compile( + exists([table1.c.myid], table1.c.myid == 5).select(), + "SELECT EXISTS (SELECT mytable.myid FROM " + "mytable WHERE mytable.myid = :myid_1) AS anon_1", + params={"mytable_myid": 5}, + ) + self.assert_compile( + select([table1, exists([1], from_obj=table2)]), + "SELECT mytable.myid, mytable.name, " + "mytable.description, EXISTS (SELECT 1 " + "FROM myothertable) AS anon_1 FROM mytable", + params={}, + ) + self.assert_compile( + select([table1, exists([1], from_obj=table2).label("foo")]), + "SELECT mytable.myid, mytable.name, " + "mytable.description, EXISTS (SELECT 1 " + "FROM myothertable) AS foo FROM mytable", + params={}, + ) self.assert_compile( table1.select( - exists().where( - table2.c.otherid == table1.c.myid).correlate(table1)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') + exists() + .where(table2.c.otherid == table1.c.myid) + .correlate(table1) + ), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "EXISTS (SELECT * FROM myothertable WHERE " + "myothertable.otherid = mytable.myid)", + ) self.assert_compile( table1.select( - exists().where( - table2.c.otherid == table1.c.myid).correlate(table1)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') + exists() + .where(table2.c.otherid == table1.c.myid) + .correlate(table1) + ), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "EXISTS (SELECT * FROM myothertable WHERE " + "myothertable.otherid = mytable.myid)", + ) self.assert_compile( table1.select( - exists().where( - table2.c.otherid == table1.c.myid).correlate(table1) - ).replace_selectable( - table2, - table2.alias()), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT * FROM myothertable AS ' - 'myothertable_1 WHERE myothertable_1.otheri' - 'd = mytable.myid)') + exists() + .where(table2.c.otherid == table1.c.myid) + .correlate(table1) + ).replace_selectable(table2, table2.alias()), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "EXISTS (SELECT * FROM myothertable AS " + "myothertable_1 WHERE myothertable_1.otheri" + "d = mytable.myid)", + ) self.assert_compile( table1.select( - exists().where( - table2.c.otherid == table1.c.myid).correlate(table1)). - select_from( - table1.join( - table2, - table1.c.myid == table2.c.otherid)). - replace_selectable( - table2, - table2.alias()), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable JOIN ' - 'myothertable AS myothertable_1 ON ' - 'mytable.myid = myothertable_1.otherid ' - 'WHERE EXISTS (SELECT * FROM myothertable ' - 'AS myothertable_1 WHERE ' - 'myothertable_1.otherid = mytable.myid)') - - self.assert_compile( - select([ - or_( - exists().where(table2.c.otherid == 'foo'), - exists().where(table2.c.otherid == 'bar') - ) - ]), + exists() + .where(table2.c.otherid == table1.c.myid) + .correlate(table1) + ) + .select_from( + table1.join(table2, table1.c.myid == table2.c.otherid) + ) + .replace_selectable(table2, table2.alias()), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable JOIN " + "myothertable AS myothertable_1 ON " + "mytable.myid = myothertable_1.otherid " + "WHERE EXISTS (SELECT * FROM myothertable " + "AS myothertable_1 WHERE " + "myothertable_1.otherid = mytable.myid)", + ) + + self.assert_compile( + select( + [ + or_( + exists().where(table2.c.otherid == "foo"), + exists().where(table2.c.otherid == "bar"), + ) + ] + ), "SELECT (EXISTS (SELECT * FROM myothertable " "WHERE myothertable.otherid = :otherid_1)) " "OR (EXISTS (SELECT * FROM myothertable WHERE " - "myothertable.otherid = :otherid_2)) AS anon_1" + "myothertable.otherid = :otherid_2)) AS anon_1", ) self.assert_compile( - select([exists([1])]), - "SELECT EXISTS (SELECT 1) AS anon_1" + select([exists([1])]), "SELECT EXISTS (SELECT 1) AS anon_1" ) self.assert_compile( - select([~exists([1])]), - "SELECT NOT (EXISTS (SELECT 1)) AS anon_1" + select([~exists([1])]), "SELECT NOT (EXISTS (SELECT 1)) AS anon_1" ) self.assert_compile( select([~(~exists([1]))]), - "SELECT NOT (NOT (EXISTS (SELECT 1))) AS anon_1" + "SELECT NOT (NOT (EXISTS (SELECT 1))) AS anon_1", ) def test_where_subquery(self): - s = select([addresses.c.street], addresses.c.user_id - == users.c.user_id, correlate=True).alias('s') + s = select( + [addresses.c.street], + addresses.c.user_id == users.c.user_id, + correlate=True, + ).alias("s") # don't correlate in a FROM list - self.assert_compile(select([users, s.c.street], from_obj=s), - "SELECT users.user_id, users.user_name, " - "users.password, s.street FROM users, " - "(SELECT addresses.street AS street FROM " - "addresses, users WHERE addresses.user_id = " - "users.user_id) AS s") - self.assert_compile(table1.select( - table1.c.myid == select( - [table1.c.myid], - table1.c.name == 'jack')), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'mytable.myid = (SELECT mytable.myid FROM ' - 'mytable WHERE mytable.name = :name_1)') + self.assert_compile( + select([users, s.c.street], from_obj=s), + "SELECT users.user_id, users.user_name, " + "users.password, s.street FROM users, " + "(SELECT addresses.street AS street FROM " + "addresses, users WHERE addresses.user_id = " + "users.user_id) AS s", + ) + self.assert_compile( + table1.select( + table1.c.myid + == select([table1.c.myid], table1.c.name == "jack") + ), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "mytable.myid = (SELECT mytable.myid FROM " + "mytable WHERE mytable.name = :name_1)", + ) self.assert_compile( table1.select( - table1.c.myid == select( - [table2.c.otherid], - table1.c.name == table2.c.othername + table1.c.myid + == select( + [table2.c.otherid], table1.c.name == table2.c.othername ) ), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'mytable.myid = (SELECT ' - 'myothertable.otherid FROM myothertable ' - 'WHERE mytable.name = myothertable.othernam' - 'e)') - self.assert_compile(table1.select(exists([1], table2.c.otherid - == table1.c.myid)), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT 1 FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') - talias = table1.alias('ta') - s = subquery('sq2', [talias], exists([1], table2.c.otherid - == talias.c.myid)) - self.assert_compile(select([s, table1]), - 'SELECT sq2.myid, sq2.name, ' - 'sq2.description, mytable.myid, ' - 'mytable.name, mytable.description FROM ' - '(SELECT ta.myid AS myid, ta.name AS name, ' - 'ta.description AS description FROM ' - 'mytable AS ta WHERE EXISTS (SELECT 1 FROM ' - 'myothertable WHERE myothertable.otherid = ' - 'ta.myid)) AS sq2, mytable') + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "mytable.myid = (SELECT " + "myothertable.otherid FROM myothertable " + "WHERE mytable.name = myothertable.othernam" + "e)", + ) + self.assert_compile( + table1.select(exists([1], table2.c.otherid == table1.c.myid)), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "EXISTS (SELECT 1 FROM myothertable WHERE " + "myothertable.otherid = mytable.myid)", + ) + talias = table1.alias("ta") + s = subquery( + "sq2", [talias], exists([1], table2.c.otherid == talias.c.myid) + ) + self.assert_compile( + select([s, table1]), + "SELECT sq2.myid, sq2.name, " + "sq2.description, mytable.myid, " + "mytable.name, mytable.description FROM " + "(SELECT ta.myid AS myid, ta.name AS name, " + "ta.description AS description FROM " + "mytable AS ta WHERE EXISTS (SELECT 1 FROM " + "myothertable WHERE myothertable.otherid = " + "ta.myid)) AS sq2, mytable", + ) # test constructing the outer query via append_column(), which # occurs in the ORM's Query object - s = select([], exists([1], table2.c.otherid == table1.c.myid), - from_obj=table1) + s = select( + [], exists([1], table2.c.otherid == table1.c.myid), from_obj=table1 + ) s.append_column(table1) - self.assert_compile(s, - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable WHERE ' - 'EXISTS (SELECT 1 FROM myothertable WHERE ' - 'myothertable.otherid = mytable.myid)') + self.assert_compile( + s, + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable WHERE " + "EXISTS (SELECT 1 FROM myothertable WHERE " + "myothertable.otherid = mytable.myid)", + ) def test_orderby_subquery(self): self.assert_compile( table1.select( order_by=[ select( - [ - table2.c.otherid], - table1.c.myid == table2.c.otherid)]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable ORDER BY ' - '(SELECT myothertable.otherid FROM ' - 'myothertable WHERE mytable.myid = ' - 'myothertable.otherid)') - self.assert_compile(table1.select(order_by=[ - desc(select([table2.c.otherid], - table1.c.myid == table2.c.otherid))]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description FROM mytable ORDER BY ' - '(SELECT myothertable.otherid FROM ' - 'myothertable WHERE mytable.myid = ' - 'myothertable.otherid) DESC') + [table2.c.otherid], table1.c.myid == table2.c.otherid + ) + ] + ), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable ORDER BY " + "(SELECT myothertable.otherid FROM " + "myothertable WHERE mytable.myid = " + "myothertable.otherid)", + ) + self.assert_compile( + table1.select( + order_by=[ + desc( + select( + [table2.c.otherid], + table1.c.myid == table2.c.otherid, + ) + ) + ] + ), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable ORDER BY " + "(SELECT myothertable.otherid FROM " + "myothertable WHERE mytable.myid = " + "myothertable.otherid) DESC", + ) def test_scalar_select(self): assert_raises_message( @@ -745,72 +852,86 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): r"Select objects don't have a type\. Call as_scalar\(\) " r"on this Select object to return a 'scalar' " r"version of this Select\.", - func.coalesce, select([table1.c.myid]) + func.coalesce, + select([table1.c.myid]), ) s = select([table1.c.myid], correlate=False).as_scalar() - self.assert_compile(select([table1, s]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, (SELECT mytable.myid ' - 'FROM mytable) AS anon_1 FROM mytable') + self.assert_compile( + select([table1, s]), + "SELECT mytable.myid, mytable.name, " + "mytable.description, (SELECT mytable.myid " + "FROM mytable) AS anon_1 FROM mytable", + ) s = select([table1.c.myid]).as_scalar() - self.assert_compile(select([table2, s]), - 'SELECT myothertable.otherid, ' - 'myothertable.othername, (SELECT ' - 'mytable.myid FROM mytable) AS anon_1 FROM ' - 'myothertable') + self.assert_compile( + select([table2, s]), + "SELECT myothertable.otherid, " + "myothertable.othername, (SELECT " + "mytable.myid FROM mytable) AS anon_1 FROM " + "myothertable", + ) s = select([table1.c.myid]).correlate(None).as_scalar() - self.assert_compile(select([table1, s]), - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, (SELECT mytable.myid ' - 'FROM mytable) AS anon_1 FROM mytable') + self.assert_compile( + select([table1, s]), + "SELECT mytable.myid, mytable.name, " + "mytable.description, (SELECT mytable.myid " + "FROM mytable) AS anon_1 FROM mytable", + ) s = select([table1.c.myid]).as_scalar() s2 = s.where(table1.c.myid == 5) self.assert_compile( s2, - "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)" - ) - self.assert_compile( - s, "(SELECT mytable.myid FROM mytable)" + "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", ) + self.assert_compile(s, "(SELECT mytable.myid FROM mytable)") # test that aliases use as_scalar() when used in an explicitly # scalar context s = select([table1.c.myid]).alias() - self.assert_compile(select([table1.c.myid]).where(table1.c.myid - == s), - 'SELECT mytable.myid FROM mytable WHERE ' - 'mytable.myid = (SELECT mytable.myid FROM ' - 'mytable)') - self.assert_compile(select([table1.c.myid]).where(s - > table1.c.myid), - 'SELECT mytable.myid FROM mytable WHERE ' - 'mytable.myid < (SELECT mytable.myid FROM ' - 'mytable)') + self.assert_compile( + select([table1.c.myid]).where(table1.c.myid == s), + "SELECT mytable.myid FROM mytable WHERE " + "mytable.myid = (SELECT mytable.myid FROM " + "mytable)", + ) + self.assert_compile( + select([table1.c.myid]).where(s > table1.c.myid), + "SELECT mytable.myid FROM mytable WHERE " + "mytable.myid < (SELECT mytable.myid FROM " + "mytable)", + ) s = select([table1.c.myid]).as_scalar() - self.assert_compile(select([table2, s]), - 'SELECT myothertable.otherid, ' - 'myothertable.othername, (SELECT ' - 'mytable.myid FROM mytable) AS anon_1 FROM ' - 'myothertable') + self.assert_compile( + select([table2, s]), + "SELECT myothertable.otherid, " + "myothertable.othername, (SELECT " + "mytable.myid FROM mytable) AS anon_1 FROM " + "myothertable", + ) # test expressions against scalar selects - self.assert_compile(select([s - literal(8)]), - 'SELECT (SELECT mytable.myid FROM mytable) ' - '- :param_1 AS anon_1') - self.assert_compile(select([select([table1.c.name]).as_scalar() - + literal('x')]), - 'SELECT (SELECT mytable.name FROM mytable) ' - '|| :param_1 AS anon_1') - self.assert_compile(select([s > literal(8)]), - 'SELECT (SELECT mytable.myid FROM mytable) ' - '> :param_1 AS anon_1') - self.assert_compile(select([select([table1.c.name]).label('foo' - )]), - 'SELECT (SELECT mytable.name FROM mytable) ' - 'AS foo') + self.assert_compile( + select([s - literal(8)]), + "SELECT (SELECT mytable.myid FROM mytable) " + "- :param_1 AS anon_1", + ) + self.assert_compile( + select([select([table1.c.name]).as_scalar() + literal("x")]), + "SELECT (SELECT mytable.name FROM mytable) " + "|| :param_1 AS anon_1", + ) + self.assert_compile( + select([s > literal(8)]), + "SELECT (SELECT mytable.myid FROM mytable) " + "> :param_1 AS anon_1", + ) + self.assert_compile( + select([select([table1.c.name]).label("foo")]), + "SELECT (SELECT mytable.name FROM mytable) " "AS foo", + ) # scalar selects should not have any attributes on their 'c' or # 'columns' attribute @@ -819,101 +940,129 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): try: s.c.foo except exc.InvalidRequestError as err: - assert str(err) \ - == 'Scalar Select expression has no columns; use this '\ - 'object directly within a column-level expression.' + assert ( + str(err) + == "Scalar Select expression has no columns; use this " + "object directly within a column-level expression." + ) try: s.columns.foo except exc.InvalidRequestError as err: - assert str(err) \ - == 'Scalar Select expression has no columns; use this '\ - 'object directly within a column-level expression.' - - zips = table('zips', - column('zipcode'), - column('latitude'), - column('longitude'), - ) - places = table('places', - column('id'), - column('nm') - ) - zip = '12345' - qlat = select([zips.c.latitude], zips.c.zipcode == zip).\ - correlate(None).as_scalar() - qlng = select([zips.c.longitude], zips.c.zipcode == zip).\ - correlate(None).as_scalar() - - q = select([places.c.id, places.c.nm, zips.c.zipcode, - func.latlondist(qlat, qlng).label('dist')], - zips.c.zipcode == zip, - order_by=['dist', places.c.nm] - ) - - self.assert_compile(q, - 'SELECT places.id, places.nm, ' - 'zips.zipcode, latlondist((SELECT ' - 'zips.latitude FROM zips WHERE ' - 'zips.zipcode = :zipcode_1), (SELECT ' - 'zips.longitude FROM zips WHERE ' - 'zips.zipcode = :zipcode_2)) AS dist FROM ' - 'places, zips WHERE zips.zipcode = ' - ':zipcode_3 ORDER BY dist, places.nm') - - zalias = zips.alias('main_zip') - qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).\ - as_scalar() - qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).\ - as_scalar() - q = select([places.c.id, places.c.nm, zalias.c.zipcode, - func.latlondist(qlat, qlng).label('dist')], - order_by=['dist', places.c.nm]) - self.assert_compile(q, - 'SELECT places.id, places.nm, ' - 'main_zip.zipcode, latlondist((SELECT ' - 'zips.latitude FROM zips WHERE ' - 'zips.zipcode = main_zip.zipcode), (SELECT ' - 'zips.longitude FROM zips WHERE ' - 'zips.zipcode = main_zip.zipcode)) AS dist ' - 'FROM places, zips AS main_zip ORDER BY ' - 'dist, places.nm') - - a1 = table2.alias('t2alias') + assert ( + str(err) + == "Scalar Select expression has no columns; use this " + "object directly within a column-level expression." + ) + + zips = table( + "zips", column("zipcode"), column("latitude"), column("longitude") + ) + places = table("places", column("id"), column("nm")) + zip = "12345" + qlat = ( + select([zips.c.latitude], zips.c.zipcode == zip) + .correlate(None) + .as_scalar() + ) + qlng = ( + select([zips.c.longitude], zips.c.zipcode == zip) + .correlate(None) + .as_scalar() + ) + + q = select( + [ + places.c.id, + places.c.nm, + zips.c.zipcode, + func.latlondist(qlat, qlng).label("dist"), + ], + zips.c.zipcode == zip, + order_by=["dist", places.c.nm], + ) + + self.assert_compile( + q, + "SELECT places.id, places.nm, " + "zips.zipcode, latlondist((SELECT " + "zips.latitude FROM zips WHERE " + "zips.zipcode = :zipcode_1), (SELECT " + "zips.longitude FROM zips WHERE " + "zips.zipcode = :zipcode_2)) AS dist FROM " + "places, zips WHERE zips.zipcode = " + ":zipcode_3 ORDER BY dist, places.nm", + ) + + zalias = zips.alias("main_zip") + qlat = select( + [zips.c.latitude], zips.c.zipcode == zalias.c.zipcode + ).as_scalar() + qlng = select( + [zips.c.longitude], zips.c.zipcode == zalias.c.zipcode + ).as_scalar() + q = select( + [ + places.c.id, + places.c.nm, + zalias.c.zipcode, + func.latlondist(qlat, qlng).label("dist"), + ], + order_by=["dist", places.c.nm], + ) + self.assert_compile( + q, + "SELECT places.id, places.nm, " + "main_zip.zipcode, latlondist((SELECT " + "zips.latitude FROM zips WHERE " + "zips.zipcode = main_zip.zipcode), (SELECT " + "zips.longitude FROM zips WHERE " + "zips.zipcode = main_zip.zipcode)) AS dist " + "FROM places, zips AS main_zip ORDER BY " + "dist, places.nm", + ) + + a1 = table2.alias("t2alias") s1 = select([a1.c.otherid], table1.c.myid == a1.c.otherid).as_scalar() j1 = table1.join(table2, table1.c.myid == table2.c.otherid) s2 = select([table1, s1], from_obj=j1) - self.assert_compile(s2, - 'SELECT mytable.myid, mytable.name, ' - 'mytable.description, (SELECT ' - 't2alias.otherid FROM myothertable AS ' - 't2alias WHERE mytable.myid = ' - 't2alias.otherid) AS anon_1 FROM mytable ' - 'JOIN myothertable ON mytable.myid = ' - 'myothertable.otherid') + self.assert_compile( + s2, + "SELECT mytable.myid, mytable.name, " + "mytable.description, (SELECT " + "t2alias.otherid FROM myothertable AS " + "t2alias WHERE mytable.myid = " + "t2alias.otherid) AS anon_1 FROM mytable " + "JOIN myothertable ON mytable.myid = " + "myothertable.otherid", + ) def test_label_comparison_one(self): - x = func.lala(table1.c.myid).label('foo') - self.assert_compile(select([x], x == 5), - 'SELECT lala(mytable.myid) AS foo FROM ' - 'mytable WHERE lala(mytable.myid) = ' - ':param_1') + x = func.lala(table1.c.myid).label("foo") + self.assert_compile( + select([x], x == 5), + "SELECT lala(mytable.myid) AS foo FROM " + "mytable WHERE lala(mytable.myid) = " + ":param_1", + ) def test_label_comparison_two(self): self.assert_compile( - label('bar', column('foo', type_=String)) + 'foo', - 'foo || :param_1') + label("bar", column("foo", type_=String)) + "foo", + "foo || :param_1", + ) def test_order_by_labels_enabled(self): - lab1 = (table1.c.myid + 12).label('foo') - lab2 = func.somefunc(table1.c.name).label('bar') + lab1 = (table1.c.myid + 12).label("foo") + lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() - self.assert_compile(select([lab1, lab2]).order_by(lab1, desc(lab2)), - "SELECT mytable.myid + :myid_1 AS foo, " - "somefunc(mytable.name) AS bar FROM mytable " - "ORDER BY foo, bar DESC", - dialect=dialect - ) + self.assert_compile( + select([lab1, lab2]).order_by(lab1, desc(lab2)), + "SELECT mytable.myid + :myid_1 AS foo, " + "somefunc(mytable.name) AS bar FROM mytable " + "ORDER BY foo, bar DESC", + dialect=dialect, + ) # the function embedded label renders as the function self.assert_compile( @@ -921,16 +1070,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY hoho(mytable.myid + :myid_1), bar DESC", - dialect=dialect + dialect=dialect, ) # binary expressions render as the expression without labels - self.assert_compile(select([lab1, lab2]).order_by(lab1 + "test"), - "SELECT mytable.myid + :myid_1 AS foo, " - "somefunc(mytable.name) AS bar FROM mytable " - "ORDER BY mytable.myid + :myid_1 + :param_1", - dialect=dialect - ) + self.assert_compile( + select([lab1, lab2]).order_by(lab1 + "test"), + "SELECT mytable.myid + :myid_1 AS foo, " + "somefunc(mytable.name) AS bar FROM mytable " + "ORDER BY mytable.myid + :myid_1 + :param_1", + dialect=dialect, + ) # labels within functions in the columns clause render # with the expression @@ -939,98 +1089,92 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT mytable.myid + :myid_1 AS foo, " "foo(mytable.myid + :myid_1) AS foo_1 FROM mytable " "ORDER BY foo, foo(mytable.myid + :myid_1)", - dialect=dialect + dialect=dialect, ) - lx = (table1.c.myid + table1.c.myid).label('lx') - ly = (func.lower(table1.c.name) + table1.c.description).label('ly') + lx = (table1.c.myid + table1.c.myid).label("lx") + ly = (func.lower(table1.c.name) + table1.c.description).label("ly") self.assert_compile( select([lx, ly]).order_by(lx, ly.desc()), "SELECT mytable.myid + mytable.myid AS lx, " "lower(mytable.name) || mytable.description AS ly " "FROM mytable ORDER BY lx, ly DESC", - dialect=dialect + dialect=dialect, ) # expression isn't actually the same thing (even though label is) self.assert_compile( select([lab1, lab2]).order_by( - table1.c.myid.label('foo'), - desc(table1.c.name.label('bar')) + table1.c.myid.label("foo"), desc(table1.c.name.label("bar")) ), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid, mytable.name DESC", - dialect=dialect + dialect=dialect, ) # it's also an exact match, not aliased etc. self.assert_compile( select([lab1, lab2]).order_by( - desc(table1.alias().c.name.label('bar')) + desc(table1.alias().c.name.label("bar")) ), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable_1.name DESC", - dialect=dialect + dialect=dialect, ) # but! it's based on lineage lab2_lineage = lab2.element._clone() self.assert_compile( - select([lab1, lab2]).order_by( - desc(lab2_lineage.label('bar')) - ), + select([lab1, lab2]).order_by(desc(lab2_lineage.label("bar"))), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY bar DESC", - dialect=dialect + dialect=dialect, ) # here, 'name' is implicitly available, but w/ #3882 we don't # want to render a name that isn't specifically a Label elsewhere # in the query self.assert_compile( - select([table1.c.myid]).order_by(table1.c.name.label('name')), - "SELECT mytable.myid FROM mytable ORDER BY mytable.name" + select([table1.c.myid]).order_by(table1.c.name.label("name")), + "SELECT mytable.myid FROM mytable ORDER BY mytable.name", ) # as well as if it doesn't match self.assert_compile( select([table1.c.myid]).order_by( - func.lower(table1.c.name).label('name')), - "SELECT mytable.myid FROM mytable ORDER BY lower(mytable.name)" + func.lower(table1.c.name).label("name") + ), + "SELECT mytable.myid FROM mytable ORDER BY lower(mytable.name)", ) def test_order_by_labels_disabled(self): - lab1 = (table1.c.myid + 12).label('foo') - lab2 = func.somefunc(table1.c.name).label('bar') + lab1 = (table1.c.myid + 12).label("foo") + lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() dialect.supports_simple_order_by_label = False self.assert_compile( - select( - [ - lab1, - lab2]).order_by( - lab1, - desc(lab2)), + select([lab1, lab2]).order_by(lab1, desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY mytable.myid + :myid_1, somefunc(mytable.name) DESC", - dialect=dialect) + dialect=dialect, + ) self.assert_compile( select([lab1, lab2]).order_by(func.hoho(lab1), desc(lab2)), "SELECT mytable.myid + :myid_1 AS foo, " "somefunc(mytable.name) AS bar FROM mytable " "ORDER BY hoho(mytable.myid + :myid_1), " "somefunc(mytable.name) DESC", - dialect=dialect + dialect=dialect, ) def test_no_group_by_labels(self): - lab1 = (table1.c.myid + 12).label('foo') - lab2 = func.somefunc(table1.c.name).label('bar') + lab1 = (table1.c.myid + 12).label("foo") + lab2 = func.somefunc(table1.c.name).label("bar") dialect = default.DefaultDialect() self.assert_compile( @@ -1038,140 +1182,140 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT mytable.myid + :myid_1 AS foo, somefunc(mytable.name) " "AS bar FROM mytable GROUP BY mytable.myid + :myid_1, " "somefunc(mytable.name)", - dialect=dialect + dialect=dialect, ) def test_conjunctions(self): - a, b, c = text('a'), text('b'), text('c') + a, b, c = text("a"), text("b"), text("c") x = and_(a, b, c) assert isinstance(x.type, Boolean) - assert str(x) == 'a AND b AND c' + assert str(x) == "a AND b AND c" self.assert_compile( - select([x.label('foo')]), - 'SELECT a AND b AND c AS foo' + select([x.label("foo")]), "SELECT a AND b AND c AS foo" ) self.assert_compile( - and_(table1.c.myid == 12, table1.c.name == 'asdf', - table2.c.othername == 'foo', text("sysdate() = today()")), + and_( + table1.c.myid == 12, + table1.c.name == "asdf", + table2.c.othername == "foo", + text("sysdate() = today()"), + ), "mytable.myid = :myid_1 AND mytable.name = :name_1 " "AND myothertable.othername = " - ":othername_1 AND sysdate() = today()" + ":othername_1 AND sysdate() = today()", ) self.assert_compile( and_( table1.c.myid == 12, - or_(table2.c.othername == 'asdf', - table2.c.othername == 'foo', table2.c.otherid == 9), + or_( + table2.c.othername == "asdf", + table2.c.othername == "foo", + table2.c.otherid == 9, + ), text("sysdate() = today()"), ), - 'mytable.myid = :myid_1 AND (myothertable.othername = ' - ':othername_1 OR myothertable.othername = :othername_2 OR ' - 'myothertable.otherid = :otherid_1) AND sysdate() = ' - 'today()', - checkparams={'othername_1': 'asdf', 'othername_2': 'foo', - 'otherid_1': 9, 'myid_1': 12} + "mytable.myid = :myid_1 AND (myothertable.othername = " + ":othername_1 OR myothertable.othername = :othername_2 OR " + "myothertable.otherid = :otherid_1) AND sysdate() = " + "today()", + checkparams={ + "othername_1": "asdf", + "othername_2": "foo", + "otherid_1": 9, + "myid_1": 12, + }, ) # test a generator self.assert_compile( and_( - conj for conj in [ - table1.c.myid == 12, - table1.c.name == 'asdf' - ] + conj for conj in [table1.c.myid == 12, table1.c.name == "asdf"] ), - "mytable.myid = :myid_1 AND mytable.name = :name_1" + "mytable.myid = :myid_1 AND mytable.name = :name_1", ) def test_nested_conjunctions_short_circuit(self): """test that empty or_(), and_() conjunctions are collapsed by an enclosing conjunction.""" - t = table('t', column('x')) + t = table("t", column("x")) self.assert_compile( - select([t]).where(and_(t.c.x == 5, - or_(and_(or_(t.c.x == 7))))), - "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2" + select([t]).where(and_(t.c.x == 5, or_(and_(or_(t.c.x == 7))))), + "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2", ) self.assert_compile( - select([t]).where(and_(or_(t.c.x == 12, - and_(or_(t.c.x == 8))))), - "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" + select([t]).where(and_(or_(t.c.x == 12, and_(or_(t.c.x == 8))))), + "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2", ) self.assert_compile( - select([t]). - where( + select([t]).where( and_( or_( or_(t.c.x == 12), - and_( - or_(), - or_(and_(t.c.x == 8)), - and_() - ) + and_(or_(), or_(and_(t.c.x == 8)), and_()), ) ) ), - "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2" + "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2", ) def test_true_short_circuit(self): - t = table('t', column('x')) + t = table("t", column("x")) self.assert_compile( select([t]).where(true()), "SELECT t.x FROM t WHERE 1 = 1", - dialect=default.DefaultDialect(supports_native_boolean=False) + dialect=default.DefaultDialect(supports_native_boolean=False), ) self.assert_compile( select([t]).where(true()), "SELECT t.x FROM t WHERE true", - dialect=default.DefaultDialect(supports_native_boolean=True) + dialect=default.DefaultDialect(supports_native_boolean=True), ) self.assert_compile( select([t]), "SELECT t.x FROM t", - dialect=default.DefaultDialect(supports_native_boolean=True) + dialect=default.DefaultDialect(supports_native_boolean=True), ) def test_distinct(self): self.assert_compile( select([table1.c.myid.distinct()]), - "SELECT DISTINCT mytable.myid FROM mytable" + "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([distinct(table1.c.myid)]), - "SELECT DISTINCT mytable.myid FROM mytable" + "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([table1.c.myid]).distinct(), - "SELECT DISTINCT mytable.myid FROM mytable" + "SELECT DISTINCT mytable.myid FROM mytable", ) self.assert_compile( select([func.count(table1.c.myid.distinct())]), - "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" + "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable", ) self.assert_compile( select([func.count(distinct(table1.c.myid))]), - "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable" + "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable", ) def test_where_empty(self): self.assert_compile( select([table1.c.myid]).where(and_()), - "SELECT mytable.myid FROM mytable" + "SELECT mytable.myid FROM mytable", ) self.assert_compile( select([table1.c.myid]).where(or_()), - "SELECT mytable.myid FROM mytable" + "SELECT mytable.myid FROM mytable", ) def test_multiple_col_binds(self): @@ -1179,133 +1323,165 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): select( [literal_column("*")], or_( - table1.c.myid == 12, table1.c.myid == 'asdf', - table1.c.myid == 'foo') + table1.c.myid == 12, + table1.c.myid == "asdf", + table1.c.myid == "foo", + ), ), "SELECT * FROM mytable WHERE mytable.myid = :myid_1 " - "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3" + "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3", ) def test_order_by_nulls(self): self.assert_compile( - table2.select(order_by=[table2.c.otherid, - table2.c.othername.desc().nullsfirst()]), + table2.select( + order_by=[ + table2.c.otherid, + table2.c.othername.desc().nullsfirst(), + ] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " - "myothertable.othername DESC NULLS FIRST" + "myothertable.othername DESC NULLS FIRST", ) self.assert_compile( - table2.select(order_by=[ - table2.c.otherid, table2.c.othername.desc().nullslast()]), + table2.select( + order_by=[ + table2.c.otherid, + table2.c.othername.desc().nullslast(), + ] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " - "myothertable.othername DESC NULLS LAST" + "myothertable.othername DESC NULLS LAST", ) self.assert_compile( - table2.select(order_by=[ - table2.c.otherid.nullslast(), - table2.c.othername.desc().nullsfirst()]), + table2.select( + order_by=[ + table2.c.otherid.nullslast(), + table2.c.othername.desc().nullsfirst(), + ] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS LAST, " - "myothertable.othername DESC NULLS FIRST" + "myothertable.othername DESC NULLS FIRST", ) self.assert_compile( - table2.select(order_by=[table2.c.otherid.nullsfirst(), - table2.c.othername.desc()]), + table2.select( + order_by=[ + table2.c.otherid.nullsfirst(), + table2.c.othername.desc(), + ] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " - "myothertable.othername DESC" + "myothertable.othername DESC", ) self.assert_compile( - table2.select(order_by=[table2.c.otherid.nullsfirst(), - table2.c.othername.desc().nullslast()]), + table2.select( + order_by=[ + table2.c.otherid.nullsfirst(), + table2.c.othername.desc().nullslast(), + ] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid NULLS FIRST, " - "myothertable.othername DESC NULLS LAST" + "myothertable.othername DESC NULLS LAST", ) def test_orderby_groupby(self): self.assert_compile( - table2.select(order_by=[table2.c.otherid, - asc(table2.c.othername)]), + table2.select( + order_by=[table2.c.otherid, asc(table2.c.othername)] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " - "myothertable.othername ASC" + "myothertable.othername ASC", ) self.assert_compile( - table2.select(order_by=[table2.c.otherid, - table2.c.othername.desc()]), + table2.select( + order_by=[table2.c.otherid, table2.c.othername.desc()] + ), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " - "myothertable.othername DESC" + "myothertable.othername DESC", ) # generative order_by self.assert_compile( - table2.select().order_by(table2.c.otherid). - order_by(table2.c.othername.desc()), + table2.select() + .order_by(table2.c.otherid) + .order_by(table2.c.othername.desc()), "SELECT myothertable.otherid, myothertable.othername FROM " "myothertable ORDER BY myothertable.otherid, " - "myothertable.othername DESC" + "myothertable.othername DESC", ) self.assert_compile( - table2.select().order_by(table2.c.otherid). - order_by(table2.c.othername.desc() - ).order_by(None), + table2.select() + .order_by(table2.c.otherid) + .order_by(table2.c.othername.desc()) + .order_by(None), "SELECT myothertable.otherid, myothertable.othername " - "FROM myothertable" + "FROM myothertable", ) self.assert_compile( select( [table2.c.othername, func.count(table2.c.otherid)], - group_by=[table2.c.othername]), + group_by=[table2.c.othername], + ), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " - "FROM myothertable GROUP BY myothertable.othername" + "FROM myothertable GROUP BY myothertable.othername", ) # generative group by self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)]). - group_by(table2.c.othername), + select( + [table2.c.othername, func.count(table2.c.otherid)] + ).group_by(table2.c.othername), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " - "FROM myothertable GROUP BY myothertable.othername" + "FROM myothertable GROUP BY myothertable.othername", ) self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)]). - group_by(table2.c.othername).group_by(None), + select([table2.c.othername, func.count(table2.c.otherid)]) + .group_by(table2.c.othername) + .group_by(None), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " - "FROM myothertable" + "FROM myothertable", ) self.assert_compile( - select([table2.c.othername, func.count(table2.c.otherid)], - group_by=[table2.c.othername], - order_by=[table2.c.othername]), + select( + [table2.c.othername, func.count(table2.c.otherid)], + group_by=[table2.c.othername], + order_by=[table2.c.othername], + ), "SELECT myothertable.othername, " "count(myothertable.otherid) AS count_1 " "FROM myothertable " - "GROUP BY myothertable.othername ORDER BY myothertable.othername" + "GROUP BY myothertable.othername ORDER BY myothertable.othername", ) def test_custom_order_by_clause(self): class CustomCompiler(PGCompiler): def order_by_clause(self, select, **kw): - return super(CustomCompiler, self).\ - order_by_clause(select, **kw) + " CUSTOMIZED" + return ( + super(CustomCompiler, self).order_by_clause(select, **kw) + + " CUSTOMIZED" + ) class CustomDialect(PGDialect): - name = 'custom' + name = "custom" statement_compiler = CustomCompiler stmt = select([table1.c.myid]).order_by(table1.c.myid) @@ -1313,17 +1489,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): stmt, "SELECT mytable.myid FROM mytable ORDER BY " "mytable.myid CUSTOMIZED", - dialect=CustomDialect() + dialect=CustomDialect(), ) def test_custom_group_by_clause(self): class CustomCompiler(PGCompiler): def group_by_clause(self, select, **kw): - return super(CustomCompiler, self).\ - group_by_clause(select, **kw) + " CUSTOMIZED" + return ( + super(CustomCompiler, self).group_by_clause(select, **kw) + + " CUSTOMIZED" + ) class CustomDialect(PGDialect): - name = 'custom' + name = "custom" statement_compiler = CustomCompiler stmt = select([table1.c.myid]).group_by(table1.c.myid) @@ -1331,44 +1509,51 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): stmt, "SELECT mytable.myid FROM mytable GROUP BY " "mytable.myid CUSTOMIZED", - dialect=CustomDialect() + dialect=CustomDialect(), ) def test_for_update(self): self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", + ) # not supported by dialect, should just use update self.assert_compile( table1.select(table1.c.myid == 7).with_for_update(nowait=True), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE") + "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE", + ) assert_raises_message( exc.ArgumentError, "Unknown for_update argument: 'unknown_mode'", - table1.select, table1.c.myid == 7, for_update='unknown_mode' + table1.select, + table1.c.myid == 7, + for_update="unknown_mode", ) def test_alias(self): # test the alias for a table1. column names stay the same, # table name "changes" to "foo". self.assert_compile( - select([table1.alias('foo')]), - "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo") + select([table1.alias("foo")]), + "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo", + ) for dialect in (oracle.dialect(),): self.assert_compile( - select([table1.alias('foo')]), + select([table1.alias("foo")]), "SELECT foo.myid, foo.name, foo.description FROM mytable foo", - dialect=dialect) + dialect=dialect, + ) self.assert_compile( select([table1.alias()]), "SELECT mytable_1.myid, mytable_1.name, mytable_1.description " - "FROM mytable AS mytable_1") + "FROM mytable AS mytable_1", + ) # create a select for a join of two tables. use_labels # means the column names will have labels tablename_columnname, @@ -1377,12 +1562,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # from the first table1. q = select( [table1, table2.c.otherid], - table1.c.myid == table2.c.otherid, use_labels=True + table1.c.myid == table2.c.otherid, + use_labels=True, ) # make an alias of the "selectable". column names # stay the same (i.e. the labels), table name "changes" to "t2view". - a = alias(q, 't2view') + a = alias(q, "t2view") # select from that alias, also using labels. two levels of labels # should produce two underscores. @@ -1401,26 +1587,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "myothertable_otherid FROM mytable, myothertable " "WHERE mytable.myid = " "myothertable.otherid) AS t2view " - "WHERE t2view.mytable_myid = :mytable_myid_1" + "WHERE t2view.mytable_myid = :mytable_myid_1", ) def test_prefix(self): self.assert_compile( - table1.select().prefix_with("SQL_CALC_FOUND_ROWS"). - prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), + table1.select() + .prefix_with("SQL_CALC_FOUND_ROWS") + .prefix_with("SQL_SOME_WEIRD_MYSQL_THING"), "SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING " - "mytable.myid, mytable.name, mytable.description FROM mytable" + "mytable.myid, mytable.name, mytable.description FROM mytable", ) def test_prefix_dialect_specific(self): self.assert_compile( - table1.select().prefix_with("SQL_CALC_FOUND_ROWS", - dialect='sqlite'). - prefix_with("SQL_SOME_WEIRD_MYSQL_THING", - dialect='mysql'), + table1.select() + .prefix_with("SQL_CALC_FOUND_ROWS", dialect="sqlite") + .prefix_with("SQL_SOME_WEIRD_MYSQL_THING", dialect="mysql"), "SELECT SQL_SOME_WEIRD_MYSQL_THING " "mytable.myid, mytable.name, mytable.description FROM mytable", - dialect=mysql.dialect() + dialect=mysql.dialect(), ) def test_render_binds_as_literal(self): @@ -1431,140 +1617,149 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): class Compiler(dialect.statement_compiler): ansi_bind_rules = True + dialect.statement_compiler = Compiler self.assert_compile( select([literal("someliteral")]), "SELECT 'someliteral' AS anon_1", - dialect=dialect + dialect=dialect, ) self.assert_compile( select([table1.c.myid + 3]), "SELECT mytable.myid + 3 AS anon_1 FROM mytable", - dialect=dialect + dialect=dialect, ) self.assert_compile( select([table1.c.myid.in_([4, 5, 6])]), "SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable", - dialect=dialect + dialect=dialect, ) self.assert_compile( select([func.mod(table1.c.myid, 5)]), "SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable", - dialect=dialect + dialect=dialect, ) self.assert_compile( select([literal("foo").in_([])]), "SELECT 1 != 1 AS anon_1", - dialect=dialect + dialect=dialect, ) self.assert_compile( select([literal(util.b("foo"))]), "SELECT 'foo' AS anon_1", - dialect=dialect + dialect=dialect, ) # test callable self.assert_compile( select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]), "SELECT mytable.myid = 5 AS anon_1 FROM mytable", - dialect=dialect + dialect=dialect, ) - empty_in_dialect = default.DefaultDialect(empty_in_strategy='dynamic') + empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic") empty_in_dialect.statement_compiler = Compiler assert_raises_message( exc.CompileError, "Bind parameter 'foo' without a " "renderable value not allowed here.", - bindparam("foo").in_( - []).compile, - dialect=empty_in_dialect) + bindparam("foo").in_([]).compile, + dialect=empty_in_dialect, + ) def test_collate(self): # columns clause self.assert_compile( - select([column('x').collate('bar')]), - "SELECT x COLLATE bar AS anon_1" + select([column("x").collate("bar")]), + "SELECT x COLLATE bar AS anon_1", ) # WHERE clause self.assert_compile( - select([column('x')]).where(column('x').collate('bar') == 'foo'), - "SELECT x WHERE (x COLLATE bar) = :param_1" + select([column("x")]).where(column("x").collate("bar") == "foo"), + "SELECT x WHERE (x COLLATE bar) = :param_1", ) # ORDER BY clause self.assert_compile( - select([column('x')]).order_by(column('x').collate('bar')), - "SELECT x ORDER BY x COLLATE bar" + select([column("x")]).order_by(column("x").collate("bar")), + "SELECT x ORDER BY x COLLATE bar", ) def test_literal(self): - self.assert_compile(select([literal('foo')]), - "SELECT :param_1 AS anon_1") + self.assert_compile( + select([literal("foo")]), "SELECT :param_1 AS anon_1" + ) self.assert_compile( - select( - [ - literal("foo") + - literal("bar")], - from_obj=[table1]), - "SELECT :param_1 || :param_2 AS anon_1 FROM mytable") + select([literal("foo") + literal("bar")], from_obj=[table1]), + "SELECT :param_1 || :param_2 AS anon_1 FROM mytable", + ) def test_calculated_columns(self): - value_tbl = table('values', - column('id', Integer), - column('val1', Float), - column('val2', Float), - ) + value_tbl = table( + "values", + column("id", Integer), + column("val1", Float), + column("val2", Float), + ) self.assert_compile( - select([value_tbl.c.id, (value_tbl.c.val2 - - value_tbl.c.val1) / value_tbl.c.val1]), + select( + [ + value_tbl.c.id, + (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1, + ] + ), "SELECT values.id, (values.val2 - values.val1) " - "/ values.val1 AS anon_1 FROM values" + "/ values.val1 AS anon_1 FROM values", ) self.assert_compile( - select([ - value_tbl.c.id], - (value_tbl.c.val2 - value_tbl.c.val1) / - value_tbl.c.val1 > 2.0), + select( + [value_tbl.c.id], + (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1 > 2.0, + ), "SELECT values.id FROM values WHERE " - "(values.val2 - values.val1) / values.val1 > :param_1" + "(values.val2 - values.val1) / values.val1 > :param_1", ) self.assert_compile( - select([value_tbl.c.id], value_tbl.c.val1 / - (value_tbl.c.val2 - value_tbl.c.val1) / - value_tbl.c.val1 > 2.0), + select( + [value_tbl.c.id], + value_tbl.c.val1 + / (value_tbl.c.val2 - value_tbl.c.val1) + / value_tbl.c.val1 + > 2.0, + ), "SELECT values.id FROM values WHERE " "(values.val1 / (values.val2 - values.val1)) " - "/ values.val1 > :param_1" + "/ values.val1 > :param_1", ) def test_percent_chars(self): - t = table("table%name", - column("percent%"), - column("%(oneofthese)s"), - column("spaces % more spaces"), - ) + t = table( + "table%name", + column("percent%"), + column("%(oneofthese)s"), + column("spaces % more spaces"), + ) self.assert_compile( t.select(use_labels=True), - '''SELECT "table%name"."percent%" AS "table%name_percent%", ''' - '''"table%name"."%(oneofthese)s" AS ''' - '''"table%name_%(oneofthese)s", ''' - '''"table%name"."spaces % more spaces" AS ''' - '''"table%name_spaces % ''' - '''more spaces" FROM "table%name"''' + """SELECT "table%name"."percent%" AS "table%name_percent%", """ + """"table%name"."%(oneofthese)s" AS """ + """"table%name_%(oneofthese)s", """ + """"table%name"."spaces % more spaces" AS """ + """"table%name_spaces % """ + '''more spaces" FROM "table%name"''', ) def test_joins(self): @@ -1572,22 +1767,31 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): join(table2, table1, table1.c.myid == table2.c.otherid).select(), "SELECT myothertable.otherid, myothertable.othername, " "mytable.myid, mytable.name, mytable.description FROM " - "myothertable JOIN mytable ON mytable.myid = myothertable.otherid" + "myothertable JOIN mytable ON mytable.myid = myothertable.otherid", ) self.assert_compile( select( [table1], - from_obj=[join(table1, table2, table1.c.myid - == table2.c.otherid)] + from_obj=[ + join(table1, table2, table1.c.myid == table2.c.otherid) + ], ), "SELECT mytable.myid, mytable.name, mytable.description FROM " - "mytable JOIN myothertable ON mytable.myid = myothertable.otherid") + "mytable JOIN myothertable ON mytable.myid = myothertable.otherid", + ) self.assert_compile( select( - [join(join(table1, table2, table1.c.myid == table2.c.otherid), - table3, table1.c.myid == table3.c.userid)] + [ + join( + join( + table1, table2, table1.c.myid == table2.c.otherid + ), + table3, + table1.c.myid == table3.c.userid, + ) + ] ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " @@ -1595,27 +1799,29 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "thirdtable.otherstuff FROM mytable JOIN myothertable " "ON mytable.myid =" " myothertable.otherid JOIN thirdtable ON " - "mytable.myid = thirdtable.userid" + "mytable.myid = thirdtable.userid", ) self.assert_compile( - join(users, addresses, users.c.user_id == - addresses.c.user_id).select(), + join( + users, addresses, users.c.user_id == addresses.c.user_id + ).select(), "SELECT users.user_id, users.user_name, users.password, " "addresses.address_id, addresses.user_id, addresses.street, " "addresses.city, addresses.state, addresses.zip " "FROM users JOIN addresses " - "ON users.user_id = addresses.user_id" + "ON users.user_id = addresses.user_id", ) self.assert_compile( - select([table1, table2, table3], - - from_obj=[join(table1, table2, - table1.c.myid == table2.c.otherid). - outerjoin(table3, - table1.c.myid == table3.c.userid)] - ), + select( + [table1, table2, table3], + from_obj=[ + join( + table1, table2, table1.c.myid == table2.c.otherid + ).outerjoin(table3, table1.c.myid == table3.c.userid) + ], + ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " "thirdtable.userid," @@ -1623,15 +1829,21 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "JOIN myothertable ON mytable.myid " "= myothertable.otherid LEFT OUTER JOIN thirdtable " "ON mytable.myid =" - " thirdtable.userid" + " thirdtable.userid", ) self.assert_compile( - select([table1, table2, table3], - from_obj=[outerjoin(table1, - join(table2, table3, table2.c.otherid - == table3.c.userid), - table1.c.myid == table2.c.otherid)] - ), + select( + [table1, table2, table3], + from_obj=[ + outerjoin( + table1, + join( + table2, table3, table2.c.otherid == table3.c.userid + ), + table1.c.myid == table2.c.otherid, + ) + ], + ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername, " "thirdtable.userid," @@ -1639,47 +1851,49 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "(myothertable " "JOIN thirdtable ON myothertable.otherid = " "thirdtable.userid) ON " - "mytable.myid = myothertable.otherid" + "mytable.myid = myothertable.otherid", ) query = select( [table1, table2], or_( - table1.c.name == 'fred', + table1.c.name == "fred", table1.c.myid == 10, - table2.c.othername != 'jack', - text("EXISTS (select yay from foo where boo = lar)") + table2.c.othername != "jack", + text("EXISTS (select yay from foo where boo = lar)"), ), - from_obj=[outerjoin(table1, table2, - table1.c.myid == table2.c.otherid)] + from_obj=[ + outerjoin(table1, table2, table1.c.myid == table2.c.otherid) + ], ) self.assert_compile( - query, "SELECT mytable.myid, mytable.name, mytable.description, " + query, + "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername " "FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = " "myothertable.otherid WHERE mytable.name = :name_1 OR " "mytable.myid = :myid_1 OR myothertable.othername != :othername_1 " - "OR EXISTS (select yay from foo where boo = lar)", ) + "OR EXISTS (select yay from foo where boo = lar)", + ) def test_full_outer_join(self): for spec in [ join(table1, table2, table1.c.myid == table2.c.otherid, full=True), outerjoin( - table1, table2, - table1.c.myid == table2.c.otherid, full=True), - table1.join( - table2, - table1.c.myid == table2.c.otherid, full=True), + table1, table2, table1.c.myid == table2.c.otherid, full=True + ), + table1.join(table2, table1.c.myid == table2.c.otherid, full=True), table1.outerjoin( - table2, - table1.c.myid == table2.c.otherid, full=True), + table2, table1.c.myid == table2.c.otherid, full=True + ), ]: stmt = select([table1]).select_from(spec) self.assert_compile( stmt, "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable FULL OUTER JOIN myothertable " - "ON mytable.myid = myothertable.otherid") + "ON mytable.myid = myothertable.otherid", + ) def test_compound_selects(self): assert_raises_message( @@ -1687,7 +1901,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "All selectables passed to CompoundSelect " "must have identical numbers of columns; " "select #1 has 2 columns, select #2 has 3", - union, table3.select(), table1.select() + union, + table3.select(), + table1.select(), ) x = union( @@ -1697,36 +1913,39 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): ) self.assert_compile( - x, "SELECT mytable.myid, mytable.name, " + x, + "SELECT mytable.myid, mytable.name, " "mytable.description " "FROM mytable WHERE " "mytable.myid = :myid_1 UNION " "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable WHERE mytable.myid = :myid_2 " - "ORDER BY mytable.myid") - - x = union( - select([table1]), - select([table1]) + "ORDER BY mytable.myid", ) + + x = union(select([table1]), select([table1])) x = union(x, select([table1])) self.assert_compile( - x, "(SELECT mytable.myid, mytable.name, mytable.description " + x, + "(SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable UNION SELECT mytable.myid, mytable.name, " "mytable.description FROM mytable) UNION SELECT mytable.myid," - " mytable.name, mytable.description FROM mytable") + " mytable.name, mytable.description FROM mytable", + ) u1 = union( select([table1.c.myid, table1.c.name]), select([table2]), - select([table3]) + select([table3]), ) self.assert_compile( - u1, "SELECT mytable.myid, mytable.name " + u1, + "SELECT mytable.myid, mytable.name " "FROM mytable UNION SELECT myothertable.otherid, " "myothertable.othername FROM myothertable " "UNION SELECT thirdtable.userid, thirdtable.otherstuff " - "FROM thirdtable") + "FROM thirdtable", + ) assert u1.corresponding_column(table2.c.otherid) is u1.c.myid @@ -1734,25 +1953,30 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): union( select([table1.c.myid, table1.c.name]), select([table2]), - order_by=['myid'], + order_by=["myid"], offset=10, - limit=5 + limit=5, ), "SELECT mytable.myid, mytable.name " "FROM mytable UNION SELECT myothertable.otherid, " "myothertable.othername " "FROM myothertable ORDER BY myid " # note table name is omitted "LIMIT :param_1 OFFSET :param_2", - {'param_1': 5, 'param_2': 10} + {"param_1": 5, "param_2": 10}, ) self.assert_compile( union( - select([table1.c.myid, table1.c.name, - func.max(table1.c.description)], - table1.c.name == 'name2', - group_by=[table1.c.myid, table1.c.name]), - table1.select(table1.c.name == 'name1') + select( + [ + table1.c.myid, + table1.c.name, + func.max(table1.c.description), + ], + table1.c.name == "name2", + group_by=[table1.c.myid, table1.c.name], + ), + table1.select(table1.c.name == "name1"), ), "SELECT mytable.myid, mytable.name, " "max(mytable.description) AS max_1 " @@ -1760,183 +1984,155 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "GROUP BY mytable.myid, " "mytable.name UNION SELECT mytable.myid, mytable.name, " "mytable.description " - "FROM mytable WHERE mytable.name = :name_2" + "FROM mytable WHERE mytable.name = :name_2", ) self.assert_compile( union( - select([literal(100).label('value')]), - select([literal(200).label('value')]) + select([literal(100).label("value")]), + select([literal(200).label("value")]), ), - "SELECT :param_1 AS value UNION SELECT :param_2 AS value" + "SELECT :param_1 AS value UNION SELECT :param_2 AS value", ) self.assert_compile( union_all( select([table1.c.myid]), - union( - select([table2.c.otherid]), - select([table3.c.userid]), - ) + union(select([table2.c.otherid]), select([table3.c.userid])), ), - "SELECT mytable.myid FROM mytable UNION ALL " "(SELECT myothertable.otherid FROM myothertable UNION " - "SELECT thirdtable.userid FROM thirdtable)" + "SELECT thirdtable.userid FROM thirdtable)", ) - s = select([column('foo'), column('bar')]) + s = select([column("foo"), column("bar")]) self.assert_compile( - union( - s.order_by("foo"), - s.order_by("bar")), + union(s.order_by("foo"), s.order_by("bar")), "(SELECT foo, bar ORDER BY foo) UNION " - "(SELECT foo, bar ORDER BY bar)") + "(SELECT foo, bar ORDER BY bar)", + ) self.assert_compile( - union(s.order_by("foo").self_group(), - s.order_by("bar").limit(10).self_group()), + union( + s.order_by("foo").self_group(), + s.order_by("bar").limit(10).self_group(), + ), "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, " "bar ORDER BY bar LIMIT :param_1)", - {'param_1': 10} - + {"param_1": 10}, ) def test_compound_grouping(self): - s = select([column('foo'), column('bar')]).select_from(text('bat')) + s = select([column("foo"), column("bar")]).select_from(text("bat")) self.assert_compile( union(union(union(s, s), s), s), "((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " - "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat" + "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat", ) self.assert_compile( union(s, s, s, s), "SELECT foo, bar FROM bat UNION SELECT foo, bar " "FROM bat UNION SELECT foo, bar FROM bat " - "UNION SELECT foo, bar FROM bat" + "UNION SELECT foo, bar FROM bat", ) self.assert_compile( union(s, union(s, union(s, s))), "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat " "UNION (SELECT foo, bar FROM bat " - "UNION SELECT foo, bar FROM bat))" + "UNION SELECT foo, bar FROM bat))", ) self.assert_compile( select([s.alias()]), - 'SELECT anon_1.foo, anon_1.bar FROM ' - '(SELECT foo, bar FROM bat) AS anon_1' + "SELECT anon_1.foo, anon_1.bar FROM " + "(SELECT foo, bar FROM bat) AS anon_1", ) self.assert_compile( select([union(s, s).alias()]), - 'SELECT anon_1.foo, anon_1.bar FROM ' - '(SELECT foo, bar FROM bat UNION ' - 'SELECT foo, bar FROM bat) AS anon_1' + "SELECT anon_1.foo, anon_1.bar FROM " + "(SELECT foo, bar FROM bat UNION " + "SELECT foo, bar FROM bat) AS anon_1", ) self.assert_compile( select([except_(s, s).alias()]), - 'SELECT anon_1.foo, anon_1.bar FROM ' - '(SELECT foo, bar FROM bat EXCEPT ' - 'SELECT foo, bar FROM bat) AS anon_1' + "SELECT anon_1.foo, anon_1.bar FROM " + "(SELECT foo, bar FROM bat EXCEPT " + "SELECT foo, bar FROM bat) AS anon_1", ) # this query sqlite specifically chokes on self.assert_compile( - union( - except_(s, s), - s - ), + union(except_(s, s), s), "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) " - "UNION SELECT foo, bar FROM bat" + "UNION SELECT foo, bar FROM bat", ) self.assert_compile( - union( - s, - except_(s, s), - ), + union(s, except_(s, s)), "SELECT foo, bar FROM bat " - "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)" + "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)", ) # this solves it self.assert_compile( - union( - except_(s, s).alias().select(), - s - ), + union(except_(s, s).alias().select(), s), "SELECT anon_1.foo, anon_1.bar FROM " "(SELECT foo, bar FROM bat EXCEPT " "SELECT foo, bar FROM bat) AS anon_1 " - "UNION SELECT foo, bar FROM bat" + "UNION SELECT foo, bar FROM bat", ) self.assert_compile( - except_( - union(s, s), - union(s, s) - ), + except_(union(s, s), union(s, s)), "(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) " - "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)" + "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)", ) s2 = union(s, s) s3 = union(s2, s2) - self.assert_compile(s3, "(SELECT foo, bar FROM bat " - "UNION SELECT foo, bar FROM bat) " - "UNION (SELECT foo, bar FROM bat " - "UNION SELECT foo, bar FROM bat)") + self.assert_compile( + s3, + "(SELECT foo, bar FROM bat " + "UNION SELECT foo, bar FROM bat) " + "UNION (SELECT foo, bar FROM bat " + "UNION SELECT foo, bar FROM bat)", + ) self.assert_compile( - union( - intersect(s, s), - intersect(s, s) - ), + union(intersect(s, s), intersect(s, s)), "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) " "UNION (SELECT foo, bar FROM bat INTERSECT " - "SELECT foo, bar FROM bat)" + "SELECT foo, bar FROM bat)", ) # tests for [ticket:2528] # sqlite hates all of these. self.assert_compile( - union( - s.limit(1), - s.offset(2) - ), + union(s.limit(1), s.offset(2)), "(SELECT foo, bar FROM bat LIMIT :param_1) " - "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_2)" + "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_2)", ) self.assert_compile( - union( - s.order_by(column('bar')), - s.offset(2) - ), + union(s.order_by(column("bar")), s.offset(2)), "(SELECT foo, bar FROM bat ORDER BY bar) " - "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_1)" + "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_1)", ) self.assert_compile( - union( - s.limit(1).alias('a'), - s.limit(2).alias('b') - ), + union(s.limit(1).alias("a"), s.limit(2).alias("b")), "(SELECT foo, bar FROM bat LIMIT :param_1) " - "UNION (SELECT foo, bar FROM bat LIMIT :param_2)" + "UNION (SELECT foo, bar FROM bat LIMIT :param_2)", ) self.assert_compile( - union( - s.limit(1).self_group(), - s.limit(2).self_group() - ), + union(s.limit(1).self_group(), s.limit(2).self_group()), "(SELECT foo, bar FROM bat LIMIT :param_1) " - "UNION (SELECT foo, bar FROM bat LIMIT :param_2)" + "UNION (SELECT foo, bar FROM bat LIMIT :param_2)", ) self.assert_compile( @@ -1944,22 +2140,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT anon_1.foo, anon_1.bar FROM " "((SELECT foo, bar FROM bat LIMIT :param_1) " "UNION (SELECT foo, bar FROM bat LIMIT :param_2 OFFSET :param_3)) " - "AS anon_1" + "AS anon_1", ) # this version works for SQLite self.assert_compile( - union( - s.limit(1).alias().select(), - s.offset(2).alias().select(), - ), + union(s.limit(1).alias().select(), s.offset(2).alias().select()), "SELECT anon_1.foo, anon_1.bar " "FROM (SELECT foo, bar FROM bat" " LIMIT :param_1) AS anon_1 " "UNION SELECT anon_2.foo, anon_2.bar " "FROM (SELECT foo, bar " "FROM bat" - " LIMIT -1 OFFSET :param_2) AS anon_2" + " LIMIT -1 OFFSET :param_2) AS anon_2", ) def test_binds(self): @@ -1971,15 +2164,16 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): expected_default_params_list, test_param_dict, expected_test_params_dict, - expected_test_params_list + expected_test_params_list, ) in [ ( select( [table1, table2], and_( table1.c.myid == table2.c.otherid, - table1.c.name == bindparam('mytablename') - )), + table1.c.name == bindparam("mytablename"), + ), + ), "SELECT mytable.myid, mytable.name, mytable.description, " "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable WHERE mytable.myid = myothertable.otherid " @@ -1988,55 +2182,80 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "myothertable.otherid, myothertable.othername FROM mytable, " "myothertable WHERE mytable.myid = myothertable.otherid AND " "mytable.name = ?", - {'mytablename': None}, [None], - {'mytablename': 5}, {'mytablename': 5}, [5] + {"mytablename": None}, + [None], + {"mytablename": 5}, + {"mytablename": 5}, + [5], ), ( - select([table1], or_(table1.c.myid == bindparam('myid'), - table2.c.otherid == bindparam('myid'))), + select( + [table1], + or_( + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myid"), + ), + ), "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable, myothertable WHERE mytable.myid = :myid " "OR myothertable.otherid = :myid", "SELECT mytable.myid, mytable.name, mytable.description " "FROM mytable, myothertable WHERE mytable.myid = ? " "OR myothertable.otherid = ?", - {'myid': None}, [None, None], - {'myid': 5}, {'myid': 5}, [5, 5] + {"myid": None}, + [None, None], + {"myid": 5}, + {"myid": 5}, + [5, 5], ), ( - text("SELECT mytable.myid, mytable.name, " - "mytable.description FROM " - "mytable, myothertable WHERE mytable.myid = :myid OR " - "myothertable.otherid = :myid"), + text( + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM " + "mytable, myothertable WHERE mytable.myid = :myid OR " + "myothertable.otherid = :myid" + ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = :myid OR " "myothertable.otherid = :myid", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = ? OR " "myothertable.otherid = ?", - {'myid': None}, [None, None], - {'myid': 5}, {'myid': 5}, [5, 5] + {"myid": None}, + [None, None], + {"myid": 5}, + {"myid": 5}, + [5, 5], ), ( - select([table1], or_(table1.c.myid == - bindparam('myid', unique=True), - table2.c.otherid == - bindparam('myid', unique=True))), + select( + [table1], + or_( + table1.c.myid == bindparam("myid", unique=True), + table2.c.otherid == bindparam("myid", unique=True), + ), + ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = ? " "OR myothertable.otherid = ?", - {'myid_1': None, 'myid_2': None}, [None, None], - {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] + {"myid_1": None, "myid_2": None}, + [None, None], + {"myid_1": 5, "myid_2": 6}, + {"myid_1": 5, "myid_2": 6}, + [5, 6], ), ( - bindparam('test', type_=String, required=False) + text("'hi'"), + bindparam("test", type_=String, required=False) + text("'hi'"), ":test || 'hi'", "? || 'hi'", - {'test': None}, [None], - {}, {'test': None}, [None] + {"test": None}, + [None], + {}, + {"test": None}, + [None], ), ( # testing select.params() here - bindparam() objects @@ -2044,89 +2263,125 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): select( [table1], or_( - table1.c.myid == bindparam('myid'), - table2.c.otherid == bindparam('myotherid') - )).params({'myid': 8, 'myotherid': 7}), + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myotherid"), + ), + ).params({"myid": 8, "myotherid": 7}), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid OR myothertable.otherid = :myotherid", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " "? OR myothertable.otherid = ?", - {'myid': 8, 'myotherid': 7}, [8, 7], - {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7] + {"myid": 8, "myotherid": 7}, + [8, 7], + {"myid": 5}, + {"myid": 5, "myotherid": 7}, + [5, 7], ), ( - select([table1], or_(table1.c.myid == - bindparam('myid', value=7, unique=True), - table2.c.otherid == - bindparam('myid', value=8, unique=True))), + select( + [table1], + or_( + table1.c.myid + == bindparam("myid", value=7, unique=True), + table2.c.otherid + == bindparam("myid", value=8, unique=True), + ), + ), "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " ":myid_1 OR myothertable.otherid = :myid_2", "SELECT mytable.myid, mytable.name, mytable.description FROM " "mytable, myothertable WHERE mytable.myid = " "? OR myothertable.otherid = ?", - {'myid_1': 7, 'myid_2': 8}, [7, 8], - {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6] + {"myid_1": 7, "myid_2": 8}, + [7, 8], + {"myid_1": 5, "myid_2": 6}, + {"myid_1": 5, "myid_2": 6}, + [5, 6], ), ]: - self.assert_compile(stmt, expected_named_stmt, - params=expected_default_params_dict) - self.assert_compile(stmt, expected_positional_stmt, - dialect=sqlite.dialect()) + self.assert_compile( + stmt, expected_named_stmt, params=expected_default_params_dict + ) + self.assert_compile( + stmt, expected_positional_stmt, dialect=sqlite.dialect() + ) nonpositional = stmt.compile() positional = stmt.compile(dialect=sqlite.dialect()) pp = positional.params - eq_([pp[k] for k in positional.positiontup], - expected_default_params_list) + eq_( + [pp[k] for k in positional.positiontup], + expected_default_params_list, + ) - eq_(nonpositional.construct_params(test_param_dict), - expected_test_params_dict) + eq_( + nonpositional.construct_params(test_param_dict), + expected_test_params_dict, + ) pp = positional.construct_params(test_param_dict) eq_( [pp[k] for k in positional.positiontup], - expected_test_params_list + expected_test_params_list, ) # check that params() doesn't modify original statement - s = select([table1], or_(table1.c.myid == bindparam('myid'), - table2.c.otherid == - bindparam('myotherid'))) - s2 = s.params({'myid': 8, 'myotherid': 7}) - s3 = s2.params({'myid': 9}) - assert s.compile().params == {'myid': None, 'myotherid': None} - assert s2.compile().params == {'myid': 8, 'myotherid': 7} - assert s3.compile().params == {'myid': 9, 'myotherid': 7} + s = select( + [table1], + or_( + table1.c.myid == bindparam("myid"), + table2.c.otherid == bindparam("myotherid"), + ), + ) + s2 = s.params({"myid": 8, "myotherid": 7}) + s3 = s2.params({"myid": 9}) + assert s.compile().params == {"myid": None, "myotherid": None} + assert s2.compile().params == {"myid": 8, "myotherid": 7} + assert s3.compile().params == {"myid": 9, "myotherid": 7} # test using same 'unique' param object twice in one compile s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar() s2 = select([table1, s], table1.c.myid == s) self.assert_compile( - s2, "SELECT mytable.myid, mytable.name, mytable.description, " + s2, + "SELECT mytable.myid, mytable.name, mytable.description, " "(SELECT mytable.myid FROM mytable WHERE mytable.myid = " ":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = " - "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)") + "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)", + ) positional = s2.compile(dialect=sqlite.dialect()) pp = positional.params assert [pp[k] for k in positional.positiontup] == [12, 12] # check that conflicts with "unique" params are caught - s = select([table1], or_(table1.c.myid == 7, - table1.c.myid == bindparam('myid_1'))) - assert_raises_message(exc.CompileError, - "conflicts with unique bind parameter " - "of the same name", - str, s) - - s = select([table1], or_(table1.c.myid == 7, table1.c.myid == 8, - table1.c.myid == bindparam('myid_1'))) - assert_raises_message(exc.CompileError, - "conflicts with unique bind parameter " - "of the same name", - str, s) + s = select( + [table1], + or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")), + ) + assert_raises_message( + exc.CompileError, + "conflicts with unique bind parameter " "of the same name", + str, + s, + ) + + s = select( + [table1], + or_( + table1.c.myid == 7, + table1.c.myid == 8, + table1.c.myid == bindparam("myid_1"), + ), + ) + assert_raises_message( + exc.CompileError, + "conflicts with unique bind parameter " "of the same name", + str, + s, + ) def _test_binds_no_hash_collision(self): """test that construct_params doesn't corrupt dict @@ -2134,84 +2389,85 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): total_params = 100000 - in_clause = [':in%d' % i for i in range(total_params)] - params = dict(('in%d' % i, i) for i in range(total_params)) - t = text('text clause %s' % ', '.join(in_clause)) + in_clause = [":in%d" % i for i in range(total_params)] + params = dict(("in%d" % i, i) for i in range(total_params)) + t = text("text clause %s" % ", ".join(in_clause)) eq_(len(t.bindparams), total_params) c = t.compile() pp = c.construct_params(params) - eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp))) + eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp))) eq_(len(set(pp.values())), total_params) def test_bind_as_col(self): - t = table('foo', column('id')) + t = table("foo", column("id")) - s = select([t, literal('lala').label('hoho')]) + s = select([t, literal("lala").label("hoho")]) self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo") assert [str(c) for c in s.c] == ["id", "hoho"] def test_bind_callable(self): - expr = column('x') == bindparam("key", callable_=lambda: 12) - self.assert_compile( - expr, - "x = :key", - {'x': 12} - ) + expr = column("x") == bindparam("key", callable_=lambda: 12) + self.assert_compile(expr, "x = :key", {"x": 12}) def test_bind_params_missing(self): assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x'", - select( - [table1]).where( + select([table1]) + .where( and_( table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True) + table1.c.name == bindparam("y", required=True), ) - ).compile().construct_params, - params=dict(y=5) + ) + .compile() + .construct_params, + params=dict(y=5), ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x'", - select( - [table1]).where( - table1.c.myid == bindparam( - "x", - required=True)).compile().construct_params) + select([table1]) + .where(table1.c.myid == bindparam("x", required=True)) + .compile() + .construct_params, + ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x', " "in parameter group 2", - select( - [table1]).where( + select([table1]) + .where( and_( table1.c.myid == bindparam("x", required=True), - table1.c.name == bindparam("y", required=True) + table1.c.name == bindparam("y", required=True), ) - ).compile().construct_params, - params=dict(y=5), _group_number=2) + ) + .compile() + .construct_params, + params=dict(y=5), + _group_number=2, + ) assert_raises_message( exc.InvalidRequestError, r"A value is required for bind parameter 'x', " "in parameter group 2", - select( - [table1]).where( - table1.c.myid == bindparam( - "x", - required=True)).compile().construct_params, - _group_number=2) + select([table1]) + .where(table1.c.myid == bindparam("x", required=True)) + .compile() + .construct_params, + _group_number=2, + ) def test_tuple(self): self.assert_compile( - tuple_(table1.c.myid, table1.c.name).in_( - [(1, 'foo'), (5, 'bar')]), + tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]), "(mytable.myid, mytable.name) IN " - "((:param_1, :param_2), (:param_3, :param_4))" + "((:param_1, :param_2), (:param_3, :param_4))", ) self.assert_compile( @@ -2219,7 +2475,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): [tuple_(table2.c.otherid, table2.c.othername)] ), "(mytable.myid, mytable.name) IN " - "((myothertable.otherid, myothertable.othername))" + "((myothertable.otherid, myothertable.othername))", ) self.assert_compile( @@ -2227,226 +2483,245 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): select([table2.c.otherid, table2.c.othername]) ), "(mytable.myid, mytable.name) IN (SELECT " - "myothertable.otherid, myothertable.othername FROM myothertable)" + "myothertable.otherid, myothertable.othername FROM myothertable)", ) def test_expanding_parameter(self): self.assert_compile( tuple_(table1.c.myid, table1.c.name).in_( - bindparam('foo', expanding=True)), - "(mytable.myid, mytable.name) IN ([EXPANDING_foo])" + bindparam("foo", expanding=True) + ), + "(mytable.myid, mytable.name) IN ([EXPANDING_foo])", ) self.assert_compile( - table1.c.myid.in_(bindparam('foo', expanding=True)), - "mytable.myid IN ([EXPANDING_foo])" + table1.c.myid.in_(bindparam("foo", expanding=True)), + "mytable.myid IN ([EXPANDING_foo])", ) def test_cast(self): - tbl = table('casttest', - column('id', Integer), - column('v1', Float), - column('v2', Float), - column('ts', TIMESTAMP), - ) + tbl = table( + "casttest", + column("id", Integer), + column("v1", Float), + column("v2", Float), + column("ts", TIMESTAMP), + ) def check_results(dialect, expected_results, literal): - eq_(len(expected_results), 5, - 'Incorrect number of expected results') - eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), - 'CAST(casttest.v1 AS %s)' % expected_results[0]) - eq_(str(tbl.c.v1.cast(Numeric).compile(dialect=dialect)), - 'CAST(casttest.v1 AS %s)' % expected_results[0]) - eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), - 'CAST(casttest.v1 AS %s)' % expected_results[1]) - eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)), - 'CAST(casttest.ts AS %s)' % expected_results[2]) - eq_(str(cast(1234, Text).compile(dialect=dialect)), - 'CAST(%s AS %s)' % (literal, expected_results[3])) - eq_(str(cast('test', String(20)).compile(dialect=dialect)), - 'CAST(%s AS %s)' % (literal, expected_results[4])) + eq_( + len(expected_results), + 5, + "Incorrect number of expected results", + ) + eq_( + str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)), + "CAST(casttest.v1 AS %s)" % expected_results[0], + ) + eq_( + str(tbl.c.v1.cast(Numeric).compile(dialect=dialect)), + "CAST(casttest.v1 AS %s)" % expected_results[0], + ) + eq_( + str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)), + "CAST(casttest.v1 AS %s)" % expected_results[1], + ) + eq_( + str(cast(tbl.c.ts, Date).compile(dialect=dialect)), + "CAST(casttest.ts AS %s)" % expected_results[2], + ) + eq_( + str(cast(1234, Text).compile(dialect=dialect)), + "CAST(%s AS %s)" % (literal, expected_results[3]), + ) + eq_( + str(cast("test", String(20)).compile(dialect=dialect)), + "CAST(%s AS %s)" % (literal, expected_results[4]), + ) # fixme: shoving all of this dialect-specific stuff in one test # is now officially completely ridiculous AND non-obviously omits # coverage on other dialects. sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile( - dialect=dialect) + dialect=dialect + ) if isinstance(dialect, type(mysql.dialect())): - eq_(str(sel), + eq_( + str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, " "casttest.ts, " - "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest") + "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest", + ) else: - eq_(str(sel), + eq_( + str(sel), "SELECT casttest.id, casttest.v1, casttest.v2, " "casttest.ts, CAST(casttest.v1 AS NUMERIC) AS " - "anon_1 \nFROM casttest") + "anon_1 \nFROM casttest", + ) # first test with PostgreSQL engine check_results( - postgresql.dialect(), [ - 'NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'], - '%(param_1)s') + postgresql.dialect(), + ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"], + "%(param_1)s", + ) # then the Oracle engine check_results( - oracle.dialect(), [ - 'NUMERIC', 'NUMERIC(12, 9)', 'DATE', - 'CLOB', 'VARCHAR2(20 CHAR)'], - ':param_1') + oracle.dialect(), + ["NUMERIC", "NUMERIC(12, 9)", "DATE", "CLOB", "VARCHAR2(20 CHAR)"], + ":param_1", + ) # then the sqlite engine - check_results(sqlite.dialect(), ['NUMERIC', 'NUMERIC(12, 9)', - 'DATE', 'TEXT', 'VARCHAR(20)'], '?') + check_results( + sqlite.dialect(), + ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"], + "?", + ) # then the MySQL engine - check_results(mysql.dialect(), ['DECIMAL', 'DECIMAL(12, 9)', - 'DATE', 'CHAR', 'CHAR(20)'], '%s') - - self.assert_compile(cast(text('NULL'), Integer), - 'CAST(NULL AS INTEGER)', - dialect=sqlite.dialect()) - self.assert_compile(cast(null(), Integer), - 'CAST(NULL AS INTEGER)', - dialect=sqlite.dialect()) - self.assert_compile(cast(literal_column('NULL'), Integer), - 'CAST(NULL AS INTEGER)', - dialect=sqlite.dialect()) + check_results( + mysql.dialect(), + ["DECIMAL", "DECIMAL(12, 9)", "DATE", "CHAR", "CHAR(20)"], + "%s", + ) - def test_over(self): self.assert_compile( - func.row_number().over(), - "row_number() OVER ()" + cast(text("NULL"), Integer), + "CAST(NULL AS INTEGER)", + dialect=sqlite.dialect(), + ) + self.assert_compile( + cast(null(), Integer), + "CAST(NULL AS INTEGER)", + dialect=sqlite.dialect(), + ) + self.assert_compile( + cast(literal_column("NULL"), Integer), + "CAST(NULL AS INTEGER)", + dialect=sqlite.dialect(), ) + + def test_over(self): + self.assert_compile(func.row_number().over(), "row_number() OVER ()") self.assert_compile( func.row_number().over( order_by=[table1.c.name, table1.c.description] ), - "row_number() OVER (ORDER BY mytable.name, mytable.description)" + "row_number() OVER (ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=[table1.c.name, table1.c.description] ), "row_number() OVER (PARTITION BY mytable.name, " - "mytable.description)" + "mytable.description)", ) self.assert_compile( func.row_number().over( - partition_by=[table1.c.name], - order_by=[table1.c.description] + partition_by=[table1.c.name], order_by=[table1.c.description] ), "row_number() OVER (PARTITION BY mytable.name " - "ORDER BY mytable.description)" + "ORDER BY mytable.description)", ) self.assert_compile( func.row_number().over( - partition_by=table1.c.name, - order_by=table1.c.description + partition_by=table1.c.name, order_by=table1.c.description ), "row_number() OVER (PARTITION BY mytable.name " - "ORDER BY mytable.description)" + "ORDER BY mytable.description)", ) self.assert_compile( func.row_number().over( partition_by=table1.c.name, - order_by=[table1.c.name, table1.c.description] + order_by=[table1.c.name, table1.c.description], ), "row_number() OVER (PARTITION BY mytable.name " - "ORDER BY mytable.name, mytable.description)" + "ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( - partition_by=[], - order_by=[table1.c.name, table1.c.description] + partition_by=[], order_by=[table1.c.name, table1.c.description] ), - "row_number() OVER (ORDER BY mytable.name, mytable.description)" + "row_number() OVER (ORDER BY mytable.name, mytable.description)", ) self.assert_compile( func.row_number().over( - partition_by=[table1.c.name, table1.c.description], - order_by=[] + partition_by=[table1.c.name, table1.c.description], order_by=[] ), "row_number() OVER (PARTITION BY mytable.name, " - "mytable.description)" + "mytable.description)", ) self.assert_compile( - func.row_number().over( - partition_by=[], - order_by=[] - ), - "row_number() OVER ()" + func.row_number().over(partition_by=[], order_by=[]), + "row_number() OVER ()", ) self.assert_compile( - select([func.row_number().over( - order_by=table1.c.description - ).label('foo')]), + select( + [ + func.row_number() + .over(order_by=table1.c.description) + .label("foo") + ] + ), "SELECT row_number() OVER (ORDER BY mytable.description) " - "AS foo FROM mytable" + "AS foo FROM mytable", ) # test from_obj generation. # from func: self.assert_compile( - select([ - func.max(table1.c.name).over( - partition_by=['description'] - ) - ]), + select( + [func.max(table1.c.name).over(partition_by=["description"])] + ), "SELECT max(mytable.name) OVER (PARTITION BY mytable.description) " - "AS anon_1 FROM mytable" + "AS anon_1 FROM mytable", ) # from partition_by self.assert_compile( - select([ - func.row_number().over( - partition_by=[table1.c.name] - ) - ]), + select([func.row_number().over(partition_by=[table1.c.name])]), "SELECT row_number() OVER (PARTITION BY mytable.name) " - "AS anon_1 FROM mytable" + "AS anon_1 FROM mytable", ) # from order_by self.assert_compile( - select([ - func.row_number().over( - order_by=table1.c.name - ) - ]), + select([func.row_number().over(order_by=table1.c.name)]), "SELECT row_number() OVER (ORDER BY mytable.name) " - "AS anon_1 FROM mytable" + "AS anon_1 FROM mytable", ) # this tests that _from_objects # concantenates OK self.assert_compile( select([column("x") + over(func.foo())]), - "SELECT x + foo() OVER () AS anon_1" + "SELECT x + foo() OVER () AS anon_1", ) # test a reference to a label that in the referecned selectable; # this resolves - expr = (table1.c.myid + 5).label('sum') + expr = (table1.c.myid + 5).label("sum") stmt = select([expr]).alias() self.assert_compile( select([stmt.c.sum, func.row_number().over(order_by=stmt.c.sum)]), "SELECT anon_1.sum, row_number() OVER (ORDER BY anon_1.sum) " "AS anon_2 FROM (SELECT mytable.myid + :myid_1 AS sum " - "FROM mytable) AS anon_1" + "FROM mytable) AS anon_1", ) # test a reference to a label that's at the same level as the OVER # in the columns clause; doesn't resolve - expr = (table1.c.myid + 5).label('sum') + expr = (table1.c.myid + 5).label("sum") self.assert_compile( select([expr, func.row_number().over(order_by=expr)]), "SELECT mytable.myid + :myid_1 AS sum, " "row_number() OVER " - "(ORDER BY mytable.myid + :myid_1) AS anon_1 FROM mytable" + "(ORDER BY mytable.myid + :myid_1) AS anon_1 FROM mytable", ) def test_over_framespec(self): @@ -2457,7 +2732,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT row_number() OVER " "(ORDER BY mytable.myid ROWS BETWEEN CURRENT " "ROW AND UNBOUNDED FOLLOWING)" - " AS anon_1 FROM mytable" + " AS anon_1 FROM mytable", ) self.assert_compile( @@ -2465,7 +2740,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT row_number() OVER " "(ORDER BY mytable.myid ROWS BETWEEN UNBOUNDED " "PRECEDING AND UNBOUNDED FOLLOWING)" - " AS anon_1 FROM mytable" + " AS anon_1 FROM mytable", ) self.assert_compile( @@ -2473,7 +2748,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT row_number() OVER " "(ORDER BY mytable.myid RANGE BETWEEN " "UNBOUNDED PRECEDING AND CURRENT ROW)" - " AS anon_1 FROM mytable" + " AS anon_1 FROM mytable", ) self.assert_compile( @@ -2482,7 +2757,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 PRECEDING AND :param_2 FOLLOWING)" " AS anon_1 FROM mytable", - checkparams={'param_1': 5, 'param_2': 10} + checkparams={"param_1": 5, "param_2": 10}, ) self.assert_compile( @@ -2491,7 +2766,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 FOLLOWING AND :param_2 FOLLOWING)" " AS anon_1 FROM mytable", - checkparams={'param_1': 1, 'param_2': 10} + checkparams={"param_1": 1, "param_2": 10}, ) self.assert_compile( @@ -2500,89 +2775,108 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): "(ORDER BY mytable.myid RANGE BETWEEN " ":param_1 PRECEDING AND :param_2 PRECEDING)" " AS anon_1 FROM mytable", - checkparams={'param_1': 10, 'param_2': 1} + checkparams={"param_1": 10, "param_2": 1}, ) def test_over_invalid_framespecs(self): assert_raises_message( exc.ArgumentError, "Integer or None expected for range value", - func.row_number().over, range_=("foo", 8) + func.row_number().over, + range_=("foo", 8), ) assert_raises_message( exc.ArgumentError, "Integer or None expected for range value", - func.row_number().over, range_=(-5, "foo") + func.row_number().over, + range_=(-5, "foo"), ) assert_raises_message( exc.ArgumentError, "'range_' and 'rows' are mutually exclusive", - func.row_number().over, range_=(-5, 8), rows=(-2, 5) + func.row_number().over, + range_=(-5, 8), + rows=(-2, 5), ) def test_over_within_group(self): from sqlalchemy import within_group - stmt = select([ - table1.c.myid, - within_group( - func.percentile_cont(0.5), - table1.c.name.desc() - ).over( - range_=(1, 2), - partition_by=table1.c.name, - order_by=table1.c.myid - ) - ]) + + stmt = select( + [ + table1.c.myid, + within_group( + func.percentile_cont(0.5), table1.c.name.desc() + ).over( + range_=(1, 2), + partition_by=table1.c.name, + order_by=table1.c.myid, + ), + ] + ) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " "WITHIN GROUP (ORDER BY mytable.name DESC) " "OVER (PARTITION BY mytable.name ORDER BY mytable.myid " "RANGE BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) " - "AS anon_1 FROM mytable" + "AS anon_1 FROM mytable", + ) + + stmt = select( + [ + table1.c.myid, + within_group( + func.percentile_cont(0.5), table1.c.name.desc() + ).over( + rows=(1, 2), + partition_by=table1.c.name, + order_by=table1.c.myid, + ), + ] ) - - stmt = select([ - table1.c.myid, - within_group( - func.percentile_cont(0.5), - table1.c.name.desc() - ).over( - rows=(1, 2), - partition_by=table1.c.name, - order_by=table1.c.myid - ) - ]) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " "WITHIN GROUP (ORDER BY mytable.name DESC) " "OVER (PARTITION BY mytable.name ORDER BY mytable.myid " "ROWS BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) " - "AS anon_1 FROM mytable" + "AS anon_1 FROM mytable", ) - - def test_date_between(self): import datetime - table = Table('dt', metadata, - Column('date', Date)) + + table = Table("dt", metadata, Column("date", Date)) self.assert_compile( - table.select(table.c.date.between(datetime.date(2006, 6, 1), - datetime.date(2006, 6, 5))), + table.select( + table.c.date.between( + datetime.date(2006, 6, 1), datetime.date(2006, 6, 5) + ) + ), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", - checkparams={'date_1': datetime.date(2006, 6, 1), - 'date_2': datetime.date(2006, 6, 5)}) + checkparams={ + "date_1": datetime.date(2006, 6, 1), + "date_2": datetime.date(2006, 6, 5), + }, + ) self.assert_compile( - table.select(sql.between(table.c.date, datetime.date(2006, 6, 1), - datetime.date(2006, 6, 5))), + table.select( + sql.between( + table.c.date, + datetime.date(2006, 6, 1), + datetime.date(2006, 6, 5), + ) + ), "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2", - checkparams={'date_1': datetime.date(2006, 6, 1), - 'date_2': datetime.date(2006, 6, 5)}) + checkparams={ + "date_1": datetime.date(2006, 6, 1), + "date_2": datetime.date(2006, 6, 5), + }, + ) def test_delayed_col_naming(self): my_str = Column(String) @@ -2592,18 +2886,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.InvalidRequestError, "Cannot initialize a sub-selectable with this Column", - lambda: sel1.c + lambda: sel1.c, ) # calling label or as_scalar doesn't compile # anything. - sel2 = select([func.substr(my_str, 2, 3)]).label('my_substr') + sel2 = select([func.substr(my_str, 2, 3)]).label("my_substr") assert_raises_message( exc.CompileError, "Cannot compile Column object until its 'name' is assigned.", sel2.compile, - dialect=default.DefaultDialect() + dialect=default.DefaultDialect(), ) sel3 = select([my_str]).as_scalar() @@ -2611,24 +2905,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): exc.CompileError, "Cannot compile Column object until its 'name' is assigned.", sel3.compile, - dialect=default.DefaultDialect() + dialect=default.DefaultDialect(), ) - my_str.name = 'foo' + my_str.name = "foo" + self.assert_compile(sel1, "SELECT foo") self.assert_compile( - sel1, - "SELECT foo", - ) - self.assert_compile( - sel2, - '(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)', + sel2, "(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)" ) - self.assert_compile( - sel3, - "(SELECT foo)" - ) + self.assert_compile(sel3, "(SELECT foo)") def test_naming(self): # TODO: the part where we check c.keys() are not "compile" tests, they @@ -2636,36 +2923,46 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # version of that suite f1 = func.hoho(table1.c.name) - s1 = select([table1.c.myid, table1.c.myid.label('foobar'), - f1, - func.lala(table1.c.name).label('gg')]) - - eq_( - list(s1.c.keys()), - ['myid', 'foobar', str(f1), 'gg'] + s1 = select( + [ + table1.c.myid, + table1.c.myid.label("foobar"), + f1, + func.lala(table1.c.name).label("gg"), + ] ) + eq_(list(s1.c.keys()), ["myid", "foobar", str(f1), "gg"]) + meta = MetaData() - t1 = Table('mytable', meta, Column('col1', Integer)) + t1 = Table("mytable", meta, Column("col1", Integer)) exprs = ( table1.c.myid == 12, func.hoho(table1.c.myid), cast(table1.c.name, Numeric), - literal('x'), + literal("x"), ) for col, key, expr, lbl in ( - (table1.c.name, 'name', 'mytable.name', None), - (exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'), - (exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'), - (exprs[2], str(exprs[2]), - 'CAST(mytable.name AS NUMERIC)', 'anon_1'), - (t1.c.col1, 'col1', 'mytable.col1', None), - (column('some wacky thing'), 'some wacky thing', - '"some wacky thing"', ''), - (exprs[3], exprs[3].key, ":param_1", "anon_1") + (table1.c.name, "name", "mytable.name", None), + (exprs[0], str(exprs[0]), "mytable.myid = :myid_1", "anon_1"), + (exprs[1], str(exprs[1]), "hoho(mytable.myid)", "hoho_1"), + ( + exprs[2], + str(exprs[2]), + "CAST(mytable.name AS NUMERIC)", + "anon_1", + ), + (t1.c.col1, "col1", "mytable.col1", None), + ( + column("some wacky thing"), + "some wacky thing", + '"some wacky thing"', + "", + ), + (exprs[3], exprs[3].key, ":param_1", "anon_1"), ): - if getattr(col, 'table', None) is not None: + if getattr(col, "table", None) is not None: t = col.table else: t = table1 @@ -2675,107 +2972,151 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): if lbl: self.assert_compile( - s1, "SELECT %s AS %s FROM mytable" % - (expr, lbl)) + s1, "SELECT %s AS %s FROM mytable" % (expr, lbl) + ) else: self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,)) s1 = select([s1]) if lbl: self.assert_compile( - s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % - (lbl, expr, lbl)) + s1, + "SELECT %s FROM (SELECT %s AS %s FROM mytable)" + % (lbl, expr, lbl), + ) elif col.table is not None: # sqlite rule labels subquery columns self.assert_compile( - s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" % - (key, expr, key)) + s1, + "SELECT %s FROM (SELECT %s AS %s FROM mytable)" + % (key, expr, key), + ) else: - self.assert_compile(s1, - "SELECT %s FROM (SELECT %s FROM mytable)" % - (expr, expr)) + self.assert_compile( + s1, + "SELECT %s FROM (SELECT %s FROM mytable)" % (expr, expr), + ) def test_hints(self): s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s") - s2 = select([table1.c.myid]).\ - with_hint(table1, "index(%(name)s idx)", 'oracle').\ - with_hint(table1, "WITH HINT INDEX idx", 'sybase') + s2 = ( + select([table1.c.myid]) + .with_hint(table1, "index(%(name)s idx)", "oracle") + .with_hint(table1, "WITH HINT INDEX idx", "sybase") + ) a1 = table1.alias() s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)") - subs4 = select([ - table1, table2 - ]).select_from( - table1.join(table2, table1.c.myid == table2.c.otherid)).\ - with_hint(table1, 'hint1') + subs4 = ( + select([table1, table2]) + .select_from( + table1.join(table2, table1.c.myid == table2.c.otherid) + ) + .with_hint(table1, "hint1") + ) - s4 = select([table3]).select_from( - table3.join( - subs4, - subs4.c.othername == table3.c.otherstuff + s4 = ( + select([table3]) + .select_from( + table3.join(subs4, subs4.c.othername == table3.c.otherstuff) ) - ).\ - with_hint(table3, 'hint3') + .with_hint(table3, "hint3") + ) - t1 = table('QuotedName', column('col1')) - s6 = select([t1.c.col1]).where(t1.c.col1 > 10).\ - with_hint(t1, '%(name)s idx1') - a2 = t1.alias('SomeName') - s7 = select([a2.c.col1]).where(a2.c.col1 > 10).\ - with_hint(a2, '%(name)s idx1') + t1 = table("QuotedName", column("col1")) + s6 = ( + select([t1.c.col1]) + .where(t1.c.col1 > 10) + .with_hint(t1, "%(name)s idx1") + ) + a2 = t1.alias("SomeName") + s7 = ( + select([a2.c.col1]) + .where(a2.c.col1 > 10) + .with_hint(a2, "%(name)s idx1") + ) - mysql_d, oracle_d, sybase_d = \ - mysql.dialect(), \ - oracle.dialect(), \ - sybase.dialect() + mysql_d, oracle_d, sybase_d = ( + mysql.dialect(), + oracle.dialect(), + sybase.dialect(), + ) for stmt, dialect, expected in [ - (s, mysql_d, - "SELECT mytable.myid FROM mytable test hint mytable"), - (s, oracle_d, - "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"), - (s, sybase_d, - "SELECT mytable.myid FROM mytable test hint mytable"), - (s2, mysql_d, - "SELECT mytable.myid FROM mytable"), - (s2, oracle_d, - "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"), - (s2, sybase_d, - "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"), - (s3, mysql_d, + (s, mysql_d, "SELECT mytable.myid FROM mytable test hint mytable"), + ( + s, + oracle_d, + "SELECT /*+ test hint mytable */ mytable.myid FROM mytable", + ), + ( + s, + sybase_d, + "SELECT mytable.myid FROM mytable test hint mytable", + ), + (s2, mysql_d, "SELECT mytable.myid FROM mytable"), + ( + s2, + oracle_d, + "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable", + ), + ( + s2, + sybase_d, + "SELECT mytable.myid FROM mytable WITH HINT INDEX idx", + ), + ( + s3, + mysql_d, "SELECT mytable_1.myid FROM mytable AS mytable_1 " - "index(mytable_1 hint)"), - (s3, oracle_d, + "index(mytable_1 hint)", + ), + ( + s3, + oracle_d, "SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM " - "mytable mytable_1"), - (s3, sybase_d, + "mytable mytable_1", + ), + ( + s3, + sybase_d, "SELECT mytable_1.myid FROM mytable AS mytable_1 " - "index(mytable_1 hint)"), - (s4, mysql_d, + "index(mytable_1 hint)", + ), + ( + s4, + mysql_d, "SELECT thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable " "hint3 INNER JOIN (SELECT mytable.myid, mytable.name, " "mytable.description, myothertable.otherid, " "myothertable.othername FROM mytable hint1 INNER " "JOIN myothertable ON mytable.myid = myothertable.otherid) " - "ON othername = thirdtable.otherstuff"), - (s4, sybase_d, + "ON othername = thirdtable.otherstuff", + ), + ( + s4, + sybase_d, "SELECT thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable " "hint3 JOIN (SELECT mytable.myid, mytable.name, " "mytable.description, myothertable.otherid, " "myothertable.othername FROM mytable hint1 " "JOIN myothertable ON mytable.myid = myothertable.otherid) " - "ON othername = thirdtable.otherstuff"), - (s4, oracle_d, + "ON othername = thirdtable.otherstuff", + ), + ( + s4, + oracle_d, "SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff " "FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid," " mytable.name, mytable.description, myothertable.otherid," " myothertable.othername FROM mytable JOIN myothertable ON" " mytable.myid = myothertable.otherid) ON othername =" - " thirdtable.otherstuff"), + " thirdtable.otherstuff", + ), # TODO: figure out dictionary ordering solution here # (s5, oracle_d, # "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, " @@ -2785,68 +3126,64 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL): # " myothertable.othername FROM mytable JOIN myothertable ON" # " mytable.myid = myothertable.otherid) ON othername =" # " thirdtable.otherstuff"), - (s6, oracle_d, + ( + s6, + oracle_d, """SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """ - """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""), - (s7, oracle_d, - """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """ - """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""), + """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1""", + ), + ( + s7, + oracle_d, + """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """ + """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1""", + ), ]: - self.assert_compile( - stmt, - expected, - dialect=dialect - ) + self.assert_compile(stmt, expected, dialect=dialect) def test_statement_hints(self): - stmt = select([table1.c.myid]).\ - with_statement_hint("test hint one").\ - with_statement_hint("test hint two", 'mysql') + stmt = ( + select([table1.c.myid]) + .with_statement_hint("test hint one") + .with_statement_hint("test hint two", "mysql") + ) self.assert_compile( - stmt, - "SELECT mytable.myid FROM mytable test hint one", + stmt, "SELECT mytable.myid FROM mytable test hint one" ) self.assert_compile( stmt, "SELECT mytable.myid FROM mytable test hint one test hint two", - dialect='mysql' + dialect="mysql", ) def test_literal_as_text_fromstring(self): - self.assert_compile( - and_(text("a"), text("b")), - "a AND b" - ) + self.assert_compile(and_(text("a"), text("b")), "a AND b") def test_literal_as_text_nonstring_raise(self): - assert_raises(exc.ArgumentError, - and_, ("a",), ("b",) - ) + assert_raises(exc.ArgumentError, and_, ("a",), ("b",)) class UnsupportedTest(fixtures.TestBase): - def test_unsupported_element_str_visit_name(self): from sqlalchemy.sql.expression import ClauseElement class SomeElement(ClauseElement): - __visit_name__ = 'some_element' + __visit_name__ = "some_element" assert_raises_message( exc.UnsupportedCompilationError, r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*" r"can't render element of type <class '.*SomeElement'>", - SomeElement().compile + SomeElement().compile, ) def test_unsupported_element_meth_visit_name(self): from sqlalchemy.sql.expression import ClauseElement class SomeElement(ClauseElement): - @classmethod def __visit_name__(cls): return "some_element" @@ -2855,7 +3192,7 @@ class UnsupportedTest(fixtures.TestBase): exc.UnsupportedCompilationError, r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*" r"can't render element of type <class '.*SomeElement'>", - SomeElement().compile + SomeElement().compile, ) def test_unsupported_operator(self): @@ -2863,12 +3200,13 @@ class UnsupportedTest(fixtures.TestBase): def myop(x, y): pass + binary = BinaryExpression(column("foo"), column("bar"), myop) assert_raises_message( exc.UnsupportedCompilationError, r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*" r"can't render element of type <function.*", - binary.compile + binary.compile, ) @@ -2878,15 +3216,12 @@ class StringifySpecialTest(fixtures.TestBase): eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, mytable.name, mytable.description " - "FROM mytable WHERE mytable.myid = :myid_1" + "FROM mytable WHERE mytable.myid = :myid_1", ) def test_unnamed_column(self): stmt = Column(Integer) == 5 - eq_ignore_whitespace( - str(stmt), - '"<name unknown>" = :param_1' - ) + eq_ignore_whitespace(str(stmt), '"<name unknown>" = :param_1') def test_cte(self): # stringify of these was supported anyway by defaultdialect. @@ -2895,7 +3230,7 @@ class StringifySpecialTest(fixtures.TestBase): eq_ignore_whitespace( str(stmt), "WITH anon_1 AS (SELECT mytable.myid AS myid FROM mytable) " - "SELECT anon_1.myid FROM anon_1" + "SELECT anon_1.myid FROM anon_1", ) def test_next_sequence_value(self): @@ -2906,8 +3241,7 @@ class StringifySpecialTest(fixtures.TestBase): seq = Sequence("my_sequence") eq_ignore_whitespace( - str(seq.next_value()), - "<next sequence value: my_sequence>" + str(seq.next_value()), "<next sequence value: my_sequence>" ) def test_returning(self): @@ -2916,47 +3250,43 @@ class StringifySpecialTest(fixtures.TestBase): eq_ignore_whitespace( str(stmt), "INSERT INTO mytable (myid, name, description) " - "VALUES (:myid, :name, :description) RETURNING mytable.myid" + "VALUES (:myid, :name, :description) RETURNING mytable.myid", ) def test_array_index(self): - stmt = select([column('foo', types.ARRAY(Integer))[5]]) + stmt = select([column("foo", types.ARRAY(Integer))[5]]) - eq_ignore_whitespace( - str(stmt), - "SELECT foo[:foo_1] AS anon_1" - ) + eq_ignore_whitespace(str(stmt), "SELECT foo[:foo_1] AS anon_1") def test_unknown_type(self): class MyType(types.TypeEngine): - __visit_name__ = 'mytype' + __visit_name__ = "mytype" stmt = select([cast(table1.c.myid, MyType)]) eq_ignore_whitespace( str(stmt), - "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable" + "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable", ) def test_within_group(self): # stringify of these was supported anyway by defaultdialect. from sqlalchemy import within_group - stmt = select([ - table1.c.myid, - within_group( - func.percentile_cont(0.5), - table1.c.name.desc() - ) - ]) + + stmt = select( + [ + table1.c.myid, + within_group(func.percentile_cont(0.5), table1.c.name.desc()), + ] + ) eq_ignore_whitespace( str(stmt), "SELECT mytable.myid, percentile_cont(:percentile_cont_1) " - "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable" + "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable", ) class KwargPropagationTest(fixtures.TestBase): - @classmethod def setup_class(cls): from sqlalchemy.sql.expression import ColumnClause, TableClause @@ -2969,7 +3299,7 @@ class KwargPropagationTest(fixtures.TestBase): cls.column = CatchCol("x") cls.table = CatchTable("y") - cls.criterion = cls.column == CatchCol('y') + cls.criterion = cls.column == CatchCol("y") @compiles(CatchCol) def compile_col(element, compiler, **kw): @@ -2983,16 +3313,18 @@ class KwargPropagationTest(fixtures.TestBase): def _do_test(self, element): d = default.DefaultDialect() - d.statement_compiler(d, element, - compile_kwargs={"canary": True}) + d.statement_compiler(d, element, compile_kwargs={"canary": True}) def test_binary(self): self._do_test(self.column == 5) def test_select(self): - s = select([self.column]).select_from(self.table).\ - where(self.column == self.criterion).\ - order_by(self.column) + s = ( + select([self.column]) + .select_from(self.table) + .where(self.column == self.criterion) + .order_by(self.column) + ) self._do_test(s) def test_case(self): @@ -3029,8 +3361,11 @@ class ExecutionOptionsTest(fixtures.TestBase): def test_embedded_element_true_to_false(self): stmt = table1.insert().cte() eq_(stmt._execution_options, {"autocommit": True}) - s2 = select([table1]).select_from(stmt).\ - execution_options(autocommit=False) + s2 = ( + select([table1]) + .select_from(stmt) + .execution_options(autocommit=False) + ) eq_(s2._execution_options, {"autocommit": False}) compiled = s2.compile() @@ -3038,7 +3373,7 @@ class ExecutionOptionsTest(fixtures.TestBase): class DDLTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def _illegal_type_fixture(self): class MyType(types.TypeEngine): @@ -3047,195 +3382,197 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): @compiles(MyType) def compile(element, compiler, **kw): raise exc.CompileError("Couldn't compile type") + return MyType def test_reraise_of_column_spec_issue(self): MyType = self._illegal_type_fixture() - t1 = Table('t', MetaData(), - Column('x', MyType()) - ) + t1 = Table("t", MetaData(), Column("x", MyType())) assert_raises_message( exc.CompileError, r"\(in table 't', column 'x'\): Couldn't compile type", - schema.CreateTable(t1).compile + schema.CreateTable(t1).compile, ) def test_reraise_of_column_spec_issue_unicode(self): MyType = self._illegal_type_fixture() - t1 = Table('t', MetaData(), - Column(u('méil'), MyType()) - ) + t1 = Table("t", MetaData(), Column(u("méil"), MyType())) assert_raises_message( exc.CompileError, u(r"\(in table 't', column 'méil'\): Couldn't compile type"), - schema.CreateTable(t1).compile + schema.CreateTable(t1).compile, ) def test_system_flag(self): m = MetaData() - t = Table('t', m, Column('x', Integer), - Column('y', Integer, system=True), - Column('z', Integer)) + t = Table( + "t", + m, + Column("x", Integer), + Column("y", Integer, system=True), + Column("z", Integer), + ) self.assert_compile( - schema.CreateTable(t), - "CREATE TABLE t (x INTEGER, z INTEGER)" + schema.CreateTable(t), "CREATE TABLE t (x INTEGER, z INTEGER)" ) m2 = MetaData() t2 = t.tometadata(m2) self.assert_compile( - schema.CreateTable(t2), - "CREATE TABLE t (x INTEGER, z INTEGER)" + schema.CreateTable(t2), "CREATE TABLE t (x INTEGER, z INTEGER)" ) def test_composite_pk_constraint_autoinc_first_implicit(self): m = MetaData() t = Table( - 't', m, - Column('a', Integer, primary_key=True), - Column('b', Integer, primary_key=True, autoincrement=True) + "t", + m, + Column("a", Integer, primary_key=True), + Column("b", Integer, primary_key=True, autoincrement=True), ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (" "a INTEGER NOT NULL, " "b INTEGER NOT NULL, " - "PRIMARY KEY (b, a))" + "PRIMARY KEY (b, a))", ) def test_composite_pk_constraint_maintains_order_explicit(self): m = MetaData() t = Table( - 't', m, - Column('a', Integer), - Column('b', Integer, autoincrement=True), - schema.PrimaryKeyConstraint('a', 'b') + "t", + m, + Column("a", Integer), + Column("b", Integer, autoincrement=True), + schema.PrimaryKeyConstraint("a", "b"), ) self.assert_compile( schema.CreateTable(t), "CREATE TABLE t (" "a INTEGER NOT NULL, " "b INTEGER NOT NULL, " - "PRIMARY KEY (a, b))" + "PRIMARY KEY (a, b))", ) def test_create_table_suffix(self): class MyDialect(default.DefaultDialect): class MyCompiler(compiler.DDLCompiler): def create_table_suffix(self, table): - return 'SOME SUFFIX' + return "SOME SUFFIX" ddl_compiler = MyCompiler m = MetaData() - t1 = Table('t1', m, Column('q', Integer)) + t1 = Table("t1", m, Column("q", Integer)) self.assert_compile( schema.CreateTable(t1), "CREATE TABLE t1 SOME SUFFIX (q INTEGER)", - dialect=MyDialect() + dialect=MyDialect(), ) def test_table_no_cols(self): m = MetaData() - t1 = Table('t1', m) - self.assert_compile( - schema.CreateTable(t1), - "CREATE TABLE t1 ()" - ) + t1 = Table("t1", m) + self.assert_compile(schema.CreateTable(t1), "CREATE TABLE t1 ()") def test_table_no_cols_w_constraint(self): m = MetaData() - t1 = Table('t1', m, CheckConstraint('a = 1')) + t1 = Table("t1", m, CheckConstraint("a = 1")) self.assert_compile( - schema.CreateTable(t1), - "CREATE TABLE t1 (CHECK (a = 1))" + schema.CreateTable(t1), "CREATE TABLE t1 (CHECK (a = 1))" ) def test_table_one_col_w_constraint(self): m = MetaData() - t1 = Table('t1', m, Column('q', Integer), CheckConstraint('a = 1')) + t1 = Table("t1", m, Column("q", Integer), CheckConstraint("a = 1")) self.assert_compile( schema.CreateTable(t1), - "CREATE TABLE t1 (q INTEGER, CHECK (a = 1))" + "CREATE TABLE t1 (q INTEGER, CHECK (a = 1))", ) def test_schema_translate_map_table(self): m = MetaData() - t1 = Table('t1', m, Column('q', Integer)) - t2 = Table('t2', m, Column('q', Integer), schema='foo') - t3 = Table('t3', m, Column('q', Integer), schema='bar') + t1 = Table("t1", m, Column("q", Integer)) + t2 = Table("t2", m, Column("q", Integer), schema="foo") + t3 = Table("t3", m, Column("q", Integer), schema="bar") schema_translate_map = {None: "z", "bar": None, "foo": "bat"} self.assert_compile( schema.CreateTable(t1), "CREATE TABLE z.t1 (q INTEGER)", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateTable(t2), "CREATE TABLE bat.t2 (q INTEGER)", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateTable(t3), "CREATE TABLE t3 (q INTEGER)", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) def test_schema_translate_map_sequence(self): - s1 = schema.Sequence('s1') - s2 = schema.Sequence('s2', schema='foo') - s3 = schema.Sequence('s3', schema='bar') + s1 = schema.Sequence("s1") + s2 = schema.Sequence("s2", schema="foo") + s3 = schema.Sequence("s3", schema="bar") schema_translate_map = {None: "z", "bar": None, "foo": "bat"} self.assert_compile( schema.CreateSequence(s1), "CREATE SEQUENCE z.s1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateSequence(s2), "CREATE SEQUENCE bat.s2", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( schema.CreateSequence(s3), "CREATE SEQUENCE s3", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_select(self): - self.assert_compile(table4.select(), - "SELECT remote_owner.remotetable.rem_id, " - "remote_owner.remotetable.datatype_id," - " remote_owner.remotetable.value " - "FROM remote_owner.remotetable") + self.assert_compile( + table4.select(), + "SELECT remote_owner.remotetable.rem_id, " + "remote_owner.remotetable.datatype_id," + " remote_owner.remotetable.value " + "FROM remote_owner.remotetable", + ) self.assert_compile( table4.select( - and_( - table4.c.datatype_id == 7, - table4.c.value == 'hi')), + and_(table4.c.datatype_id == 7, table4.c.value == "hi") + ), "SELECT remote_owner.remotetable.rem_id, " "remote_owner.remotetable.datatype_id," " remote_owner.remotetable.value " "FROM remote_owner.remotetable WHERE " "remote_owner.remotetable.datatype_id = :datatype_id_1 AND" - " remote_owner.remotetable.value = :value_1") + " remote_owner.remotetable.value = :value_1", + ) - s = table4.select(and_(table4.c.datatype_id == 7, - table4.c.value == 'hi'), use_labels=True) + s = table4.select( + and_(table4.c.datatype_id == 7, table4.c.value == "hi"), + use_labels=True, + ) self.assert_compile( - s, "SELECT remote_owner.remotetable.rem_id AS" + s, + "SELECT remote_owner.remotetable.rem_id AS" " remote_owner_remotetable_rem_id, " "remote_owner.remotetable.datatype_id AS" " remote_owner_remotetable_datatype_id, " @@ -3243,86 +3580,94 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): "AS remote_owner_remotetable_value FROM " "remote_owner.remotetable WHERE " "remote_owner.remotetable.datatype_id = :datatype_id_1 AND " - "remote_owner.remotetable.value = :value_1") + "remote_owner.remotetable.value = :value_1", + ) # multi-part schema name - self.assert_compile(table5.select(), - 'SELECT "dbo.remote_owner".remotetable.rem_id, ' - '"dbo.remote_owner".remotetable.datatype_id, ' - '"dbo.remote_owner".remotetable.value ' - 'FROM "dbo.remote_owner".remotetable' - ) + self.assert_compile( + table5.select(), + 'SELECT "dbo.remote_owner".remotetable.rem_id, ' + '"dbo.remote_owner".remotetable.datatype_id, ' + '"dbo.remote_owner".remotetable.value ' + 'FROM "dbo.remote_owner".remotetable', + ) # multi-part schema name labels - convert '.' to '_' - self.assert_compile(table5.select(use_labels=True), - 'SELECT "dbo.remote_owner".remotetable.rem_id AS' - ' dbo_remote_owner_remotetable_rem_id, ' - '"dbo.remote_owner".remotetable.datatype_id' - ' AS dbo_remote_owner_remotetable_datatype_id,' - ' "dbo.remote_owner".remotetable.value AS ' - 'dbo_remote_owner_remotetable_value FROM' - ' "dbo.remote_owner".remotetable' - ) + self.assert_compile( + table5.select(use_labels=True), + 'SELECT "dbo.remote_owner".remotetable.rem_id AS' + " dbo_remote_owner_remotetable_rem_id, " + '"dbo.remote_owner".remotetable.datatype_id' + " AS dbo_remote_owner_remotetable_datatype_id," + ' "dbo.remote_owner".remotetable.value AS ' + "dbo_remote_owner_remotetable_value FROM" + ' "dbo.remote_owner".remotetable', + ) def test_schema_translate_select(self): m = MetaData() table1 = Table( - 'mytable', m, Column('myid', Integer), - Column('name', String), - Column('description', String) + "mytable", + m, + Column("myid", Integer), + Column("name", String), + Column("description", String), ) - schema_translate_map = {"remote_owner": "foob", None: 'bar'} + schema_translate_map = {"remote_owner": "foob", None: "bar"} self.assert_compile( - table1.select().where(table1.c.name == 'hi'), + table1.select().where(table1.c.name == "hi"), "SELECT bar.mytable.myid, bar.mytable.name, " "bar.mytable.description FROM bar.mytable " "WHERE bar.mytable.name = :name_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table4.select().where(table4.c.value == 'hi'), + table4.select().where(table4.c.value == "hi"), "SELECT foob.remotetable.rem_id, foob.remotetable.datatype_id, " "foob.remotetable.value FROM foob.remotetable " "WHERE foob.remotetable.value = :value_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) schema_translate_map = {"remote_owner": "foob"} self.assert_compile( - select([ - table1, table4 - ]).select_from( + select([table1, table4]).select_from( join(table1, table4, table1.c.myid == table4.c.rem_id) ), "SELECT mytable.myid, mytable.name, mytable.description, " "foob.remotetable.rem_id, foob.remotetable.datatype_id, " "foob.remotetable.value FROM mytable JOIN foob.remotetable " "ON mytable.myid = foob.remotetable.rem_id", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) def test_schema_translate_aliases(self): - schema_translate_map = {None: 'bar'} + schema_translate_map = {None: "bar"} m = MetaData() table1 = Table( - 'mytable', m, Column('myid', Integer), - Column('name', String), - Column('description', String) + "mytable", + m, + Column("myid", Integer), + Column("name", String), + Column("description", String), ) table2 = Table( - 'myothertable', m, Column('otherid', Integer), - Column('othername', String), + "myothertable", + m, + Column("otherid", Integer), + Column("othername", String), ) alias = table1.alias() - stmt = select([ - table2, alias - ]).select_from(table2.join(alias, table2.c.otherid == alias.c.myid)).\ - where(alias.c.name == 'foo') + stmt = ( + select([table2, alias]) + .select_from(table2.join(alias, table2.c.otherid == alias.c.myid)) + .where(alias.c.name == "foo") + ) self.assert_compile( stmt, @@ -3331,109 +3676,122 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL): "FROM bar.myothertable JOIN bar.mytable AS mytable_1 " "ON bar.myothertable.otherid = mytable_1.myid " "WHERE mytable_1.name = :name_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) def test_schema_translate_crud(self): - schema_translate_map = {"remote_owner": "foob", None: 'bar'} + schema_translate_map = {"remote_owner": "foob", None: "bar"} m = MetaData() table1 = Table( - 'mytable', m, - Column('myid', Integer), Column('name', String), - Column('description', String) + "mytable", + m, + Column("myid", Integer), + Column("name", String), + Column("description", String), ) self.assert_compile( - table1.insert().values(description='foo'), + table1.insert().values(description="foo"), "INSERT INTO bar.mytable (description) VALUES (:description)", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table1.update().where(table1.c.name == 'hi'). - values(description='foo'), + table1.update() + .where(table1.c.name == "hi") + .values(description="foo"), "UPDATE bar.mytable SET description=:description " "WHERE bar.mytable.name = :name_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table1.delete().where(table1.c.name == 'hi'), + table1.delete().where(table1.c.name == "hi"), "DELETE FROM bar.mytable WHERE bar.mytable.name = :name_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table4.insert().values(value='there'), + table4.insert().values(value="there"), "INSERT INTO foob.remotetable (value) VALUES (:value)", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table4.update().where(table4.c.value == 'hi'). - values(value='there'), + table4.update() + .where(table4.c.value == "hi") + .values(value="there"), "UPDATE foob.remotetable SET value=:value " "WHERE foob.remotetable.value = :value_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) self.assert_compile( - table4.delete().where(table4.c.value == 'hi'), + table4.delete().where(table4.c.value == "hi"), "DELETE FROM foob.remotetable WHERE " "foob.remotetable.value = :value_1", - schema_translate_map=schema_translate_map + schema_translate_map=schema_translate_map, ) def test_alias(self): - a = alias(table4, 'remtable') - self.assert_compile(a.select(a.c.datatype_id == 7), - "SELECT remtable.rem_id, remtable.datatype_id, " - "remtable.value FROM" - " remote_owner.remotetable AS remtable " - "WHERE remtable.datatype_id = :datatype_id_1") + a = alias(table4, "remtable") + self.assert_compile( + a.select(a.c.datatype_id == 7), + "SELECT remtable.rem_id, remtable.datatype_id, " + "remtable.value FROM" + " remote_owner.remotetable AS remtable " + "WHERE remtable.datatype_id = :datatype_id_1", + ) def test_update(self): self.assert_compile( - table4.update(table4.c.value == 'test', - values={table4.c.datatype_id: 12}), + table4.update( + table4.c.value == "test", values={table4.c.datatype_id: 12} + ), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id " - "WHERE remote_owner.remotetable.value = :value_1") + "WHERE remote_owner.remotetable.value = :value_1", + ) def test_insert(self): - self.assert_compile(table4.insert(values=(2, 5, 'test')), - "INSERT INTO remote_owner.remotetable " - "(rem_id, datatype_id, value) VALUES " - "(:rem_id, :datatype_id, :value)") + self.assert_compile( + table4.insert(values=(2, 5, "test")), + "INSERT INTO remote_owner.remotetable " + "(rem_id, datatype_id, value) VALUES " + "(:rem_id, :datatype_id, :value)", + ) class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" def test_dont_overcorrelate(self): - self.assert_compile(select([table1], from_obj=[table1, - table1.select()]), - "SELECT mytable.myid, mytable.name, " - "mytable.description FROM mytable, (SELECT " - "mytable.myid AS myid, mytable.name AS " - "name, mytable.description AS description " - "FROM mytable)") + self.assert_compile( + select([table1], from_obj=[table1, table1.select()]), + "SELECT mytable.myid, mytable.name, " + "mytable.description FROM mytable, (SELECT " + "mytable.myid AS myid, mytable.name AS " + "name, mytable.description AS description " + "FROM mytable)", + ) def _fixture(self): - t1 = table('t1', column('a')) - t2 = table('t2', column('a')) + t1 = table("t1", column("a")) + t2 = table("t2", column("a")) return t1, t2, select([t1]).where(t1.c.a == t2.c.a) def _assert_where_correlated(self, stmt): self.assert_compile( stmt, "SELECT t2.a FROM t2 WHERE t2.a = " - "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)", + ) def _assert_where_all_correlated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = " - "(SELECT t1.a WHERE t1.a = t2.a)") + "(SELECT t1.a WHERE t1.a = t2.a)", + ) # note there's no more "backwards" correlation after # we've done #2746 @@ -3452,171 +3810,197 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( stmt, "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) " - "AS anon_1 FROM t2") + "AS anon_1 FROM t2", + ) def _assert_column_all_correlated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a, " - "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2") + "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2", + ) def _assert_having_correlated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a FROM t2 HAVING t2.a = " - "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)") + self.assert_compile( + stmt, + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)", + ) def _assert_from_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t2.a, anon_1.a FROM t2, " - "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1", + ) def _assert_from_all_uncorrelated(self, stmt): self.assert_compile( stmt, "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, " - "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1") + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1", + ) def _assert_where_uncorrelated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a FROM t2 WHERE t2.a = " - "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + self.assert_compile( + stmt, + "SELECT t2.a FROM t2 WHERE t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)", + ) def _assert_column_uncorrelated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a, (SELECT t1.a FROM t1, t2 " - "WHERE t1.a = t2.a) AS anon_1 FROM t2") + self.assert_compile( + stmt, + "SELECT t2.a, (SELECT t1.a FROM t1, t2 " + "WHERE t1.a = t2.a) AS anon_1 FROM t2", + ) def _assert_having_uncorrelated(self, stmt): - self.assert_compile(stmt, - "SELECT t2.a FROM t2 HAVING t2.a = " - "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)") + self.assert_compile( + stmt, + "SELECT t2.a FROM t2 HAVING t2.a = " + "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)", + ) def _assert_where_single_full_correlated(self, stmt): - self.assert_compile(stmt, - "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)") + self.assert_compile( + stmt, "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)" + ) def test_correlate_semiauto_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( - select([t2]).where(t2.c.a == s1.correlate(t2))) + select([t2]).where(t2.c.a == s1.correlate(t2)) + ) def test_correlate_semiauto_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( - select([t2, s1.correlate(t2).as_scalar()])) + select([t2, s1.correlate(t2).as_scalar()]) + ) def test_correlate_semiauto_from(self): t1, t2, s1 = self._fixture() - self._assert_from_uncorrelated( - select([t2, s1.correlate(t2).alias()])) + self._assert_from_uncorrelated(select([t2, s1.correlate(t2).alias()])) def test_correlate_semiauto_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( - select([t2]).having(t2.c.a == s1.correlate(t2))) + select([t2]).having(t2.c.a == s1.correlate(t2)) + ) def test_correlate_except_inclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_correlated( - select([t2]).where(t2.c.a == s1.correlate_except(t1))) + select([t2]).where(t2.c.a == s1.correlate_except(t1)) + ) def test_correlate_except_exclusion_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( - select([t2]).where(t2.c.a == s1.correlate_except(t2))) + select([t2]).where(t2.c.a == s1.correlate_except(t2)) + ) def test_correlate_except_inclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_correlated( - select([t2, s1.correlate_except(t1).as_scalar()])) + select([t2, s1.correlate_except(t1).as_scalar()]) + ) def test_correlate_except_exclusion_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( - select([t2, s1.correlate_except(t2).as_scalar()])) + select([t2, s1.correlate_except(t2).as_scalar()]) + ) def test_correlate_except_inclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate_except(t1).alias()])) + select([t2, s1.correlate_except(t1).alias()]) + ) def test_correlate_except_exclusion_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate_except(t2).alias()])) + select([t2, s1.correlate_except(t2).alias()]) + ) def test_correlate_except_none(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( - select([t1, t2]).where(t2.c.a == s1.correlate_except(None))) + select([t1, t2]).where(t2.c.a == s1.correlate_except(None)) + ) def test_correlate_except_having(self): t1, t2, s1 = self._fixture() self._assert_having_correlated( - select([t2]).having(t2.c.a == s1.correlate_except(t1))) + select([t2]).having(t2.c.a == s1.correlate_except(t1)) + ) def test_correlate_auto_where(self): t1, t2, s1 = self._fixture() - self._assert_where_correlated( - select([t2]).where(t2.c.a == s1)) + self._assert_where_correlated(select([t2]).where(t2.c.a == s1)) def test_correlate_auto_column(self): t1, t2, s1 = self._fixture() - self._assert_column_correlated( - select([t2, s1.as_scalar()])) + self._assert_column_correlated(select([t2, s1.as_scalar()])) def test_correlate_auto_from(self): t1, t2, s1 = self._fixture() - self._assert_from_uncorrelated( - select([t2, s1.alias()])) + self._assert_from_uncorrelated(select([t2, s1.alias()])) def test_correlate_auto_having(self): t1, t2, s1 = self._fixture() - self._assert_having_correlated( - select([t2]).having(t2.c.a == s1)) + self._assert_having_correlated(select([t2]).having(t2.c.a == s1)) def test_correlate_disabled_where(self): t1, t2, s1 = self._fixture() self._assert_where_uncorrelated( - select([t2]).where(t2.c.a == s1.correlate(None))) + select([t2]).where(t2.c.a == s1.correlate(None)) + ) def test_correlate_disabled_column(self): t1, t2, s1 = self._fixture() self._assert_column_uncorrelated( - select([t2, s1.correlate(None).as_scalar()])) + select([t2, s1.correlate(None).as_scalar()]) + ) def test_correlate_disabled_from(self): t1, t2, s1 = self._fixture() self._assert_from_uncorrelated( - select([t2, s1.correlate(None).alias()])) + select([t2, s1.correlate(None).alias()]) + ) def test_correlate_disabled_having(self): t1, t2, s1 = self._fixture() self._assert_having_uncorrelated( - select([t2]).having(t2.c.a == s1.correlate(None))) + select([t2]).having(t2.c.a == s1.correlate(None)) + ) def test_correlate_all_where(self): t1, t2, s1 = self._fixture() self._assert_where_all_correlated( - select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2))) + select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)) + ) def test_correlate_all_column(self): t1, t2, s1 = self._fixture() self._assert_column_all_correlated( - select([t1, t2, s1.correlate(t1, t2).as_scalar()])) + select([t1, t2, s1.correlate(t1, t2).as_scalar()]) + ) def test_correlate_all_from(self): t1, t2, s1 = self._fixture() self._assert_from_all_uncorrelated( - select([t1, t2, s1.correlate(t1, t2).alias()])) + select([t1, t2, s1.correlate(t1, t2).alias()]) + ) def test_correlate_where_all_unintentional(self): t1, t2, s1 = self._fixture() assert_raises_message( exc.InvalidRequestError, "returned no FROM clauses due to auto-correlation", - select([t1, t2]).where(t2.c.a == s1).compile + select([t1, t2]).where(t2.c.a == s1).compile, ) def test_correlate_from_all_ok(self): @@ -3624,16 +4008,16 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( select([t1, t2, s1]), "SELECT t1.a, t2.a, a FROM t1, t2, " - "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)" + "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)", ) def test_correlate_auto_where_singlefrom(self): t1, t2, s1 = self._fixture() s = select([t1.c.a]) s2 = select([t1]).where(t1.c.a == s) - self.assert_compile(s2, - "SELECT t1.a FROM t1 WHERE t1.a = " - "(SELECT t1.a FROM t1)") + self.assert_compile( + s2, "SELECT t1.a FROM t1 WHERE t1.a = " "(SELECT t1.a FROM t1)" + ) def test_correlate_semiauto_where_singlefrom(self): t1, t2, s1 = self._fixture() @@ -3654,89 +4038,103 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL): def test_correlate_alone_noeffect(self): # new as of #2668 t1, t2, s1 = self._fixture() - self.assert_compile(s1.correlate(t1, t2), - "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a") + self.assert_compile( + s1.correlate(t1, t2), "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a" + ) def test_correlate_except_froms(self): # new as of #2748 - t1 = table('t1', column('a')) - t2 = table('t2', column('a'), column('b')) + t1 = table("t1", column("a")) + t2 = table("t2", column("a"), column("b")) s = select([t2.c.b]).where(t1.c.a == t2.c.a) - s = s.correlate_except(t2).alias('s') + s = s.correlate_except(t2).alias("s") s2 = select([func.foo(s.c.b)]).as_scalar() s3 = select([t1], order_by=s2) self.assert_compile( - s3, "SELECT t1.a FROM t1 ORDER BY " + s3, + "SELECT t1.a FROM t1 ORDER BY " "(SELECT foo(s.b) AS foo_1 FROM " - "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)") + "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)", + ) def test_multilevel_froms_correlation(self): # new as of #2748 - p = table('parent', column('id')) - c = table('child', column('id'), column('parent_id'), column('pos')) + p = table("parent", column("id")) + c = table("child", column("id"), column("parent_id"), column("pos")) - s = c.select().where( - c.c.parent_id == p.c.id).order_by( - c.c.pos).limit(1) + s = ( + c.select() + .where(c.c.parent_id == p.c.id) + .order_by(c.c.pos) + .limit(1) + ) s = s.correlate(p) s = exists().select_from(s).where(s.c.id == 1) s = select([p]).where(s) self.assert_compile( - s, "SELECT parent.id FROM parent WHERE EXISTS (SELECT * " + s, + "SELECT parent.id FROM parent WHERE EXISTS (SELECT * " "FROM (SELECT child.id AS id, child.parent_id AS parent_id, " "child.pos AS pos FROM child WHERE child.parent_id = parent.id " - "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)") + "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)", + ) def test_no_contextless_correlate_except(self): # new as of #2748 - t1 = table('t1', column('x')) - t2 = table('t2', column('y')) - t3 = table('t3', column('z')) + t1 = table("t1", column("x")) + t2 = table("t2", column("y")) + t3 = table("t3", column("z")) - s = select([t1]).where(t1.c.x == t2.c.y).\ - where(t2.c.y == t3.c.z).correlate_except(t1) + s = ( + select([t1]) + .where(t1.c.x == t2.c.y) + .where(t2.c.y == t3.c.z) + .correlate_except(t1) + ) self.assert_compile( - s, - "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z") + s, "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z" + ) def test_multilevel_implicit_correlation_disabled(self): # test that implicit correlation with multilevel WHERE correlation # behaves like 0.8.1, 0.7 (i.e. doesn't happen) - t1 = table('t1', column('x')) - t2 = table('t2', column('y')) - t3 = table('t3', column('z')) + t1 = table("t1", column("x")) + t2 = table("t2", column("y")) + t3 = table("t3", column("z")) s = select([t1.c.x]).where(t1.c.x == t2.c.y) s2 = select([t3.c.z]).where(t3.c.z == s.as_scalar()) s3 = select([t1]).where(t1.c.x == s2.as_scalar()) - self.assert_compile(s3, - "SELECT t1.x FROM t1 " - "WHERE t1.x = (SELECT t3.z " - "FROM t3 " - "WHERE t3.z = (SELECT t1.x " - "FROM t1, t2 " - "WHERE t1.x = t2.y))" - ) + self.assert_compile( + s3, + "SELECT t1.x FROM t1 " + "WHERE t1.x = (SELECT t3.z " + "FROM t3 " + "WHERE t3.z = (SELECT t1.x " + "FROM t1, t2 " + "WHERE t1.x = t2.y))", + ) def test_from_implicit_correlation_disabled(self): # test that implicit correlation with immediate and # multilevel FROM clauses behaves like 0.8.1 (i.e. doesn't happen) - t1 = table('t1', column('x')) - t2 = table('t2', column('y')) + t1 = table("t1", column("x")) + t2 = table("t2", column("y")) s = select([t1.c.x]).where(t1.c.x == t2.c.y) s2 = select([t2, s]) s3 = select([t1, s2]) - self.assert_compile(s3, - "SELECT t1.x, y, x FROM t1, " - "(SELECT t2.y AS y, x FROM t2, " - "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))" - ) + self.assert_compile( + s3, + "SELECT t1.x, y, x FROM t1, " + "(SELECT t2.y AS y, x FROM t2, " + "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))", + ) class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): @@ -3744,28 +4142,27 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): def _fixture(self): m = MetaData() - return Table('foo', m, - Column('id', Integer)) + return Table("foo", m, Column("id", Integer)) - bool_table = table('t', column('x', Boolean)) + bool_table = table("t", column("x", Boolean)) def test_coerce_bool_where(self): self.assert_compile( select([self.bool_table]).where(self.bool_table.c.x), - "SELECT t.x FROM t WHERE t.x" + "SELECT t.x FROM t WHERE t.x", ) def test_coerce_bool_where_non_native(self): self.assert_compile( select([self.bool_table]).where(self.bool_table.c.x), "SELECT t.x FROM t WHERE t.x = 1", - dialect=default.DefaultDialect(supports_native_boolean=False) + dialect=default.DefaultDialect(supports_native_boolean=False), ) self.assert_compile( select([self.bool_table]).where(~self.bool_table.c.x), "SELECT t.x FROM t WHERE t.x = 0", - dialect=default.DefaultDialect(supports_native_boolean=False) + dialect=default.DefaultDialect(supports_native_boolean=False), ) def test_null_constant(self): @@ -3779,40 +4176,35 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL): def test_val_and_false(self): t = self._fixture() - self.assert_compile(and_(t.c.id == 1, False), - "false") + self.assert_compile(and_(t.c.id == 1, False), "false") def test_val_and_true_coerced(self): t = self._fixture() - self.assert_compile(and_(t.c.id == 1, True), - "foo.id = :id_1") + self.assert_compile(and_(t.c.id == 1, True), "foo.id = :id_1") def test_val_is_null_coerced(self): t = self._fixture() - self.assert_compile(and_(t.c.id == None), # noqa - "foo.id IS NULL") + self.assert_compile(and_(t.c.id == None), "foo.id IS NULL") # noqa def test_val_and_None(self): t = self._fixture() - self.assert_compile(and_(t.c.id == 1, None), - "foo.id = :id_1 AND NULL") + self.assert_compile(and_(t.c.id == 1, None), "foo.id = :id_1 AND NULL") def test_None_and_val(self): t = self._fixture() - self.assert_compile(and_(None, t.c.id == 1), - "NULL AND foo.id = :id_1") + self.assert_compile(and_(None, t.c.id == 1), "NULL AND foo.id = :id_1") def test_None_and_nothing(self): # current convention is None in and_() # returns None May want # to revise this at some point. - self.assert_compile( - and_(None), "NULL") + self.assert_compile(and_(None), "NULL") def test_val_and_null(self): t = self._fixture() - self.assert_compile(and_(t.c.id == 1, null()), - "foo.id = :id_1 AND NULL") + self.assert_compile( + and_(t.c.id == 1, null()), "foo.id = :id_1 AND NULL" + ) class ResultMapTest(fixtures.TestBase): @@ -3823,101 +4215,109 @@ class ResultMapTest(fixtures.TestBase): """ def test_compound_populates(self): - t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer)) + t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) stmt = select([t]).union(select([t])) comp = stmt.compile() eq_( comp._create_result_map(), - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), - 'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)} + { + "a": ("a", (t.c.a, "a", "a"), t.c.a.type), + "b": ("b", (t.c.b, "b", "b"), t.c.b.type), + }, ) def test_compound_not_toplevel_doesnt_populate(self): - t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer)) + t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) subq = select([t]).union(select([t])) stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a)) comp = stmt.compile() eq_( comp._create_result_map(), - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)} + {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, ) def test_compound_only_top_populates(self): - t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer)) + t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer)) stmt = select([t.c.a]).union(select([t.c.b])) comp = stmt.compile() eq_( comp._create_result_map(), - {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}, + {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)}, ) def test_label_plus_element(self): - t = Table('t', MetaData(), Column('a', Integer)) - l1 = t.c.a.label('bar') + t = Table("t", MetaData(), Column("a", Integer)) + l1 = t.c.a.label("bar") tc = type_coerce(t.c.a, String) stmt = select([t.c.a, l1, tc]) comp = stmt.compile() - tc_anon_label = comp._create_result_map()['anon_1'][1][0] + tc_anon_label = comp._create_result_map()["anon_1"][1][0] eq_( comp._create_result_map(), { - 'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type), - 'bar': ('bar', (l1, 'bar'), l1.type), - 'anon_1': ( - '%%(%d anon)s' % id(tc), - (tc_anon_label, 'anon_1', tc), tc.type), + "a": ("a", (t.c.a, "a", "a"), t.c.a.type), + "bar": ("bar", (l1, "bar"), l1.type), + "anon_1": ( + "%%(%d anon)s" % id(tc), + (tc_anon_label, "anon_1", tc), + tc.type, + ), }, ) def test_label_conflict_union(self): - t1 = Table('t1', MetaData(), Column('a', Integer), - Column('b', Integer)) - t2 = Table('t2', MetaData(), Column('t1_a', Integer)) + t1 = Table( + "t1", MetaData(), Column("a", Integer), Column("b", Integer) + ) + t2 = Table("t2", MetaData(), Column("t1_a", Integer)) union = select([t2]).union(select([t2])).alias() t1_alias = t1.alias() - stmt = select([t1, t1_alias]).select_from( - t1.join(union, t1.c.a == union.c.t1_a)).apply_labels() + stmt = ( + select([t1, t1_alias]) + .select_from(t1.join(union, t1.c.a == union.c.t1_a)) + .apply_labels() + ) comp = stmt.compile() eq_( set(comp._create_result_map()), - set(['t1_1_b', 't1_1_a', 't1_a', 't1_b']) - ) - is_( - comp._create_result_map()['t1_a'][1][2], t1.c.a + set(["t1_1_b", "t1_1_a", "t1_a", "t1_b"]), ) + is_(comp._create_result_map()["t1_a"][1][2], t1.c.a) def test_insert_with_select_values(self): - astring = Column('a', String) - aint = Column('a', Integer) + astring = Column("a", String) + aint = Column("a", Integer) m = MetaData() - Table('t1', m, astring) - t2 = Table('t2', m, aint) + Table("t1", m, astring) + t2 = Table("t2", m, aint) stmt = t2.insert().values(a=select([astring])).returning(aint) comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {'a': ('a', (aint, 'a', 'a'), aint.type)} + {"a": ("a", (aint, "a", "a"), aint.type)}, ) def test_insert_from_select(self): - astring = Column('a', String) - aint = Column('a', Integer) + astring = Column("a", String) + aint = Column("a", Integer) m = MetaData() - Table('t1', m, astring) - t2 = Table('t2', m, aint) + Table("t1", m, astring) + t2 = Table("t2", m, aint) - stmt = t2.insert().from_select(['a'], select([astring])).\ - returning(aint) + stmt = ( + t2.insert().from_select(["a"], select([astring])).returning(aint) + ) comp = stmt.compile(dialect=postgresql.dialect()) eq_( comp._create_result_map(), - {'a': ('a', (aint, 'a', 'a'), aint.type)} + {"a": ("a", (aint, "a", "a"), aint.type)}, ) def test_nested_api(self): from sqlalchemy.engine.result import ResultMetaData + stmt2 = select([table2]) stmt1 = select([table1]).select_from(stmt2) @@ -3936,7 +4336,8 @@ class ResultMapTest(fixtures.TestBase): self._add_to_result_map("k1", "k1", (1, 2, 3), int_) else: text = super(MyCompiler, self).visit_select( - stmt, *arg, **kw) + stmt, *arg, **kw + ) self._add_to_result_map("k2", "k2", (3, 4, 5), int_) return text @@ -3945,62 +4346,68 @@ class ResultMapTest(fixtures.TestBase): eq_( ResultMetaData._create_result_map(contexts[stmt2][0]), { - 'otherid': ( - 'otherid', - (table2.c.otherid, 'otherid', 'otherid'), - table2.c.otherid.type), - 'othername': ( - 'othername', - (table2.c.othername, 'othername', 'othername'), - table2.c.othername.type), - 'k1': ('k1', (1, 2, 3), int_) - } + "otherid": ( + "otherid", + (table2.c.otherid, "otherid", "otherid"), + table2.c.otherid.type, + ), + "othername": ( + "othername", + (table2.c.othername, "othername", "othername"), + table2.c.othername.type, + ), + "k1": ("k1", (1, 2, 3), int_), + }, ) eq_( comp._create_result_map(), { - 'myid': ( - 'myid', - (table1.c.myid, 'myid', 'myid'), table1.c.myid.type + "myid": ( + "myid", + (table1.c.myid, "myid", "myid"), + table1.c.myid.type, + ), + "k2": ("k2", (3, 4, 5), int_), + "name": ( + "name", + (table1.c.name, "name", "name"), + table1.c.name.type, ), - 'k2': ('k2', (3, 4, 5), int_), - 'name': ( - 'name', (table1.c.name, 'name', 'name'), - table1.c.name.type), - 'description': ( - 'description', - (table1.c.description, 'description', 'description'), - table1.c.description.type)} + "description": ( + "description", + (table1.c.description, "description", "description"), + table1.c.description.type, + ), + }, ) def test_select_wraps_for_translate_ambiguity(self): # test for issue #3657 - t = table('a', column('x'), column('y'), column('z')) + t = table("a", column("x"), column("y"), column("z")) - l1, l2, l3 = t.c.z.label('a'), t.c.x.label('b'), t.c.x.label('c') + l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c") orig = [t.c.x, t.c.y, l1, l2, l3] stmt = select(orig) wrapped = stmt._generate() wrapped = wrapped.column( - func.ROW_NUMBER().over(order_by=t.c.z)).alias() + func.ROW_NUMBER().over(order_by=t.c.z) + ).alias() wrapped_again = select([c for c in wrapped.c]) compiled = wrapped_again.compile( - compile_kwargs={'select_wraps_for': stmt}) + compile_kwargs={"select_wraps_for": stmt} + ) proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns] - for orig_obj, proxied_obj in zip( - orig, - proxied - ): + for orig_obj, proxied_obj in zip(orig, proxied): is_(orig_obj, proxied_obj) def test_select_wraps_for_translate_ambiguity_dupe_cols(self): # test for issue #3657 - t = table('a', column('x'), column('y'), column('z')) + t = table("a", column("x"), column("y"), column("z")) - l1, l2, l3 = t.c.z.label('a'), t.c.x.label('b'), t.c.x.label('c') + l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c") orig = [t.c.x, t.c.y, l1, l2, l3] # create the statement with some duplicate columns. right now @@ -4018,7 +4425,8 @@ class ResultMapTest(fixtures.TestBase): wrapped = stmt._generate() wrapped = wrapped.column( - func.ROW_NUMBER().over(order_by=t.c.z)).alias() + func.ROW_NUMBER().over(order_by=t.c.z) + ).alias() # so when we wrap here we're going to have only 5 columns wrapped_again = select([c for c in wrapped.c]) @@ -4027,11 +4435,9 @@ class ResultMapTest(fixtures.TestBase): # "select_wraps_for" can't use inner_columns to match because # these collections are not the same compiled = wrapped_again.compile( - compile_kwargs={'select_wraps_for': stmt}) + compile_kwargs={"select_wraps_for": stmt} + ) proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns] - for orig_obj, proxied_obj in zip( - orig, - proxied - ): + for orig_obj, proxied_obj in zip(orig, proxied): is_(orig_obj, proxied_obj) |
