summaryrefslogtreecommitdiff
path: root/test/sql/test_compiler.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_compiler.py')
-rw-r--r--test/sql/test_compiler.py3542
1 files changed, 1974 insertions, 1568 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index f543b8677..f3305743a 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -10,147 +10,201 @@ styling and coherent test organization.
"""
-from sqlalchemy.testing import eq_, is_, assert_raises, \
- assert_raises_message, eq_ignore_whitespace
+from sqlalchemy.testing import (
+ eq_,
+ is_,
+ assert_raises,
+ assert_raises_message,
+ eq_ignore_whitespace,
+)
from sqlalchemy import testing
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
-from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
- func, not_, cast, text, tuple_, exists, update, bindparam,\
- literal, and_, null, type_coerce, alias, or_, literal_column,\
- Float, TIMESTAMP, Numeric, Date, Text, union, except_,\
- intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
- over, subquery, case, true, CheckConstraint, Sequence
+from sqlalchemy import (
+ Integer,
+ String,
+ MetaData,
+ Table,
+ Column,
+ select,
+ func,
+ not_,
+ cast,
+ text,
+ tuple_,
+ exists,
+ update,
+ bindparam,
+ literal,
+ and_,
+ null,
+ type_coerce,
+ alias,
+ or_,
+ literal_column,
+ Float,
+ TIMESTAMP,
+ Numeric,
+ Date,
+ Text,
+ union,
+ except_,
+ intersect,
+ union_all,
+ Boolean,
+ distinct,
+ join,
+ outerjoin,
+ asc,
+ desc,
+ over,
+ subquery,
+ case,
+ true,
+ CheckConstraint,
+ Sequence,
+)
import decimal
from sqlalchemy.util import u
from sqlalchemy import exc, sql, util, types, schema
from sqlalchemy.sql import table, column, label
from sqlalchemy.sql.expression import ClauseList, _literal_as_text, HasPrefixes
from sqlalchemy.engine import default
-from sqlalchemy.dialects import mysql, mssql, postgresql, oracle, \
- sqlite, sybase
+from sqlalchemy.dialects import (
+ mysql,
+ mssql,
+ postgresql,
+ oracle,
+ sqlite,
+ sybase,
+)
from sqlalchemy.dialects.postgresql.base import PGCompiler, PGDialect
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql import compiler
-table1 = table('mytable',
- column('myid', Integer),
- column('name', String),
- column('description', String),
- )
+table1 = table(
+ "mytable",
+ column("myid", Integer),
+ column("name", String),
+ column("description", String),
+)
table2 = table(
- 'myothertable',
- column('otherid', Integer),
- column('othername', String),
+ "myothertable", column("otherid", Integer), column("othername", String)
)
table3 = table(
- 'thirdtable',
- column('userid', Integer),
- column('otherstuff', String),
+ "thirdtable", column("userid", Integer), column("otherstuff", String)
)
metadata = MetaData()
# table with a schema
table4 = Table(
- 'remotetable', metadata,
- Column('rem_id', Integer, primary_key=True),
- Column('datatype_id', Integer),
- Column('value', String(20)),
- schema='remote_owner'
+ "remotetable",
+ metadata,
+ Column("rem_id", Integer, primary_key=True),
+ Column("datatype_id", Integer),
+ Column("value", String(20)),
+ schema="remote_owner",
)
# table with a 'multipart' schema
table5 = Table(
- 'remotetable', metadata,
- Column('rem_id', Integer, primary_key=True),
- Column('datatype_id', Integer),
- Column('value', String(20)),
- schema='dbo.remote_owner'
+ "remotetable",
+ metadata,
+ Column("rem_id", Integer, primary_key=True),
+ Column("datatype_id", Integer),
+ Column("value", String(20)),
+ schema="dbo.remote_owner",
)
-users = table('users',
- column('user_id'),
- column('user_name'),
- column('password'),
- )
+users = table(
+ "users", column("user_id"), column("user_name"), column("password")
+)
-addresses = table('addresses',
- column('address_id'),
- column('user_id'),
- column('street'),
- column('city'),
- column('state'),
- column('zip')
- )
+addresses = table(
+ "addresses",
+ column("address_id"),
+ column("user_id"),
+ column("street"),
+ column("city"),
+ column("state"),
+ column("zip"),
+)
-keyed = Table('keyed', metadata,
- Column('x', Integer, key='colx'),
- Column('y', Integer, key='coly'),
- Column('z', Integer),
- )
+keyed = Table(
+ "keyed",
+ metadata,
+ Column("x", Integer, key="colx"),
+ Column("y", Integer, key="coly"),
+ Column("z", Integer),
+)
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def test_attribute_sanity(self):
- assert hasattr(table1, 'c')
- assert hasattr(table1.select(), 'c')
- assert not hasattr(table1.c.myid.self_group(), 'columns')
- assert hasattr(table1.select().self_group(), 'columns')
- assert not hasattr(table1.c.myid, 'columns')
- assert not hasattr(table1.c.myid, 'c')
- assert not hasattr(table1.select().c.myid, 'c')
- assert not hasattr(table1.select().c.myid, 'columns')
- assert not hasattr(table1.alias().c.myid, 'columns')
- assert not hasattr(table1.alias().c.myid, 'c')
+ assert hasattr(table1, "c")
+ assert hasattr(table1.select(), "c")
+ assert not hasattr(table1.c.myid.self_group(), "columns")
+ assert hasattr(table1.select().self_group(), "columns")
+ assert not hasattr(table1.c.myid, "columns")
+ assert not hasattr(table1.c.myid, "c")
+ assert not hasattr(table1.select().c.myid, "c")
+ assert not hasattr(table1.select().c.myid, "columns")
+ assert not hasattr(table1.alias().c.myid, "columns")
+ assert not hasattr(table1.alias().c.myid, "c")
if util.compat.py32:
assert_raises_message(
exc.InvalidRequestError,
- 'Scalar Select expression has no '
- 'columns; use this object directly within a '
- 'column-level expression.',
+ "Scalar Select expression has no "
+ "columns; use this object directly within a "
+ "column-level expression.",
lambda: hasattr(
- select([table1.c.myid]).as_scalar().self_group(),
- 'columns'))
+ select([table1.c.myid]).as_scalar().self_group(), "columns"
+ ),
+ )
assert_raises_message(
exc.InvalidRequestError,
- 'Scalar Select expression has no '
- 'columns; use this object directly within a '
- 'column-level expression.',
- lambda: hasattr(select([table1.c.myid]).as_scalar(),
- 'columns'))
+ "Scalar Select expression has no "
+ "columns; use this object directly within a "
+ "column-level expression.",
+ lambda: hasattr(
+ select([table1.c.myid]).as_scalar(), "columns"
+ ),
+ )
else:
assert not hasattr(
- select([table1.c.myid]).as_scalar().self_group(),
- 'columns')
- assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns')
+ select([table1.c.myid]).as_scalar().self_group(), "columns"
+ )
+ assert not hasattr(select([table1.c.myid]).as_scalar(), "columns")
def test_prefix_constructor(self):
class Pref(HasPrefixes):
-
def _generate(self):
return self
- assert_raises(exc.ArgumentError,
- Pref().prefix_with,
- "some prefix", not_a_dialect=True
- )
+
+ assert_raises(
+ exc.ArgumentError,
+ Pref().prefix_with,
+ "some prefix",
+ not_a_dialect=True,
+ )
def test_table_select(self):
- self.assert_compile(table1.select(),
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM mytable")
+ self.assert_compile(
+ table1.select(),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable",
+ )
self.assert_compile(
- select(
- [
- table1,
- table2]),
+ select([table1, table2]),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
- "myothertable")
+ "myothertable",
+ )
def test_invalid_col_argument(self):
assert_raises(exc.ArgumentError, select, table1)
@@ -178,13 +232,12 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
exp1 = literal_column("Q")
exp2 = literal_column("Y")
self.assert_compile(
- select([1]).limit(exp1).offset(exp2),
- "SELECT 1 LIMIT Q OFFSET Y"
+ select([1]).limit(exp1).offset(exp2), "SELECT 1 LIMIT Q OFFSET Y"
)
self.assert_compile(
- select([1]).limit(bindparam('x')).offset(bindparam('y')),
- "SELECT 1 LIMIT :x OFFSET :y"
+ select([1]).limit(bindparam("x")).offset(bindparam("y")),
+ "SELECT 1 LIMIT :x OFFSET :y",
)
def test_limit_offset_no_int_coercion_two(self):
@@ -196,14 +249,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
exc.CompileError,
"This SELECT structure does not use a simple integer "
"value for limit",
- getattr, sel, "_limit"
+ getattr,
+ sel,
+ "_limit",
)
assert_raises_message(
exc.CompileError,
"This SELECT structure does not use a simple integer "
"value for offset",
- getattr, sel, "_offset"
+ getattr,
+ sel,
+ "_offset",
)
def test_limit_offset_no_int_coercion_three(self):
@@ -215,37 +272,47 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
exc.CompileError,
"This SELECT structure does not use a simple integer "
"value for limit",
- getattr, sel, "_limit"
+ getattr,
+ sel,
+ "_limit",
)
assert_raises_message(
exc.CompileError,
"This SELECT structure does not use a simple integer "
"value for offset",
- getattr, sel, "_offset"
+ getattr,
+ sel,
+ "_offset",
)
def test_limit_offset(self):
for lim, offset, exp, params in [
- (5, 10, "LIMIT :param_1 OFFSET :param_2",
- {'param_1': 5, 'param_2': 10}),
- (None, 10, "LIMIT -1 OFFSET :param_1", {'param_1': 10}),
- (5, None, "LIMIT :param_1", {'param_1': 5}),
- (0, 0, "LIMIT :param_1 OFFSET :param_2",
- {'param_1': 0, 'param_2': 0}),
+ (
+ 5,
+ 10,
+ "LIMIT :param_1 OFFSET :param_2",
+ {"param_1": 5, "param_2": 10},
+ ),
+ (None, 10, "LIMIT -1 OFFSET :param_1", {"param_1": 10}),
+ (5, None, "LIMIT :param_1", {"param_1": 5}),
+ (
+ 0,
+ 0,
+ "LIMIT :param_1 OFFSET :param_2",
+ {"param_1": 0, "param_2": 0},
+ ),
]:
self.assert_compile(
select([1]).limit(lim).offset(offset),
"SELECT 1 " + exp,
- checkparams=params
+ checkparams=params,
)
def test_limit_offset_select_literal_binds(self):
stmt = select([1]).limit(5).offset(6)
self.assert_compile(
- stmt,
- "SELECT 1 LIMIT 5 OFFSET 6",
- literal_binds=True
+ stmt, "SELECT 1 LIMIT 5 OFFSET 6", literal_binds=True
)
def test_limit_offset_compound_select_literal_binds(self):
@@ -253,25 +320,24 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
stmt,
"SELECT 1 UNION SELECT 2 LIMIT 5 OFFSET 6",
- literal_binds=True
+ literal_binds=True,
)
def test_select_precol_compile_ordering(self):
- s1 = select([column('x')]).select_from(text('a')).limit(5).as_scalar()
+ s1 = select([column("x")]).select_from(text("a")).limit(5).as_scalar()
s2 = select([s1]).limit(10)
class MyCompiler(compiler.SQLCompiler):
-
def get_select_precolumns(self, select, **kw):
result = ""
if select._limit:
result += "FIRST %s " % self.process(
- literal(
- select._limit), **kw)
+ literal(select._limit), **kw
+ )
if select._offset:
result += "SKIP %s " % self.process(
- literal(
- select._offset), **kw)
+ literal(select._offset), **kw
+ )
return result
def limit_clause(self, select, **kw):
@@ -279,13 +345,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
dialect = default.DefaultDialect()
dialect.statement_compiler = MyCompiler
- dialect.paramstyle = 'qmark'
+ dialect.paramstyle = "qmark"
dialect.positional = True
self.assert_compile(
s2,
"SELECT FIRST ? (SELECT FIRST ? x FROM a) AS anon_1",
checkpositional=(10, 5),
- dialect=dialect
+ dialect=dialect,
)
def test_from_subquery(self):
@@ -293,16 +359,15 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
another select, for the
purposes of selecting from the exported columns of that select."""
- s = select([table1], table1.c.name == 'jack')
+ s = select([table1], table1.c.name == "jack")
self.assert_compile(
- select(
- [s],
- s.c.myid == 7),
+ select([s], s.c.myid == 7),
"SELECT myid, name, description FROM "
"(SELECT mytable.myid AS myid, "
"mytable.name AS name, mytable.description AS description "
"FROM mytable "
- "WHERE mytable.name = :name_1) WHERE myid = :myid_1")
+ "WHERE mytable.name = :name_1) WHERE myid = :myid_1",
+ )
sq = select([table1])
self.assert_compile(
@@ -310,44 +375,42 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT myid, name, description FROM "
"(SELECT mytable.myid AS myid, "
"mytable.name AS name, mytable.description "
- "AS description FROM mytable)"
+ "AS description FROM mytable)",
)
- sq = select(
- [table1],
- ).alias('sq')
+ sq = select([table1]).alias("sq")
self.assert_compile(
sq.select(sq.c.myid == 7),
"SELECT sq.myid, sq.name, sq.description FROM "
"(SELECT mytable.myid AS myid, mytable.name AS name, "
"mytable.description AS description FROM mytable) AS sq "
- "WHERE sq.myid = :myid_1"
+ "WHERE sq.myid = :myid_1",
)
sq = select(
[table1, table2],
and_(table1.c.myid == 7, table2.c.otherid == table1.c.myid),
- use_labels=True
- ).alias('sq')
-
- sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS "\
- "mytable_name, mytable.description AS mytable_description, "\
- "myothertable.otherid AS myothertable_otherid, "\
- "myothertable.othername AS myothertable_othername FROM "\
- "mytable, myothertable WHERE mytable.myid = :myid_1 AND "\
+ use_labels=True,
+ ).alias("sq")
+
+ sqstring = (
+ "SELECT mytable.myid AS mytable_myid, mytable.name AS "
+ "mytable_name, mytable.description AS mytable_description, "
+ "myothertable.otherid AS myothertable_otherid, "
+ "myothertable.othername AS myothertable_othername FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid_1 AND "
"myothertable.otherid = mytable.myid"
+ )
self.assert_compile(
sq.select(),
"SELECT sq.mytable_myid, sq.mytable_name, "
"sq.mytable_description, sq.myothertable_otherid, "
- "sq.myothertable_othername FROM (%s) AS sq" % sqstring)
+ "sq.myothertable_othername FROM (%s) AS sq" % sqstring,
+ )
- sq2 = select(
- [sq],
- use_labels=True
- ).alias('sq2')
+ sq2 = select([sq], use_labels=True).alias("sq2")
self.assert_compile(
sq2.select(),
@@ -359,53 +422,53 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"sq.mytable_description AS sq_mytable_description, "
"sq.myothertable_otherid AS sq_myothertable_otherid, "
"sq.myothertable_othername AS sq_myothertable_othername "
- "FROM (%s) AS sq) AS sq2" % sqstring)
+ "FROM (%s) AS sq) AS sq2" % sqstring,
+ )
def test_select_from_clauselist(self):
self.assert_compile(
- select([ClauseList(column('a'), column('b'))]
- ).select_from(text('sometable')),
- 'SELECT a, b FROM sometable'
+ select([ClauseList(column("a"), column("b"))]).select_from(
+ text("sometable")
+ ),
+ "SELECT a, b FROM sometable",
)
def test_use_labels(self):
self.assert_compile(
select([table1.c.myid == 5], use_labels=True),
- "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable"
+ "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable",
)
self.assert_compile(
- select([func.foo()], use_labels=True),
- "SELECT foo() AS foo_1"
+ select([func.foo()], use_labels=True), "SELECT foo() AS foo_1"
)
# this is native_boolean=False for default dialect
self.assert_compile(
select([not_(True)], use_labels=True),
- "SELECT :param_1 = 0 AS anon_1"
+ "SELECT :param_1 = 0 AS anon_1",
)
self.assert_compile(
select([cast("data", Integer)], use_labels=True),
- "SELECT CAST(:param_1 AS INTEGER) AS anon_1"
+ "SELECT CAST(:param_1 AS INTEGER) AS anon_1",
)
self.assert_compile(
- select([func.sum(
- func.lala(table1.c.myid).label('foo')).label('bar')]),
- "SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
+ select(
+ [func.sum(func.lala(table1.c.myid).label("foo")).label("bar")]
+ ),
+ "SELECT sum(lala(mytable.myid)) AS bar FROM mytable",
)
self.assert_compile(
- select([keyed]),
- "SELECT keyed.x, keyed.y"
- ", keyed.z FROM keyed"
+ select([keyed]), "SELECT keyed.x, keyed.y" ", keyed.z FROM keyed"
)
self.assert_compile(
select([keyed]).apply_labels(),
"SELECT keyed.x AS keyed_x, keyed.y AS "
- "keyed_y, keyed.z AS keyed_z FROM keyed"
+ "keyed_y, keyed.z AS keyed_z FROM keyed",
)
def test_paramstyles(self):
@@ -414,40 +477,40 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
stmt,
"select ?, ?, ? from sometable",
- dialect=default.DefaultDialect(paramstyle='qmark')
+ dialect=default.DefaultDialect(paramstyle="qmark"),
)
self.assert_compile(
stmt,
"select :foo, :bar, :bat from sometable",
- dialect=default.DefaultDialect(paramstyle='named')
+ dialect=default.DefaultDialect(paramstyle="named"),
)
self.assert_compile(
stmt,
"select %s, %s, %s from sometable",
- dialect=default.DefaultDialect(paramstyle='format')
+ dialect=default.DefaultDialect(paramstyle="format"),
)
self.assert_compile(
stmt,
"select :1, :2, :3 from sometable",
- dialect=default.DefaultDialect(paramstyle='numeric')
+ dialect=default.DefaultDialect(paramstyle="numeric"),
)
self.assert_compile(
stmt,
"select %(foo)s, %(bar)s, %(bat)s from sometable",
- dialect=default.DefaultDialect(paramstyle='pyformat')
+ dialect=default.DefaultDialect(paramstyle="pyformat"),
)
def test_anon_param_name_on_keys(self):
self.assert_compile(
keyed.insert(),
"INSERT INTO keyed (x, y, z) VALUES (%(colx)s, %(coly)s, %(z)s)",
- dialect=default.DefaultDialect(paramstyle='pyformat')
+ dialect=default.DefaultDialect(paramstyle="pyformat"),
)
self.assert_compile(
keyed.c.coly == 5,
"keyed.y = %(coly_1)s",
- checkparams={'coly_1': 5},
- dialect=default.DefaultDialect(paramstyle='pyformat')
+ checkparams={"coly_1": 5},
+ dialect=default.DefaultDialect(paramstyle="pyformat"),
)
def test_dupe_columns(self):
@@ -455,51 +518,54 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
element identity, not rendered result."""
self.assert_compile(
- select([column('a'), column('a'), column('a')]),
- "SELECT a, a, a", dialect=default.DefaultDialect()
+ select([column("a"), column("a"), column("a")]),
+ "SELECT a, a, a",
+ dialect=default.DefaultDialect(),
)
- c = column('a')
+ c = column("a")
self.assert_compile(
- select([c, c, c]),
- "SELECT a", dialect=default.DefaultDialect()
+ select([c, c, c]), "SELECT a", dialect=default.DefaultDialect()
)
- a, b = column('a'), column('b')
+ a, b = column("a"), column("b")
self.assert_compile(
select([a, b, b, b, a, a]),
- "SELECT a, b", dialect=default.DefaultDialect()
+ "SELECT a, b",
+ dialect=default.DefaultDialect(),
)
# using alternate keys.
- a, b, c = Column('a', Integer, key='b'), \
- Column('b', Integer), \
- Column('c', Integer, key='a')
+ a, b, c = (
+ Column("a", Integer, key="b"),
+ Column("b", Integer),
+ Column("c", Integer, key="a"),
+ )
self.assert_compile(
select([a, b, c, a, b, c]),
- "SELECT a, b, c", dialect=default.DefaultDialect()
+ "SELECT a, b, c",
+ dialect=default.DefaultDialect(),
)
self.assert_compile(
- select([bindparam('a'), bindparam('b'), bindparam('c')]),
+ select([bindparam("a"), bindparam("b"), bindparam("c")]),
"SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3",
- dialect=default.DefaultDialect(paramstyle='named')
+ dialect=default.DefaultDialect(paramstyle="named"),
)
self.assert_compile(
- select([bindparam('a'), bindparam('b'), bindparam('c')]),
+ select([bindparam("a"), bindparam("b"), bindparam("c")]),
"SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3",
- dialect=default.DefaultDialect(paramstyle='qmark'),
+ dialect=default.DefaultDialect(paramstyle="qmark"),
)
self.assert_compile(
- select([column("a"), column("a"), column("a")]),
- "SELECT a, a, a"
+ select([column("a"), column("a"), column("a")]), "SELECT a, a, a"
)
- s = select([bindparam('a'), bindparam('b'), bindparam('c')])
- s = s.compile(dialect=default.DefaultDialect(paramstyle='qmark'))
- eq_(s.positiontup, ['a', 'b', 'c'])
+ s = select([bindparam("a"), bindparam("b"), bindparam("c")])
+ s = s.compile(dialect=default.DefaultDialect(paramstyle="qmark"))
+ eq_(s.positiontup, ["a", "b", "c"])
def test_nested_label_targeting(self):
"""test nested anonymous label generation.
@@ -510,234 +576,275 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
s3 = select([s2], use_labels=True)
s4 = s3.alias()
s5 = select([s4], use_labels=True)
- self.assert_compile(s5,
- 'SELECT anon_1.anon_2_myid AS '
- 'anon_1_anon_2_myid, anon_1.anon_2_name AS '
- 'anon_1_anon_2_name, anon_1.anon_2_descript'
- 'ion AS anon_1_anon_2_description FROM '
- '(SELECT anon_2.myid AS anon_2_myid, '
- 'anon_2.name AS anon_2_name, '
- 'anon_2.description AS anon_2_description '
- 'FROM (SELECT mytable.myid AS myid, '
- 'mytable.name AS name, mytable.description '
- 'AS description FROM mytable) AS anon_2) '
- 'AS anon_1')
+ self.assert_compile(
+ s5,
+ "SELECT anon_1.anon_2_myid AS "
+ "anon_1_anon_2_myid, anon_1.anon_2_name AS "
+ "anon_1_anon_2_name, anon_1.anon_2_descript"
+ "ion AS anon_1_anon_2_description FROM "
+ "(SELECT anon_2.myid AS anon_2_myid, "
+ "anon_2.name AS anon_2_name, "
+ "anon_2.description AS anon_2_description "
+ "FROM (SELECT mytable.myid AS myid, "
+ "mytable.name AS name, mytable.description "
+ "AS description FROM mytable) AS anon_2) "
+ "AS anon_1",
+ )
def test_nested_label_targeting_keyed(self):
s1 = keyed.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
- self.assert_compile(s3,
- "SELECT anon_1.x AS anon_1_x, "
- "anon_1.y AS anon_1_y, "
- "anon_1.z AS anon_1_z FROM "
- "(SELECT keyed.x AS x, keyed.y "
- "AS y, keyed.z AS z FROM keyed) AS anon_1")
+ self.assert_compile(
+ s3,
+ "SELECT anon_1.x AS anon_1_x, "
+ "anon_1.y AS anon_1_y, "
+ "anon_1.z AS anon_1_z FROM "
+ "(SELECT keyed.x AS x, keyed.y "
+ "AS y, keyed.z AS z FROM keyed) AS anon_1",
+ )
s4 = s3.alias()
s5 = select([s4], use_labels=True)
- self.assert_compile(s5,
- "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, "
- "anon_1.anon_2_y AS anon_1_anon_2_y, "
- "anon_1.anon_2_z AS anon_1_anon_2_z "
- "FROM (SELECT anon_2.x AS anon_2_x, "
- "anon_2.y AS anon_2_y, "
- "anon_2.z AS anon_2_z FROM "
- "(SELECT keyed.x AS x, keyed.y AS y, keyed.z "
- "AS z FROM keyed) AS anon_2) AS anon_1"
- )
+ self.assert_compile(
+ s5,
+ "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, "
+ "anon_1.anon_2_y AS anon_1_anon_2_y, "
+ "anon_1.anon_2_z AS anon_1_anon_2_z "
+ "FROM (SELECT anon_2.x AS anon_2_x, "
+ "anon_2.y AS anon_2_y, "
+ "anon_2.z AS anon_2_z FROM "
+ "(SELECT keyed.x AS x, keyed.y AS y, keyed.z "
+ "AS z FROM keyed) AS anon_2) AS anon_1",
+ )
def test_exists(self):
s = select([table1.c.myid]).where(table1.c.myid == 5)
- self.assert_compile(exists(s),
- "EXISTS (SELECT mytable.myid FROM mytable "
- "WHERE mytable.myid = :myid_1)"
- )
-
- self.assert_compile(exists(s.as_scalar()),
- "EXISTS (SELECT mytable.myid FROM mytable "
- "WHERE mytable.myid = :myid_1)"
- )
-
- self.assert_compile(exists([table1.c.myid], table1.c.myid
- == 5).select(),
- 'SELECT EXISTS (SELECT mytable.myid FROM '
- 'mytable WHERE mytable.myid = :myid_1) AS anon_1',
- params={'mytable_myid': 5})
- self.assert_compile(select([table1, exists([1],
- from_obj=table2)]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, EXISTS (SELECT 1 '
- 'FROM myothertable) AS anon_1 FROM mytable',
- params={})
- self.assert_compile(select([table1,
- exists([1],
- from_obj=table2).label('foo')]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, EXISTS (SELECT 1 '
- 'FROM myothertable) AS foo FROM mytable',
- params={})
+ self.assert_compile(
+ exists(s),
+ "EXISTS (SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = :myid_1)",
+ )
+
+ self.assert_compile(
+ exists(s.as_scalar()),
+ "EXISTS (SELECT mytable.myid FROM mytable "
+ "WHERE mytable.myid = :myid_1)",
+ )
+
+ self.assert_compile(
+ exists([table1.c.myid], table1.c.myid == 5).select(),
+ "SELECT EXISTS (SELECT mytable.myid FROM "
+ "mytable WHERE mytable.myid = :myid_1) AS anon_1",
+ params={"mytable_myid": 5},
+ )
+ self.assert_compile(
+ select([table1, exists([1], from_obj=table2)]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description, EXISTS (SELECT 1 "
+ "FROM myothertable) AS anon_1 FROM mytable",
+ params={},
+ )
+ self.assert_compile(
+ select([table1, exists([1], from_obj=table2).label("foo")]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description, EXISTS (SELECT 1 "
+ "FROM myothertable) AS foo FROM mytable",
+ params={},
+ )
self.assert_compile(
table1.select(
- exists().where(
- table2.c.otherid == table1.c.myid).correlate(table1)),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'EXISTS (SELECT * FROM myothertable WHERE '
- 'myothertable.otherid = mytable.myid)')
+ exists()
+ .where(table2.c.otherid == table1.c.myid)
+ .correlate(table1)
+ ),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "EXISTS (SELECT * FROM myothertable WHERE "
+ "myothertable.otherid = mytable.myid)",
+ )
self.assert_compile(
table1.select(
- exists().where(
- table2.c.otherid == table1.c.myid).correlate(table1)),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'EXISTS (SELECT * FROM myothertable WHERE '
- 'myothertable.otherid = mytable.myid)')
+ exists()
+ .where(table2.c.otherid == table1.c.myid)
+ .correlate(table1)
+ ),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "EXISTS (SELECT * FROM myothertable WHERE "
+ "myothertable.otherid = mytable.myid)",
+ )
self.assert_compile(
table1.select(
- exists().where(
- table2.c.otherid == table1.c.myid).correlate(table1)
- ).replace_selectable(
- table2,
- table2.alias()),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'EXISTS (SELECT * FROM myothertable AS '
- 'myothertable_1 WHERE myothertable_1.otheri'
- 'd = mytable.myid)')
+ exists()
+ .where(table2.c.otherid == table1.c.myid)
+ .correlate(table1)
+ ).replace_selectable(table2, table2.alias()),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "EXISTS (SELECT * FROM myothertable AS "
+ "myothertable_1 WHERE myothertable_1.otheri"
+ "d = mytable.myid)",
+ )
self.assert_compile(
table1.select(
- exists().where(
- table2.c.otherid == table1.c.myid).correlate(table1)).
- select_from(
- table1.join(
- table2,
- table1.c.myid == table2.c.otherid)).
- replace_selectable(
- table2,
- table2.alias()),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable JOIN '
- 'myothertable AS myothertable_1 ON '
- 'mytable.myid = myothertable_1.otherid '
- 'WHERE EXISTS (SELECT * FROM myothertable '
- 'AS myothertable_1 WHERE '
- 'myothertable_1.otherid = mytable.myid)')
-
- self.assert_compile(
- select([
- or_(
- exists().where(table2.c.otherid == 'foo'),
- exists().where(table2.c.otherid == 'bar')
- )
- ]),
+ exists()
+ .where(table2.c.otherid == table1.c.myid)
+ .correlate(table1)
+ )
+ .select_from(
+ table1.join(table2, table1.c.myid == table2.c.otherid)
+ )
+ .replace_selectable(table2, table2.alias()),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable JOIN "
+ "myothertable AS myothertable_1 ON "
+ "mytable.myid = myothertable_1.otherid "
+ "WHERE EXISTS (SELECT * FROM myothertable "
+ "AS myothertable_1 WHERE "
+ "myothertable_1.otherid = mytable.myid)",
+ )
+
+ self.assert_compile(
+ select(
+ [
+ or_(
+ exists().where(table2.c.otherid == "foo"),
+ exists().where(table2.c.otherid == "bar"),
+ )
+ ]
+ ),
"SELECT (EXISTS (SELECT * FROM myothertable "
"WHERE myothertable.otherid = :otherid_1)) "
"OR (EXISTS (SELECT * FROM myothertable WHERE "
- "myothertable.otherid = :otherid_2)) AS anon_1"
+ "myothertable.otherid = :otherid_2)) AS anon_1",
)
self.assert_compile(
- select([exists([1])]),
- "SELECT EXISTS (SELECT 1) AS anon_1"
+ select([exists([1])]), "SELECT EXISTS (SELECT 1) AS anon_1"
)
self.assert_compile(
- select([~exists([1])]),
- "SELECT NOT (EXISTS (SELECT 1)) AS anon_1"
+ select([~exists([1])]), "SELECT NOT (EXISTS (SELECT 1)) AS anon_1"
)
self.assert_compile(
select([~(~exists([1]))]),
- "SELECT NOT (NOT (EXISTS (SELECT 1))) AS anon_1"
+ "SELECT NOT (NOT (EXISTS (SELECT 1))) AS anon_1",
)
def test_where_subquery(self):
- s = select([addresses.c.street], addresses.c.user_id
- == users.c.user_id, correlate=True).alias('s')
+ s = select(
+ [addresses.c.street],
+ addresses.c.user_id == users.c.user_id,
+ correlate=True,
+ ).alias("s")
# don't correlate in a FROM list
- self.assert_compile(select([users, s.c.street], from_obj=s),
- "SELECT users.user_id, users.user_name, "
- "users.password, s.street FROM users, "
- "(SELECT addresses.street AS street FROM "
- "addresses, users WHERE addresses.user_id = "
- "users.user_id) AS s")
- self.assert_compile(table1.select(
- table1.c.myid == select(
- [table1.c.myid],
- table1.c.name == 'jack')),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'mytable.myid = (SELECT mytable.myid FROM '
- 'mytable WHERE mytable.name = :name_1)')
+ self.assert_compile(
+ select([users, s.c.street], from_obj=s),
+ "SELECT users.user_id, users.user_name, "
+ "users.password, s.street FROM users, "
+ "(SELECT addresses.street AS street FROM "
+ "addresses, users WHERE addresses.user_id = "
+ "users.user_id) AS s",
+ )
+ self.assert_compile(
+ table1.select(
+ table1.c.myid
+ == select([table1.c.myid], table1.c.name == "jack")
+ ),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "mytable.myid = (SELECT mytable.myid FROM "
+ "mytable WHERE mytable.name = :name_1)",
+ )
self.assert_compile(
table1.select(
- table1.c.myid == select(
- [table2.c.otherid],
- table1.c.name == table2.c.othername
+ table1.c.myid
+ == select(
+ [table2.c.otherid], table1.c.name == table2.c.othername
)
),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'mytable.myid = (SELECT '
- 'myothertable.otherid FROM myothertable '
- 'WHERE mytable.name = myothertable.othernam'
- 'e)')
- self.assert_compile(table1.select(exists([1], table2.c.otherid
- == table1.c.myid)),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'EXISTS (SELECT 1 FROM myothertable WHERE '
- 'myothertable.otherid = mytable.myid)')
- talias = table1.alias('ta')
- s = subquery('sq2', [talias], exists([1], table2.c.otherid
- == talias.c.myid))
- self.assert_compile(select([s, table1]),
- 'SELECT sq2.myid, sq2.name, '
- 'sq2.description, mytable.myid, '
- 'mytable.name, mytable.description FROM '
- '(SELECT ta.myid AS myid, ta.name AS name, '
- 'ta.description AS description FROM '
- 'mytable AS ta WHERE EXISTS (SELECT 1 FROM '
- 'myothertable WHERE myothertable.otherid = '
- 'ta.myid)) AS sq2, mytable')
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "mytable.myid = (SELECT "
+ "myothertable.otherid FROM myothertable "
+ "WHERE mytable.name = myothertable.othernam"
+ "e)",
+ )
+ self.assert_compile(
+ table1.select(exists([1], table2.c.otherid == table1.c.myid)),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "EXISTS (SELECT 1 FROM myothertable WHERE "
+ "myothertable.otherid = mytable.myid)",
+ )
+ talias = table1.alias("ta")
+ s = subquery(
+ "sq2", [talias], exists([1], table2.c.otherid == talias.c.myid)
+ )
+ self.assert_compile(
+ select([s, table1]),
+ "SELECT sq2.myid, sq2.name, "
+ "sq2.description, mytable.myid, "
+ "mytable.name, mytable.description FROM "
+ "(SELECT ta.myid AS myid, ta.name AS name, "
+ "ta.description AS description FROM "
+ "mytable AS ta WHERE EXISTS (SELECT 1 FROM "
+ "myothertable WHERE myothertable.otherid = "
+ "ta.myid)) AS sq2, mytable",
+ )
# test constructing the outer query via append_column(), which
# occurs in the ORM's Query object
- s = select([], exists([1], table2.c.otherid == table1.c.myid),
- from_obj=table1)
+ s = select(
+ [], exists([1], table2.c.otherid == table1.c.myid), from_obj=table1
+ )
s.append_column(table1)
- self.assert_compile(s,
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable WHERE '
- 'EXISTS (SELECT 1 FROM myothertable WHERE '
- 'myothertable.otherid = mytable.myid)')
+ self.assert_compile(
+ s,
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable WHERE "
+ "EXISTS (SELECT 1 FROM myothertable WHERE "
+ "myothertable.otherid = mytable.myid)",
+ )
def test_orderby_subquery(self):
self.assert_compile(
table1.select(
order_by=[
select(
- [
- table2.c.otherid],
- table1.c.myid == table2.c.otherid)]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable ORDER BY '
- '(SELECT myothertable.otherid FROM '
- 'myothertable WHERE mytable.myid = '
- 'myothertable.otherid)')
- self.assert_compile(table1.select(order_by=[
- desc(select([table2.c.otherid],
- table1.c.myid == table2.c.otherid))]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description FROM mytable ORDER BY '
- '(SELECT myothertable.otherid FROM '
- 'myothertable WHERE mytable.myid = '
- 'myothertable.otherid) DESC')
+ [table2.c.otherid], table1.c.myid == table2.c.otherid
+ )
+ ]
+ ),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable ORDER BY "
+ "(SELECT myothertable.otherid FROM "
+ "myothertable WHERE mytable.myid = "
+ "myothertable.otherid)",
+ )
+ self.assert_compile(
+ table1.select(
+ order_by=[
+ desc(
+ select(
+ [table2.c.otherid],
+ table1.c.myid == table2.c.otherid,
+ )
+ )
+ ]
+ ),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable ORDER BY "
+ "(SELECT myothertable.otherid FROM "
+ "myothertable WHERE mytable.myid = "
+ "myothertable.otherid) DESC",
+ )
def test_scalar_select(self):
assert_raises_message(
@@ -745,72 +852,86 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
r"Select objects don't have a type\. Call as_scalar\(\) "
r"on this Select object to return a 'scalar' "
r"version of this Select\.",
- func.coalesce, select([table1.c.myid])
+ func.coalesce,
+ select([table1.c.myid]),
)
s = select([table1.c.myid], correlate=False).as_scalar()
- self.assert_compile(select([table1, s]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, (SELECT mytable.myid '
- 'FROM mytable) AS anon_1 FROM mytable')
+ self.assert_compile(
+ select([table1, s]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description, (SELECT mytable.myid "
+ "FROM mytable) AS anon_1 FROM mytable",
+ )
s = select([table1.c.myid]).as_scalar()
- self.assert_compile(select([table2, s]),
- 'SELECT myothertable.otherid, '
- 'myothertable.othername, (SELECT '
- 'mytable.myid FROM mytable) AS anon_1 FROM '
- 'myothertable')
+ self.assert_compile(
+ select([table2, s]),
+ "SELECT myothertable.otherid, "
+ "myothertable.othername, (SELECT "
+ "mytable.myid FROM mytable) AS anon_1 FROM "
+ "myothertable",
+ )
s = select([table1.c.myid]).correlate(None).as_scalar()
- self.assert_compile(select([table1, s]),
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, (SELECT mytable.myid '
- 'FROM mytable) AS anon_1 FROM mytable')
+ self.assert_compile(
+ select([table1, s]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description, (SELECT mytable.myid "
+ "FROM mytable) AS anon_1 FROM mytable",
+ )
s = select([table1.c.myid]).as_scalar()
s2 = s.where(table1.c.myid == 5)
self.assert_compile(
s2,
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
- )
- self.assert_compile(
- s, "(SELECT mytable.myid FROM mytable)"
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
)
+ self.assert_compile(s, "(SELECT mytable.myid FROM mytable)")
# test that aliases use as_scalar() when used in an explicitly
# scalar context
s = select([table1.c.myid]).alias()
- self.assert_compile(select([table1.c.myid]).where(table1.c.myid
- == s),
- 'SELECT mytable.myid FROM mytable WHERE '
- 'mytable.myid = (SELECT mytable.myid FROM '
- 'mytable)')
- self.assert_compile(select([table1.c.myid]).where(s
- > table1.c.myid),
- 'SELECT mytable.myid FROM mytable WHERE '
- 'mytable.myid < (SELECT mytable.myid FROM '
- 'mytable)')
+ self.assert_compile(
+ select([table1.c.myid]).where(table1.c.myid == s),
+ "SELECT mytable.myid FROM mytable WHERE "
+ "mytable.myid = (SELECT mytable.myid FROM "
+ "mytable)",
+ )
+ self.assert_compile(
+ select([table1.c.myid]).where(s > table1.c.myid),
+ "SELECT mytable.myid FROM mytable WHERE "
+ "mytable.myid < (SELECT mytable.myid FROM "
+ "mytable)",
+ )
s = select([table1.c.myid]).as_scalar()
- self.assert_compile(select([table2, s]),
- 'SELECT myothertable.otherid, '
- 'myothertable.othername, (SELECT '
- 'mytable.myid FROM mytable) AS anon_1 FROM '
- 'myothertable')
+ self.assert_compile(
+ select([table2, s]),
+ "SELECT myothertable.otherid, "
+ "myothertable.othername, (SELECT "
+ "mytable.myid FROM mytable) AS anon_1 FROM "
+ "myothertable",
+ )
# test expressions against scalar selects
- self.assert_compile(select([s - literal(8)]),
- 'SELECT (SELECT mytable.myid FROM mytable) '
- '- :param_1 AS anon_1')
- self.assert_compile(select([select([table1.c.name]).as_scalar()
- + literal('x')]),
- 'SELECT (SELECT mytable.name FROM mytable) '
- '|| :param_1 AS anon_1')
- self.assert_compile(select([s > literal(8)]),
- 'SELECT (SELECT mytable.myid FROM mytable) '
- '> :param_1 AS anon_1')
- self.assert_compile(select([select([table1.c.name]).label('foo'
- )]),
- 'SELECT (SELECT mytable.name FROM mytable) '
- 'AS foo')
+ self.assert_compile(
+ select([s - literal(8)]),
+ "SELECT (SELECT mytable.myid FROM mytable) "
+ "- :param_1 AS anon_1",
+ )
+ self.assert_compile(
+ select([select([table1.c.name]).as_scalar() + literal("x")]),
+ "SELECT (SELECT mytable.name FROM mytable) "
+ "|| :param_1 AS anon_1",
+ )
+ self.assert_compile(
+ select([s > literal(8)]),
+ "SELECT (SELECT mytable.myid FROM mytable) "
+ "> :param_1 AS anon_1",
+ )
+ self.assert_compile(
+ select([select([table1.c.name]).label("foo")]),
+ "SELECT (SELECT mytable.name FROM mytable) " "AS foo",
+ )
# scalar selects should not have any attributes on their 'c' or
# 'columns' attribute
@@ -819,101 +940,129 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
try:
s.c.foo
except exc.InvalidRequestError as err:
- assert str(err) \
- == 'Scalar Select expression has no columns; use this '\
- 'object directly within a column-level expression.'
+ assert (
+ str(err)
+ == "Scalar Select expression has no columns; use this "
+ "object directly within a column-level expression."
+ )
try:
s.columns.foo
except exc.InvalidRequestError as err:
- assert str(err) \
- == 'Scalar Select expression has no columns; use this '\
- 'object directly within a column-level expression.'
-
- zips = table('zips',
- column('zipcode'),
- column('latitude'),
- column('longitude'),
- )
- places = table('places',
- column('id'),
- column('nm')
- )
- zip = '12345'
- qlat = select([zips.c.latitude], zips.c.zipcode == zip).\
- correlate(None).as_scalar()
- qlng = select([zips.c.longitude], zips.c.zipcode == zip).\
- correlate(None).as_scalar()
-
- q = select([places.c.id, places.c.nm, zips.c.zipcode,
- func.latlondist(qlat, qlng).label('dist')],
- zips.c.zipcode == zip,
- order_by=['dist', places.c.nm]
- )
-
- self.assert_compile(q,
- 'SELECT places.id, places.nm, '
- 'zips.zipcode, latlondist((SELECT '
- 'zips.latitude FROM zips WHERE '
- 'zips.zipcode = :zipcode_1), (SELECT '
- 'zips.longitude FROM zips WHERE '
- 'zips.zipcode = :zipcode_2)) AS dist FROM '
- 'places, zips WHERE zips.zipcode = '
- ':zipcode_3 ORDER BY dist, places.nm')
-
- zalias = zips.alias('main_zip')
- qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).\
- as_scalar()
- qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).\
- as_scalar()
- q = select([places.c.id, places.c.nm, zalias.c.zipcode,
- func.latlondist(qlat, qlng).label('dist')],
- order_by=['dist', places.c.nm])
- self.assert_compile(q,
- 'SELECT places.id, places.nm, '
- 'main_zip.zipcode, latlondist((SELECT '
- 'zips.latitude FROM zips WHERE '
- 'zips.zipcode = main_zip.zipcode), (SELECT '
- 'zips.longitude FROM zips WHERE '
- 'zips.zipcode = main_zip.zipcode)) AS dist '
- 'FROM places, zips AS main_zip ORDER BY '
- 'dist, places.nm')
-
- a1 = table2.alias('t2alias')
+ assert (
+ str(err)
+ == "Scalar Select expression has no columns; use this "
+ "object directly within a column-level expression."
+ )
+
+ zips = table(
+ "zips", column("zipcode"), column("latitude"), column("longitude")
+ )
+ places = table("places", column("id"), column("nm"))
+ zip = "12345"
+ qlat = (
+ select([zips.c.latitude], zips.c.zipcode == zip)
+ .correlate(None)
+ .as_scalar()
+ )
+ qlng = (
+ select([zips.c.longitude], zips.c.zipcode == zip)
+ .correlate(None)
+ .as_scalar()
+ )
+
+ q = select(
+ [
+ places.c.id,
+ places.c.nm,
+ zips.c.zipcode,
+ func.latlondist(qlat, qlng).label("dist"),
+ ],
+ zips.c.zipcode == zip,
+ order_by=["dist", places.c.nm],
+ )
+
+ self.assert_compile(
+ q,
+ "SELECT places.id, places.nm, "
+ "zips.zipcode, latlondist((SELECT "
+ "zips.latitude FROM zips WHERE "
+ "zips.zipcode = :zipcode_1), (SELECT "
+ "zips.longitude FROM zips WHERE "
+ "zips.zipcode = :zipcode_2)) AS dist FROM "
+ "places, zips WHERE zips.zipcode = "
+ ":zipcode_3 ORDER BY dist, places.nm",
+ )
+
+ zalias = zips.alias("main_zip")
+ qlat = select(
+ [zips.c.latitude], zips.c.zipcode == zalias.c.zipcode
+ ).as_scalar()
+ qlng = select(
+ [zips.c.longitude], zips.c.zipcode == zalias.c.zipcode
+ ).as_scalar()
+ q = select(
+ [
+ places.c.id,
+ places.c.nm,
+ zalias.c.zipcode,
+ func.latlondist(qlat, qlng).label("dist"),
+ ],
+ order_by=["dist", places.c.nm],
+ )
+ self.assert_compile(
+ q,
+ "SELECT places.id, places.nm, "
+ "main_zip.zipcode, latlondist((SELECT "
+ "zips.latitude FROM zips WHERE "
+ "zips.zipcode = main_zip.zipcode), (SELECT "
+ "zips.longitude FROM zips WHERE "
+ "zips.zipcode = main_zip.zipcode)) AS dist "
+ "FROM places, zips AS main_zip ORDER BY "
+ "dist, places.nm",
+ )
+
+ a1 = table2.alias("t2alias")
s1 = select([a1.c.otherid], table1.c.myid == a1.c.otherid).as_scalar()
j1 = table1.join(table2, table1.c.myid == table2.c.otherid)
s2 = select([table1, s1], from_obj=j1)
- self.assert_compile(s2,
- 'SELECT mytable.myid, mytable.name, '
- 'mytable.description, (SELECT '
- 't2alias.otherid FROM myothertable AS '
- 't2alias WHERE mytable.myid = '
- 't2alias.otherid) AS anon_1 FROM mytable '
- 'JOIN myothertable ON mytable.myid = '
- 'myothertable.otherid')
+ self.assert_compile(
+ s2,
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description, (SELECT "
+ "t2alias.otherid FROM myothertable AS "
+ "t2alias WHERE mytable.myid = "
+ "t2alias.otherid) AS anon_1 FROM mytable "
+ "JOIN myothertable ON mytable.myid = "
+ "myothertable.otherid",
+ )
def test_label_comparison_one(self):
- x = func.lala(table1.c.myid).label('foo')
- self.assert_compile(select([x], x == 5),
- 'SELECT lala(mytable.myid) AS foo FROM '
- 'mytable WHERE lala(mytable.myid) = '
- ':param_1')
+ x = func.lala(table1.c.myid).label("foo")
+ self.assert_compile(
+ select([x], x == 5),
+ "SELECT lala(mytable.myid) AS foo FROM "
+ "mytable WHERE lala(mytable.myid) = "
+ ":param_1",
+ )
def test_label_comparison_two(self):
self.assert_compile(
- label('bar', column('foo', type_=String)) + 'foo',
- 'foo || :param_1')
+ label("bar", column("foo", type_=String)) + "foo",
+ "foo || :param_1",
+ )
def test_order_by_labels_enabled(self):
- lab1 = (table1.c.myid + 12).label('foo')
- lab2 = func.somefunc(table1.c.name).label('bar')
+ lab1 = (table1.c.myid + 12).label("foo")
+ lab2 = func.somefunc(table1.c.name).label("bar")
dialect = default.DefaultDialect()
- self.assert_compile(select([lab1, lab2]).order_by(lab1, desc(lab2)),
- "SELECT mytable.myid + :myid_1 AS foo, "
- "somefunc(mytable.name) AS bar FROM mytable "
- "ORDER BY foo, bar DESC",
- dialect=dialect
- )
+ self.assert_compile(
+ select([lab1, lab2]).order_by(lab1, desc(lab2)),
+ "SELECT mytable.myid + :myid_1 AS foo, "
+ "somefunc(mytable.name) AS bar FROM mytable "
+ "ORDER BY foo, bar DESC",
+ dialect=dialect,
+ )
# the function embedded label renders as the function
self.assert_compile(
@@ -921,16 +1070,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY hoho(mytable.myid + :myid_1), bar DESC",
- dialect=dialect
+ dialect=dialect,
)
# binary expressions render as the expression without labels
- self.assert_compile(select([lab1, lab2]).order_by(lab1 + "test"),
- "SELECT mytable.myid + :myid_1 AS foo, "
- "somefunc(mytable.name) AS bar FROM mytable "
- "ORDER BY mytable.myid + :myid_1 + :param_1",
- dialect=dialect
- )
+ self.assert_compile(
+ select([lab1, lab2]).order_by(lab1 + "test"),
+ "SELECT mytable.myid + :myid_1 AS foo, "
+ "somefunc(mytable.name) AS bar FROM mytable "
+ "ORDER BY mytable.myid + :myid_1 + :param_1",
+ dialect=dialect,
+ )
# labels within functions in the columns clause render
# with the expression
@@ -939,98 +1089,92 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid + :myid_1 AS foo, "
"foo(mytable.myid + :myid_1) AS foo_1 FROM mytable "
"ORDER BY foo, foo(mytable.myid + :myid_1)",
- dialect=dialect
+ dialect=dialect,
)
- lx = (table1.c.myid + table1.c.myid).label('lx')
- ly = (func.lower(table1.c.name) + table1.c.description).label('ly')
+ lx = (table1.c.myid + table1.c.myid).label("lx")
+ ly = (func.lower(table1.c.name) + table1.c.description).label("ly")
self.assert_compile(
select([lx, ly]).order_by(lx, ly.desc()),
"SELECT mytable.myid + mytable.myid AS lx, "
"lower(mytable.name) || mytable.description AS ly "
"FROM mytable ORDER BY lx, ly DESC",
- dialect=dialect
+ dialect=dialect,
)
# expression isn't actually the same thing (even though label is)
self.assert_compile(
select([lab1, lab2]).order_by(
- table1.c.myid.label('foo'),
- desc(table1.c.name.label('bar'))
+ table1.c.myid.label("foo"), desc(table1.c.name.label("bar"))
),
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY mytable.myid, mytable.name DESC",
- dialect=dialect
+ dialect=dialect,
)
# it's also an exact match, not aliased etc.
self.assert_compile(
select([lab1, lab2]).order_by(
- desc(table1.alias().c.name.label('bar'))
+ desc(table1.alias().c.name.label("bar"))
),
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY mytable_1.name DESC",
- dialect=dialect
+ dialect=dialect,
)
# but! it's based on lineage
lab2_lineage = lab2.element._clone()
self.assert_compile(
- select([lab1, lab2]).order_by(
- desc(lab2_lineage.label('bar'))
- ),
+ select([lab1, lab2]).order_by(desc(lab2_lineage.label("bar"))),
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY bar DESC",
- dialect=dialect
+ dialect=dialect,
)
# here, 'name' is implicitly available, but w/ #3882 we don't
# want to render a name that isn't specifically a Label elsewhere
# in the query
self.assert_compile(
- select([table1.c.myid]).order_by(table1.c.name.label('name')),
- "SELECT mytable.myid FROM mytable ORDER BY mytable.name"
+ select([table1.c.myid]).order_by(table1.c.name.label("name")),
+ "SELECT mytable.myid FROM mytable ORDER BY mytable.name",
)
# as well as if it doesn't match
self.assert_compile(
select([table1.c.myid]).order_by(
- func.lower(table1.c.name).label('name')),
- "SELECT mytable.myid FROM mytable ORDER BY lower(mytable.name)"
+ func.lower(table1.c.name).label("name")
+ ),
+ "SELECT mytable.myid FROM mytable ORDER BY lower(mytable.name)",
)
def test_order_by_labels_disabled(self):
- lab1 = (table1.c.myid + 12).label('foo')
- lab2 = func.somefunc(table1.c.name).label('bar')
+ lab1 = (table1.c.myid + 12).label("foo")
+ lab2 = func.somefunc(table1.c.name).label("bar")
dialect = default.DefaultDialect()
dialect.supports_simple_order_by_label = False
self.assert_compile(
- select(
- [
- lab1,
- lab2]).order_by(
- lab1,
- desc(lab2)),
+ select([lab1, lab2]).order_by(lab1, desc(lab2)),
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY mytable.myid + :myid_1, somefunc(mytable.name) DESC",
- dialect=dialect)
+ dialect=dialect,
+ )
self.assert_compile(
select([lab1, lab2]).order_by(func.hoho(lab1), desc(lab2)),
"SELECT mytable.myid + :myid_1 AS foo, "
"somefunc(mytable.name) AS bar FROM mytable "
"ORDER BY hoho(mytable.myid + :myid_1), "
"somefunc(mytable.name) DESC",
- dialect=dialect
+ dialect=dialect,
)
def test_no_group_by_labels(self):
- lab1 = (table1.c.myid + 12).label('foo')
- lab2 = func.somefunc(table1.c.name).label('bar')
+ lab1 = (table1.c.myid + 12).label("foo")
+ lab2 = func.somefunc(table1.c.name).label("bar")
dialect = default.DefaultDialect()
self.assert_compile(
@@ -1038,140 +1182,140 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT mytable.myid + :myid_1 AS foo, somefunc(mytable.name) "
"AS bar FROM mytable GROUP BY mytable.myid + :myid_1, "
"somefunc(mytable.name)",
- dialect=dialect
+ dialect=dialect,
)
def test_conjunctions(self):
- a, b, c = text('a'), text('b'), text('c')
+ a, b, c = text("a"), text("b"), text("c")
x = and_(a, b, c)
assert isinstance(x.type, Boolean)
- assert str(x) == 'a AND b AND c'
+ assert str(x) == "a AND b AND c"
self.assert_compile(
- select([x.label('foo')]),
- 'SELECT a AND b AND c AS foo'
+ select([x.label("foo")]), "SELECT a AND b AND c AS foo"
)
self.assert_compile(
- and_(table1.c.myid == 12, table1.c.name == 'asdf',
- table2.c.othername == 'foo', text("sysdate() = today()")),
+ and_(
+ table1.c.myid == 12,
+ table1.c.name == "asdf",
+ table2.c.othername == "foo",
+ text("sysdate() = today()"),
+ ),
"mytable.myid = :myid_1 AND mytable.name = :name_1 "
"AND myothertable.othername = "
- ":othername_1 AND sysdate() = today()"
+ ":othername_1 AND sysdate() = today()",
)
self.assert_compile(
and_(
table1.c.myid == 12,
- or_(table2.c.othername == 'asdf',
- table2.c.othername == 'foo', table2.c.otherid == 9),
+ or_(
+ table2.c.othername == "asdf",
+ table2.c.othername == "foo",
+ table2.c.otherid == 9,
+ ),
text("sysdate() = today()"),
),
- 'mytable.myid = :myid_1 AND (myothertable.othername = '
- ':othername_1 OR myothertable.othername = :othername_2 OR '
- 'myothertable.otherid = :otherid_1) AND sysdate() = '
- 'today()',
- checkparams={'othername_1': 'asdf', 'othername_2': 'foo',
- 'otherid_1': 9, 'myid_1': 12}
+ "mytable.myid = :myid_1 AND (myothertable.othername = "
+ ":othername_1 OR myothertable.othername = :othername_2 OR "
+ "myothertable.otherid = :otherid_1) AND sysdate() = "
+ "today()",
+ checkparams={
+ "othername_1": "asdf",
+ "othername_2": "foo",
+ "otherid_1": 9,
+ "myid_1": 12,
+ },
)
# test a generator
self.assert_compile(
and_(
- conj for conj in [
- table1.c.myid == 12,
- table1.c.name == 'asdf'
- ]
+ conj for conj in [table1.c.myid == 12, table1.c.name == "asdf"]
),
- "mytable.myid = :myid_1 AND mytable.name = :name_1"
+ "mytable.myid = :myid_1 AND mytable.name = :name_1",
)
def test_nested_conjunctions_short_circuit(self):
"""test that empty or_(), and_() conjunctions are collapsed by
an enclosing conjunction."""
- t = table('t', column('x'))
+ t = table("t", column("x"))
self.assert_compile(
- select([t]).where(and_(t.c.x == 5,
- or_(and_(or_(t.c.x == 7))))),
- "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2"
+ select([t]).where(and_(t.c.x == 5, or_(and_(or_(t.c.x == 7))))),
+ "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2",
)
self.assert_compile(
- select([t]).where(and_(or_(t.c.x == 12,
- and_(or_(t.c.x == 8))))),
- "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
+ select([t]).where(and_(or_(t.c.x == 12, and_(or_(t.c.x == 8))))),
+ "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2",
)
self.assert_compile(
- select([t]).
- where(
+ select([t]).where(
and_(
or_(
or_(t.c.x == 12),
- and_(
- or_(),
- or_(and_(t.c.x == 8)),
- and_()
- )
+ and_(or_(), or_(and_(t.c.x == 8)), and_()),
)
)
),
- "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
+ "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2",
)
def test_true_short_circuit(self):
- t = table('t', column('x'))
+ t = table("t", column("x"))
self.assert_compile(
select([t]).where(true()),
"SELECT t.x FROM t WHERE 1 = 1",
- dialect=default.DefaultDialect(supports_native_boolean=False)
+ dialect=default.DefaultDialect(supports_native_boolean=False),
)
self.assert_compile(
select([t]).where(true()),
"SELECT t.x FROM t WHERE true",
- dialect=default.DefaultDialect(supports_native_boolean=True)
+ dialect=default.DefaultDialect(supports_native_boolean=True),
)
self.assert_compile(
select([t]),
"SELECT t.x FROM t",
- dialect=default.DefaultDialect(supports_native_boolean=True)
+ dialect=default.DefaultDialect(supports_native_boolean=True),
)
def test_distinct(self):
self.assert_compile(
select([table1.c.myid.distinct()]),
- "SELECT DISTINCT mytable.myid FROM mytable"
+ "SELECT DISTINCT mytable.myid FROM mytable",
)
self.assert_compile(
select([distinct(table1.c.myid)]),
- "SELECT DISTINCT mytable.myid FROM mytable"
+ "SELECT DISTINCT mytable.myid FROM mytable",
)
self.assert_compile(
select([table1.c.myid]).distinct(),
- "SELECT DISTINCT mytable.myid FROM mytable"
+ "SELECT DISTINCT mytable.myid FROM mytable",
)
self.assert_compile(
select([func.count(table1.c.myid.distinct())]),
- "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
+ "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable",
)
self.assert_compile(
select([func.count(distinct(table1.c.myid))]),
- "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
+ "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable",
)
def test_where_empty(self):
self.assert_compile(
select([table1.c.myid]).where(and_()),
- "SELECT mytable.myid FROM mytable"
+ "SELECT mytable.myid FROM mytable",
)
self.assert_compile(
select([table1.c.myid]).where(or_()),
- "SELECT mytable.myid FROM mytable"
+ "SELECT mytable.myid FROM mytable",
)
def test_multiple_col_binds(self):
@@ -1179,133 +1323,165 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
select(
[literal_column("*")],
or_(
- table1.c.myid == 12, table1.c.myid == 'asdf',
- table1.c.myid == 'foo')
+ table1.c.myid == 12,
+ table1.c.myid == "asdf",
+ table1.c.myid == "foo",
+ ),
),
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
- "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
+ "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3",
)
def test_order_by_nulls(self):
self.assert_compile(
- table2.select(order_by=[table2.c.otherid,
- table2.c.othername.desc().nullsfirst()]),
+ table2.select(
+ order_by=[
+ table2.c.otherid,
+ table2.c.othername.desc().nullsfirst(),
+ ]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
- "myothertable.othername DESC NULLS FIRST"
+ "myothertable.othername DESC NULLS FIRST",
)
self.assert_compile(
- table2.select(order_by=[
- table2.c.otherid, table2.c.othername.desc().nullslast()]),
+ table2.select(
+ order_by=[
+ table2.c.otherid,
+ table2.c.othername.desc().nullslast(),
+ ]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
- "myothertable.othername DESC NULLS LAST"
+ "myothertable.othername DESC NULLS LAST",
)
self.assert_compile(
- table2.select(order_by=[
- table2.c.otherid.nullslast(),
- table2.c.othername.desc().nullsfirst()]),
+ table2.select(
+ order_by=[
+ table2.c.otherid.nullslast(),
+ table2.c.othername.desc().nullsfirst(),
+ ]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid NULLS LAST, "
- "myothertable.othername DESC NULLS FIRST"
+ "myothertable.othername DESC NULLS FIRST",
)
self.assert_compile(
- table2.select(order_by=[table2.c.otherid.nullsfirst(),
- table2.c.othername.desc()]),
+ table2.select(
+ order_by=[
+ table2.c.otherid.nullsfirst(),
+ table2.c.othername.desc(),
+ ]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid NULLS FIRST, "
- "myothertable.othername DESC"
+ "myothertable.othername DESC",
)
self.assert_compile(
- table2.select(order_by=[table2.c.otherid.nullsfirst(),
- table2.c.othername.desc().nullslast()]),
+ table2.select(
+ order_by=[
+ table2.c.otherid.nullsfirst(),
+ table2.c.othername.desc().nullslast(),
+ ]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid NULLS FIRST, "
- "myothertable.othername DESC NULLS LAST"
+ "myothertable.othername DESC NULLS LAST",
)
def test_orderby_groupby(self):
self.assert_compile(
- table2.select(order_by=[table2.c.otherid,
- asc(table2.c.othername)]),
+ table2.select(
+ order_by=[table2.c.otherid, asc(table2.c.othername)]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
- "myothertable.othername ASC"
+ "myothertable.othername ASC",
)
self.assert_compile(
- table2.select(order_by=[table2.c.otherid,
- table2.c.othername.desc()]),
+ table2.select(
+ order_by=[table2.c.otherid, table2.c.othername.desc()]
+ ),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
- "myothertable.othername DESC"
+ "myothertable.othername DESC",
)
# generative order_by
self.assert_compile(
- table2.select().order_by(table2.c.otherid).
- order_by(table2.c.othername.desc()),
+ table2.select()
+ .order_by(table2.c.otherid)
+ .order_by(table2.c.othername.desc()),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
- "myothertable.othername DESC"
+ "myothertable.othername DESC",
)
self.assert_compile(
- table2.select().order_by(table2.c.otherid).
- order_by(table2.c.othername.desc()
- ).order_by(None),
+ table2.select()
+ .order_by(table2.c.otherid)
+ .order_by(table2.c.othername.desc())
+ .order_by(None),
"SELECT myothertable.otherid, myothertable.othername "
- "FROM myothertable"
+ "FROM myothertable",
)
self.assert_compile(
select(
[table2.c.othername, func.count(table2.c.otherid)],
- group_by=[table2.c.othername]),
+ group_by=[table2.c.othername],
+ ),
"SELECT myothertable.othername, "
"count(myothertable.otherid) AS count_1 "
- "FROM myothertable GROUP BY myothertable.othername"
+ "FROM myothertable GROUP BY myothertable.othername",
)
# generative group by
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).
- group_by(table2.c.othername),
+ select(
+ [table2.c.othername, func.count(table2.c.otherid)]
+ ).group_by(table2.c.othername),
"SELECT myothertable.othername, "
"count(myothertable.otherid) AS count_1 "
- "FROM myothertable GROUP BY myothertable.othername"
+ "FROM myothertable GROUP BY myothertable.othername",
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)]).
- group_by(table2.c.othername).group_by(None),
+ select([table2.c.othername, func.count(table2.c.otherid)])
+ .group_by(table2.c.othername)
+ .group_by(None),
"SELECT myothertable.othername, "
"count(myothertable.otherid) AS count_1 "
- "FROM myothertable"
+ "FROM myothertable",
)
self.assert_compile(
- select([table2.c.othername, func.count(table2.c.otherid)],
- group_by=[table2.c.othername],
- order_by=[table2.c.othername]),
+ select(
+ [table2.c.othername, func.count(table2.c.otherid)],
+ group_by=[table2.c.othername],
+ order_by=[table2.c.othername],
+ ),
"SELECT myothertable.othername, "
"count(myothertable.otherid) AS count_1 "
"FROM myothertable "
- "GROUP BY myothertable.othername ORDER BY myothertable.othername"
+ "GROUP BY myothertable.othername ORDER BY myothertable.othername",
)
def test_custom_order_by_clause(self):
class CustomCompiler(PGCompiler):
def order_by_clause(self, select, **kw):
- return super(CustomCompiler, self).\
- order_by_clause(select, **kw) + " CUSTOMIZED"
+ return (
+ super(CustomCompiler, self).order_by_clause(select, **kw)
+ + " CUSTOMIZED"
+ )
class CustomDialect(PGDialect):
- name = 'custom'
+ name = "custom"
statement_compiler = CustomCompiler
stmt = select([table1.c.myid]).order_by(table1.c.myid)
@@ -1313,17 +1489,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
stmt,
"SELECT mytable.myid FROM mytable ORDER BY "
"mytable.myid CUSTOMIZED",
- dialect=CustomDialect()
+ dialect=CustomDialect(),
)
def test_custom_group_by_clause(self):
class CustomCompiler(PGCompiler):
def group_by_clause(self, select, **kw):
- return super(CustomCompiler, self).\
- group_by_clause(select, **kw) + " CUSTOMIZED"
+ return (
+ super(CustomCompiler, self).group_by_clause(select, **kw)
+ + " CUSTOMIZED"
+ )
class CustomDialect(PGDialect):
- name = 'custom'
+ name = "custom"
statement_compiler = CustomCompiler
stmt = select([table1.c.myid]).group_by(table1.c.myid)
@@ -1331,44 +1509,51 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
stmt,
"SELECT mytable.myid FROM mytable GROUP BY "
"mytable.myid CUSTOMIZED",
- dialect=CustomDialect()
+ dialect=CustomDialect(),
)
def test_for_update(self):
self.assert_compile(
table1.select(table1.c.myid == 7).with_for_update(),
"SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
+ )
# not supported by dialect, should just use update
self.assert_compile(
table1.select(table1.c.myid == 7).with_for_update(nowait=True),
"SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
+ "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
+ )
assert_raises_message(
exc.ArgumentError,
"Unknown for_update argument: 'unknown_mode'",
- table1.select, table1.c.myid == 7, for_update='unknown_mode'
+ table1.select,
+ table1.c.myid == 7,
+ for_update="unknown_mode",
)
def test_alias(self):
# test the alias for a table1. column names stay the same,
# table name "changes" to "foo".
self.assert_compile(
- select([table1.alias('foo')]),
- "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
+ select([table1.alias("foo")]),
+ "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo",
+ )
for dialect in (oracle.dialect(),):
self.assert_compile(
- select([table1.alias('foo')]),
+ select([table1.alias("foo")]),
"SELECT foo.myid, foo.name, foo.description FROM mytable foo",
- dialect=dialect)
+ dialect=dialect,
+ )
self.assert_compile(
select([table1.alias()]),
"SELECT mytable_1.myid, mytable_1.name, mytable_1.description "
- "FROM mytable AS mytable_1")
+ "FROM mytable AS mytable_1",
+ )
# create a select for a join of two tables. use_labels
# means the column names will have labels tablename_columnname,
@@ -1377,12 +1562,13 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# from the first table1.
q = select(
[table1, table2.c.otherid],
- table1.c.myid == table2.c.otherid, use_labels=True
+ table1.c.myid == table2.c.otherid,
+ use_labels=True,
)
# make an alias of the "selectable". column names
# stay the same (i.e. the labels), table name "changes" to "t2view".
- a = alias(q, 't2view')
+ a = alias(q, "t2view")
# select from that alias, also using labels. two levels of labels
# should produce two underscores.
@@ -1401,26 +1587,26 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"myothertable_otherid FROM mytable, myothertable "
"WHERE mytable.myid = "
"myothertable.otherid) AS t2view "
- "WHERE t2view.mytable_myid = :mytable_myid_1"
+ "WHERE t2view.mytable_myid = :mytable_myid_1",
)
def test_prefix(self):
self.assert_compile(
- table1.select().prefix_with("SQL_CALC_FOUND_ROWS").
- prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
+ table1.select()
+ .prefix_with("SQL_CALC_FOUND_ROWS")
+ .prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
"SELECT SQL_CALC_FOUND_ROWS SQL_SOME_WEIRD_MYSQL_THING "
- "mytable.myid, mytable.name, mytable.description FROM mytable"
+ "mytable.myid, mytable.name, mytable.description FROM mytable",
)
def test_prefix_dialect_specific(self):
self.assert_compile(
- table1.select().prefix_with("SQL_CALC_FOUND_ROWS",
- dialect='sqlite').
- prefix_with("SQL_SOME_WEIRD_MYSQL_THING",
- dialect='mysql'),
+ table1.select()
+ .prefix_with("SQL_CALC_FOUND_ROWS", dialect="sqlite")
+ .prefix_with("SQL_SOME_WEIRD_MYSQL_THING", dialect="mysql"),
"SELECT SQL_SOME_WEIRD_MYSQL_THING "
"mytable.myid, mytable.name, mytable.description FROM mytable",
- dialect=mysql.dialect()
+ dialect=mysql.dialect(),
)
def test_render_binds_as_literal(self):
@@ -1431,140 +1617,149 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
class Compiler(dialect.statement_compiler):
ansi_bind_rules = True
+
dialect.statement_compiler = Compiler
self.assert_compile(
select([literal("someliteral")]),
"SELECT 'someliteral' AS anon_1",
- dialect=dialect
+ dialect=dialect,
)
self.assert_compile(
select([table1.c.myid + 3]),
"SELECT mytable.myid + 3 AS anon_1 FROM mytable",
- dialect=dialect
+ dialect=dialect,
)
self.assert_compile(
select([table1.c.myid.in_([4, 5, 6])]),
"SELECT mytable.myid IN (4, 5, 6) AS anon_1 FROM mytable",
- dialect=dialect
+ dialect=dialect,
)
self.assert_compile(
select([func.mod(table1.c.myid, 5)]),
"SELECT mod(mytable.myid, 5) AS mod_1 FROM mytable",
- dialect=dialect
+ dialect=dialect,
)
self.assert_compile(
select([literal("foo").in_([])]),
"SELECT 1 != 1 AS anon_1",
- dialect=dialect
+ dialect=dialect,
)
self.assert_compile(
select([literal(util.b("foo"))]),
"SELECT 'foo' AS anon_1",
- dialect=dialect
+ dialect=dialect,
)
# test callable
self.assert_compile(
select([table1.c.myid == bindparam("foo", callable_=lambda: 5)]),
"SELECT mytable.myid = 5 AS anon_1 FROM mytable",
- dialect=dialect
+ dialect=dialect,
)
- empty_in_dialect = default.DefaultDialect(empty_in_strategy='dynamic')
+ empty_in_dialect = default.DefaultDialect(empty_in_strategy="dynamic")
empty_in_dialect.statement_compiler = Compiler
assert_raises_message(
exc.CompileError,
"Bind parameter 'foo' without a "
"renderable value not allowed here.",
- bindparam("foo").in_(
- []).compile,
- dialect=empty_in_dialect)
+ bindparam("foo").in_([]).compile,
+ dialect=empty_in_dialect,
+ )
def test_collate(self):
# columns clause
self.assert_compile(
- select([column('x').collate('bar')]),
- "SELECT x COLLATE bar AS anon_1"
+ select([column("x").collate("bar")]),
+ "SELECT x COLLATE bar AS anon_1",
)
# WHERE clause
self.assert_compile(
- select([column('x')]).where(column('x').collate('bar') == 'foo'),
- "SELECT x WHERE (x COLLATE bar) = :param_1"
+ select([column("x")]).where(column("x").collate("bar") == "foo"),
+ "SELECT x WHERE (x COLLATE bar) = :param_1",
)
# ORDER BY clause
self.assert_compile(
- select([column('x')]).order_by(column('x').collate('bar')),
- "SELECT x ORDER BY x COLLATE bar"
+ select([column("x")]).order_by(column("x").collate("bar")),
+ "SELECT x ORDER BY x COLLATE bar",
)
def test_literal(self):
- self.assert_compile(select([literal('foo')]),
- "SELECT :param_1 AS anon_1")
+ self.assert_compile(
+ select([literal("foo")]), "SELECT :param_1 AS anon_1"
+ )
self.assert_compile(
- select(
- [
- literal("foo") +
- literal("bar")],
- from_obj=[table1]),
- "SELECT :param_1 || :param_2 AS anon_1 FROM mytable")
+ select([literal("foo") + literal("bar")], from_obj=[table1]),
+ "SELECT :param_1 || :param_2 AS anon_1 FROM mytable",
+ )
def test_calculated_columns(self):
- value_tbl = table('values',
- column('id', Integer),
- column('val1', Float),
- column('val2', Float),
- )
+ value_tbl = table(
+ "values",
+ column("id", Integer),
+ column("val1", Float),
+ column("val2", Float),
+ )
self.assert_compile(
- select([value_tbl.c.id, (value_tbl.c.val2 -
- value_tbl.c.val1) / value_tbl.c.val1]),
+ select(
+ [
+ value_tbl.c.id,
+ (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1,
+ ]
+ ),
"SELECT values.id, (values.val2 - values.val1) "
- "/ values.val1 AS anon_1 FROM values"
+ "/ values.val1 AS anon_1 FROM values",
)
self.assert_compile(
- select([
- value_tbl.c.id],
- (value_tbl.c.val2 - value_tbl.c.val1) /
- value_tbl.c.val1 > 2.0),
+ select(
+ [value_tbl.c.id],
+ (value_tbl.c.val2 - value_tbl.c.val1) / value_tbl.c.val1 > 2.0,
+ ),
"SELECT values.id FROM values WHERE "
- "(values.val2 - values.val1) / values.val1 > :param_1"
+ "(values.val2 - values.val1) / values.val1 > :param_1",
)
self.assert_compile(
- select([value_tbl.c.id], value_tbl.c.val1 /
- (value_tbl.c.val2 - value_tbl.c.val1) /
- value_tbl.c.val1 > 2.0),
+ select(
+ [value_tbl.c.id],
+ value_tbl.c.val1
+ / (value_tbl.c.val2 - value_tbl.c.val1)
+ / value_tbl.c.val1
+ > 2.0,
+ ),
"SELECT values.id FROM values WHERE "
"(values.val1 / (values.val2 - values.val1)) "
- "/ values.val1 > :param_1"
+ "/ values.val1 > :param_1",
)
def test_percent_chars(self):
- t = table("table%name",
- column("percent%"),
- column("%(oneofthese)s"),
- column("spaces % more spaces"),
- )
+ t = table(
+ "table%name",
+ column("percent%"),
+ column("%(oneofthese)s"),
+ column("spaces % more spaces"),
+ )
self.assert_compile(
t.select(use_labels=True),
- '''SELECT "table%name"."percent%" AS "table%name_percent%", '''
- '''"table%name"."%(oneofthese)s" AS '''
- '''"table%name_%(oneofthese)s", '''
- '''"table%name"."spaces % more spaces" AS '''
- '''"table%name_spaces % '''
- '''more spaces" FROM "table%name"'''
+ """SELECT "table%name"."percent%" AS "table%name_percent%", """
+ """"table%name"."%(oneofthese)s" AS """
+ """"table%name_%(oneofthese)s", """
+ """"table%name"."spaces % more spaces" AS """
+ """"table%name_spaces % """
+ '''more spaces" FROM "table%name"''',
)
def test_joins(self):
@@ -1572,22 +1767,31 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
join(table2, table1, table1.c.myid == table2.c.otherid).select(),
"SELECT myothertable.otherid, myothertable.othername, "
"mytable.myid, mytable.name, mytable.description FROM "
- "myothertable JOIN mytable ON mytable.myid = myothertable.otherid"
+ "myothertable JOIN mytable ON mytable.myid = myothertable.otherid",
)
self.assert_compile(
select(
[table1],
- from_obj=[join(table1, table2, table1.c.myid
- == table2.c.otherid)]
+ from_obj=[
+ join(table1, table2, table1.c.myid == table2.c.otherid)
+ ],
),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
- "mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
+ "mytable JOIN myothertable ON mytable.myid = myothertable.otherid",
+ )
self.assert_compile(
select(
- [join(join(table1, table2, table1.c.myid == table2.c.otherid),
- table3, table1.c.myid == table3.c.userid)]
+ [
+ join(
+ join(
+ table1, table2, table1.c.myid == table2.c.otherid
+ ),
+ table3,
+ table1.c.myid == table3.c.userid,
+ )
+ ]
),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername, "
@@ -1595,27 +1799,29 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"thirdtable.otherstuff FROM mytable JOIN myothertable "
"ON mytable.myid ="
" myothertable.otherid JOIN thirdtable ON "
- "mytable.myid = thirdtable.userid"
+ "mytable.myid = thirdtable.userid",
)
self.assert_compile(
- join(users, addresses, users.c.user_id ==
- addresses.c.user_id).select(),
+ join(
+ users, addresses, users.c.user_id == addresses.c.user_id
+ ).select(),
"SELECT users.user_id, users.user_name, users.password, "
"addresses.address_id, addresses.user_id, addresses.street, "
"addresses.city, addresses.state, addresses.zip "
"FROM users JOIN addresses "
- "ON users.user_id = addresses.user_id"
+ "ON users.user_id = addresses.user_id",
)
self.assert_compile(
- select([table1, table2, table3],
-
- from_obj=[join(table1, table2,
- table1.c.myid == table2.c.otherid).
- outerjoin(table3,
- table1.c.myid == table3.c.userid)]
- ),
+ select(
+ [table1, table2, table3],
+ from_obj=[
+ join(
+ table1, table2, table1.c.myid == table2.c.otherid
+ ).outerjoin(table3, table1.c.myid == table3.c.userid)
+ ],
+ ),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername, "
"thirdtable.userid,"
@@ -1623,15 +1829,21 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"JOIN myothertable ON mytable.myid "
"= myothertable.otherid LEFT OUTER JOIN thirdtable "
"ON mytable.myid ="
- " thirdtable.userid"
+ " thirdtable.userid",
)
self.assert_compile(
- select([table1, table2, table3],
- from_obj=[outerjoin(table1,
- join(table2, table3, table2.c.otherid
- == table3.c.userid),
- table1.c.myid == table2.c.otherid)]
- ),
+ select(
+ [table1, table2, table3],
+ from_obj=[
+ outerjoin(
+ table1,
+ join(
+ table2, table3, table2.c.otherid == table3.c.userid
+ ),
+ table1.c.myid == table2.c.otherid,
+ )
+ ],
+ ),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername, "
"thirdtable.userid,"
@@ -1639,47 +1851,49 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"(myothertable "
"JOIN thirdtable ON myothertable.otherid = "
"thirdtable.userid) ON "
- "mytable.myid = myothertable.otherid"
+ "mytable.myid = myothertable.otherid",
)
query = select(
[table1, table2],
or_(
- table1.c.name == 'fred',
+ table1.c.name == "fred",
table1.c.myid == 10,
- table2.c.othername != 'jack',
- text("EXISTS (select yay from foo where boo = lar)")
+ table2.c.othername != "jack",
+ text("EXISTS (select yay from foo where boo = lar)"),
),
- from_obj=[outerjoin(table1, table2,
- table1.c.myid == table2.c.otherid)]
+ from_obj=[
+ outerjoin(table1, table2, table1.c.myid == table2.c.otherid)
+ ],
)
self.assert_compile(
- query, "SELECT mytable.myid, mytable.name, mytable.description, "
+ query,
+ "SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername "
"FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = "
"myothertable.otherid WHERE mytable.name = :name_1 OR "
"mytable.myid = :myid_1 OR myothertable.othername != :othername_1 "
- "OR EXISTS (select yay from foo where boo = lar)", )
+ "OR EXISTS (select yay from foo where boo = lar)",
+ )
def test_full_outer_join(self):
for spec in [
join(table1, table2, table1.c.myid == table2.c.otherid, full=True),
outerjoin(
- table1, table2,
- table1.c.myid == table2.c.otherid, full=True),
- table1.join(
- table2,
- table1.c.myid == table2.c.otherid, full=True),
+ table1, table2, table1.c.myid == table2.c.otherid, full=True
+ ),
+ table1.join(table2, table1.c.myid == table2.c.otherid, full=True),
table1.outerjoin(
- table2,
- table1.c.myid == table2.c.otherid, full=True),
+ table2, table1.c.myid == table2.c.otherid, full=True
+ ),
]:
stmt = select([table1]).select_from(spec)
self.assert_compile(
stmt,
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable FULL OUTER JOIN myothertable "
- "ON mytable.myid = myothertable.otherid")
+ "ON mytable.myid = myothertable.otherid",
+ )
def test_compound_selects(self):
assert_raises_message(
@@ -1687,7 +1901,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"All selectables passed to CompoundSelect "
"must have identical numbers of columns; "
"select #1 has 2 columns, select #2 has 3",
- union, table3.select(), table1.select()
+ union,
+ table3.select(),
+ table1.select(),
)
x = union(
@@ -1697,36 +1913,39 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
)
self.assert_compile(
- x, "SELECT mytable.myid, mytable.name, "
+ x,
+ "SELECT mytable.myid, mytable.name, "
"mytable.description "
"FROM mytable WHERE "
"mytable.myid = :myid_1 UNION "
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable WHERE mytable.myid = :myid_2 "
- "ORDER BY mytable.myid")
-
- x = union(
- select([table1]),
- select([table1])
+ "ORDER BY mytable.myid",
)
+
+ x = union(select([table1]), select([table1]))
x = union(x, select([table1]))
self.assert_compile(
- x, "(SELECT mytable.myid, mytable.name, mytable.description "
+ x,
+ "(SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable UNION SELECT mytable.myid, mytable.name, "
"mytable.description FROM mytable) UNION SELECT mytable.myid,"
- " mytable.name, mytable.description FROM mytable")
+ " mytable.name, mytable.description FROM mytable",
+ )
u1 = union(
select([table1.c.myid, table1.c.name]),
select([table2]),
- select([table3])
+ select([table3]),
)
self.assert_compile(
- u1, "SELECT mytable.myid, mytable.name "
+ u1,
+ "SELECT mytable.myid, mytable.name "
"FROM mytable UNION SELECT myothertable.otherid, "
"myothertable.othername FROM myothertable "
"UNION SELECT thirdtable.userid, thirdtable.otherstuff "
- "FROM thirdtable")
+ "FROM thirdtable",
+ )
assert u1.corresponding_column(table2.c.otherid) is u1.c.myid
@@ -1734,25 +1953,30 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
union(
select([table1.c.myid, table1.c.name]),
select([table2]),
- order_by=['myid'],
+ order_by=["myid"],
offset=10,
- limit=5
+ limit=5,
),
"SELECT mytable.myid, mytable.name "
"FROM mytable UNION SELECT myothertable.otherid, "
"myothertable.othername "
"FROM myothertable ORDER BY myid " # note table name is omitted
"LIMIT :param_1 OFFSET :param_2",
- {'param_1': 5, 'param_2': 10}
+ {"param_1": 5, "param_2": 10},
)
self.assert_compile(
union(
- select([table1.c.myid, table1.c.name,
- func.max(table1.c.description)],
- table1.c.name == 'name2',
- group_by=[table1.c.myid, table1.c.name]),
- table1.select(table1.c.name == 'name1')
+ select(
+ [
+ table1.c.myid,
+ table1.c.name,
+ func.max(table1.c.description),
+ ],
+ table1.c.name == "name2",
+ group_by=[table1.c.myid, table1.c.name],
+ ),
+ table1.select(table1.c.name == "name1"),
),
"SELECT mytable.myid, mytable.name, "
"max(mytable.description) AS max_1 "
@@ -1760,183 +1984,155 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"GROUP BY mytable.myid, "
"mytable.name UNION SELECT mytable.myid, mytable.name, "
"mytable.description "
- "FROM mytable WHERE mytable.name = :name_2"
+ "FROM mytable WHERE mytable.name = :name_2",
)
self.assert_compile(
union(
- select([literal(100).label('value')]),
- select([literal(200).label('value')])
+ select([literal(100).label("value")]),
+ select([literal(200).label("value")]),
),
- "SELECT :param_1 AS value UNION SELECT :param_2 AS value"
+ "SELECT :param_1 AS value UNION SELECT :param_2 AS value",
)
self.assert_compile(
union_all(
select([table1.c.myid]),
- union(
- select([table2.c.otherid]),
- select([table3.c.userid]),
- )
+ union(select([table2.c.otherid]), select([table3.c.userid])),
),
-
"SELECT mytable.myid FROM mytable UNION ALL "
"(SELECT myothertable.otherid FROM myothertable UNION "
- "SELECT thirdtable.userid FROM thirdtable)"
+ "SELECT thirdtable.userid FROM thirdtable)",
)
- s = select([column('foo'), column('bar')])
+ s = select([column("foo"), column("bar")])
self.assert_compile(
- union(
- s.order_by("foo"),
- s.order_by("bar")),
+ union(s.order_by("foo"), s.order_by("bar")),
"(SELECT foo, bar ORDER BY foo) UNION "
- "(SELECT foo, bar ORDER BY bar)")
+ "(SELECT foo, bar ORDER BY bar)",
+ )
self.assert_compile(
- union(s.order_by("foo").self_group(),
- s.order_by("bar").limit(10).self_group()),
+ union(
+ s.order_by("foo").self_group(),
+ s.order_by("bar").limit(10).self_group(),
+ ),
"(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, "
"bar ORDER BY bar LIMIT :param_1)",
- {'param_1': 10}
-
+ {"param_1": 10},
)
def test_compound_grouping(self):
- s = select([column('foo'), column('bar')]).select_from(text('bat'))
+ s = select([column("foo"), column("bar")]).select_from(text("bat"))
self.assert_compile(
union(union(union(s, s), s), s),
"((SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) "
- "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat"
+ "UNION SELECT foo, bar FROM bat) UNION SELECT foo, bar FROM bat",
)
self.assert_compile(
union(s, s, s, s),
"SELECT foo, bar FROM bat UNION SELECT foo, bar "
"FROM bat UNION SELECT foo, bar FROM bat "
- "UNION SELECT foo, bar FROM bat"
+ "UNION SELECT foo, bar FROM bat",
)
self.assert_compile(
union(s, union(s, union(s, s))),
"SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat "
"UNION (SELECT foo, bar FROM bat "
- "UNION SELECT foo, bar FROM bat))"
+ "UNION SELECT foo, bar FROM bat))",
)
self.assert_compile(
select([s.alias()]),
- 'SELECT anon_1.foo, anon_1.bar FROM '
- '(SELECT foo, bar FROM bat) AS anon_1'
+ "SELECT anon_1.foo, anon_1.bar FROM "
+ "(SELECT foo, bar FROM bat) AS anon_1",
)
self.assert_compile(
select([union(s, s).alias()]),
- 'SELECT anon_1.foo, anon_1.bar FROM '
- '(SELECT foo, bar FROM bat UNION '
- 'SELECT foo, bar FROM bat) AS anon_1'
+ "SELECT anon_1.foo, anon_1.bar FROM "
+ "(SELECT foo, bar FROM bat UNION "
+ "SELECT foo, bar FROM bat) AS anon_1",
)
self.assert_compile(
select([except_(s, s).alias()]),
- 'SELECT anon_1.foo, anon_1.bar FROM '
- '(SELECT foo, bar FROM bat EXCEPT '
- 'SELECT foo, bar FROM bat) AS anon_1'
+ "SELECT anon_1.foo, anon_1.bar FROM "
+ "(SELECT foo, bar FROM bat EXCEPT "
+ "SELECT foo, bar FROM bat) AS anon_1",
)
# this query sqlite specifically chokes on
self.assert_compile(
- union(
- except_(s, s),
- s
- ),
+ union(except_(s, s), s),
"(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) "
- "UNION SELECT foo, bar FROM bat"
+ "UNION SELECT foo, bar FROM bat",
)
self.assert_compile(
- union(
- s,
- except_(s, s),
- ),
+ union(s, except_(s, s)),
"SELECT foo, bar FROM bat "
- "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)"
+ "UNION (SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat)",
)
# this solves it
self.assert_compile(
- union(
- except_(s, s).alias().select(),
- s
- ),
+ union(except_(s, s).alias().select(), s),
"SELECT anon_1.foo, anon_1.bar FROM "
"(SELECT foo, bar FROM bat EXCEPT "
"SELECT foo, bar FROM bat) AS anon_1 "
- "UNION SELECT foo, bar FROM bat"
+ "UNION SELECT foo, bar FROM bat",
)
self.assert_compile(
- except_(
- union(s, s),
- union(s, s)
- ),
+ except_(union(s, s), union(s, s)),
"(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) "
- "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)"
+ "EXCEPT (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat)",
)
s2 = union(s, s)
s3 = union(s2, s2)
- self.assert_compile(s3, "(SELECT foo, bar FROM bat "
- "UNION SELECT foo, bar FROM bat) "
- "UNION (SELECT foo, bar FROM bat "
- "UNION SELECT foo, bar FROM bat)")
+ self.assert_compile(
+ s3,
+ "(SELECT foo, bar FROM bat "
+ "UNION SELECT foo, bar FROM bat) "
+ "UNION (SELECT foo, bar FROM bat "
+ "UNION SELECT foo, bar FROM bat)",
+ )
self.assert_compile(
- union(
- intersect(s, s),
- intersect(s, s)
- ),
+ union(intersect(s, s), intersect(s, s)),
"(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) "
"UNION (SELECT foo, bar FROM bat INTERSECT "
- "SELECT foo, bar FROM bat)"
+ "SELECT foo, bar FROM bat)",
)
# tests for [ticket:2528]
# sqlite hates all of these.
self.assert_compile(
- union(
- s.limit(1),
- s.offset(2)
- ),
+ union(s.limit(1), s.offset(2)),
"(SELECT foo, bar FROM bat LIMIT :param_1) "
- "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_2)"
+ "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_2)",
)
self.assert_compile(
- union(
- s.order_by(column('bar')),
- s.offset(2)
- ),
+ union(s.order_by(column("bar")), s.offset(2)),
"(SELECT foo, bar FROM bat ORDER BY bar) "
- "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_1)"
+ "UNION (SELECT foo, bar FROM bat LIMIT -1 OFFSET :param_1)",
)
self.assert_compile(
- union(
- s.limit(1).alias('a'),
- s.limit(2).alias('b')
- ),
+ union(s.limit(1).alias("a"), s.limit(2).alias("b")),
"(SELECT foo, bar FROM bat LIMIT :param_1) "
- "UNION (SELECT foo, bar FROM bat LIMIT :param_2)"
+ "UNION (SELECT foo, bar FROM bat LIMIT :param_2)",
)
self.assert_compile(
- union(
- s.limit(1).self_group(),
- s.limit(2).self_group()
- ),
+ union(s.limit(1).self_group(), s.limit(2).self_group()),
"(SELECT foo, bar FROM bat LIMIT :param_1) "
- "UNION (SELECT foo, bar FROM bat LIMIT :param_2)"
+ "UNION (SELECT foo, bar FROM bat LIMIT :param_2)",
)
self.assert_compile(
@@ -1944,22 +2140,19 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT anon_1.foo, anon_1.bar FROM "
"((SELECT foo, bar FROM bat LIMIT :param_1) "
"UNION (SELECT foo, bar FROM bat LIMIT :param_2 OFFSET :param_3)) "
- "AS anon_1"
+ "AS anon_1",
)
# this version works for SQLite
self.assert_compile(
- union(
- s.limit(1).alias().select(),
- s.offset(2).alias().select(),
- ),
+ union(s.limit(1).alias().select(), s.offset(2).alias().select()),
"SELECT anon_1.foo, anon_1.bar "
"FROM (SELECT foo, bar FROM bat"
" LIMIT :param_1) AS anon_1 "
"UNION SELECT anon_2.foo, anon_2.bar "
"FROM (SELECT foo, bar "
"FROM bat"
- " LIMIT -1 OFFSET :param_2) AS anon_2"
+ " LIMIT -1 OFFSET :param_2) AS anon_2",
)
def test_binds(self):
@@ -1971,15 +2164,16 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
expected_default_params_list,
test_param_dict,
expected_test_params_dict,
- expected_test_params_list
+ expected_test_params_list,
) in [
(
select(
[table1, table2],
and_(
table1.c.myid == table2.c.otherid,
- table1.c.name == bindparam('mytablename')
- )),
+ table1.c.name == bindparam("mytablename"),
+ ),
+ ),
"SELECT mytable.myid, mytable.name, mytable.description, "
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid "
@@ -1988,55 +2182,80 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"myothertable.otherid, myothertable.othername FROM mytable, "
"myothertable WHERE mytable.myid = myothertable.otherid AND "
"mytable.name = ?",
- {'mytablename': None}, [None],
- {'mytablename': 5}, {'mytablename': 5}, [5]
+ {"mytablename": None},
+ [None],
+ {"mytablename": 5},
+ {"mytablename": 5},
+ [5],
),
(
- select([table1], or_(table1.c.myid == bindparam('myid'),
- table2.c.otherid == bindparam('myid'))),
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myid"),
+ ),
+ ),
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = :myid "
"OR myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description "
"FROM mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
- {'myid': None}, [None, None],
- {'myid': 5}, {'myid': 5}, [5, 5]
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
),
(
- text("SELECT mytable.myid, mytable.name, "
- "mytable.description FROM "
- "mytable, myothertable WHERE mytable.myid = :myid OR "
- "myothertable.otherid = :myid"),
+ text(
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM "
+ "mytable, myothertable WHERE mytable.myid = :myid OR "
+ "myothertable.otherid = :myid"
+ ),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = :myid OR "
"myothertable.otherid = :myid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? OR "
"myothertable.otherid = ?",
- {'myid': None}, [None, None],
- {'myid': 5}, {'myid': 5}, [5, 5]
+ {"myid": None},
+ [None, None],
+ {"myid": 5},
+ {"myid": 5},
+ [5, 5],
),
(
- select([table1], or_(table1.c.myid ==
- bindparam('myid', unique=True),
- table2.c.otherid ==
- bindparam('myid', unique=True))),
+ select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid", unique=True),
+ table2.c.otherid == bindparam("myid", unique=True),
+ ),
+ ),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = ? "
"OR myothertable.otherid = ?",
- {'myid_1': None, 'myid_2': None}, [None, None],
- {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
+ {"myid_1": None, "myid_2": None},
+ [None, None],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
),
(
- bindparam('test', type_=String, required=False) + text("'hi'"),
+ bindparam("test", type_=String, required=False) + text("'hi'"),
":test || 'hi'",
"? || 'hi'",
- {'test': None}, [None],
- {}, {'test': None}, [None]
+ {"test": None},
+ [None],
+ {},
+ {"test": None},
+ [None],
),
(
# testing select.params() here - bindparam() objects
@@ -2044,89 +2263,125 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
select(
[table1],
or_(
- table1.c.myid == bindparam('myid'),
- table2.c.otherid == bindparam('myotherid')
- )).params({'myid': 8, 'myotherid': 7}),
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ ).params({"myid": 8, "myotherid": 7}),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid OR myothertable.otherid = :myotherid",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
- {'myid': 8, 'myotherid': 7}, [8, 7],
- {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7]
+ {"myid": 8, "myotherid": 7},
+ [8, 7],
+ {"myid": 5},
+ {"myid": 5, "myotherid": 7},
+ [5, 7],
),
(
- select([table1], or_(table1.c.myid ==
- bindparam('myid', value=7, unique=True),
- table2.c.otherid ==
- bindparam('myid', value=8, unique=True))),
+ select(
+ [table1],
+ or_(
+ table1.c.myid
+ == bindparam("myid", value=7, unique=True),
+ table2.c.otherid
+ == bindparam("myid", value=8, unique=True),
+ ),
+ ),
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
":myid_1 OR myothertable.otherid = :myid_2",
"SELECT mytable.myid, mytable.name, mytable.description FROM "
"mytable, myothertable WHERE mytable.myid = "
"? OR myothertable.otherid = ?",
- {'myid_1': 7, 'myid_2': 8}, [7, 8],
- {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
+ {"myid_1": 7, "myid_2": 8},
+ [7, 8],
+ {"myid_1": 5, "myid_2": 6},
+ {"myid_1": 5, "myid_2": 6},
+ [5, 6],
),
]:
- self.assert_compile(stmt, expected_named_stmt,
- params=expected_default_params_dict)
- self.assert_compile(stmt, expected_positional_stmt,
- dialect=sqlite.dialect())
+ self.assert_compile(
+ stmt, expected_named_stmt, params=expected_default_params_dict
+ )
+ self.assert_compile(
+ stmt, expected_positional_stmt, dialect=sqlite.dialect()
+ )
nonpositional = stmt.compile()
positional = stmt.compile(dialect=sqlite.dialect())
pp = positional.params
- eq_([pp[k] for k in positional.positiontup],
- expected_default_params_list)
+ eq_(
+ [pp[k] for k in positional.positiontup],
+ expected_default_params_list,
+ )
- eq_(nonpositional.construct_params(test_param_dict),
- expected_test_params_dict)
+ eq_(
+ nonpositional.construct_params(test_param_dict),
+ expected_test_params_dict,
+ )
pp = positional.construct_params(test_param_dict)
eq_(
[pp[k] for k in positional.positiontup],
- expected_test_params_list
+ expected_test_params_list,
)
# check that params() doesn't modify original statement
- s = select([table1], or_(table1.c.myid == bindparam('myid'),
- table2.c.otherid ==
- bindparam('myotherid')))
- s2 = s.params({'myid': 8, 'myotherid': 7})
- s3 = s2.params({'myid': 9})
- assert s.compile().params == {'myid': None, 'myotherid': None}
- assert s2.compile().params == {'myid': 8, 'myotherid': 7}
- assert s3.compile().params == {'myid': 9, 'myotherid': 7}
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == bindparam("myid"),
+ table2.c.otherid == bindparam("myotherid"),
+ ),
+ )
+ s2 = s.params({"myid": 8, "myotherid": 7})
+ s3 = s2.params({"myid": 9})
+ assert s.compile().params == {"myid": None, "myotherid": None}
+ assert s2.compile().params == {"myid": 8, "myotherid": 7}
+ assert s3.compile().params == {"myid": 9, "myotherid": 7}
# test using same 'unique' param object twice in one compile
s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar()
s2 = select([table1, s], table1.c.myid == s)
self.assert_compile(
- s2, "SELECT mytable.myid, mytable.name, mytable.description, "
+ s2,
+ "SELECT mytable.myid, mytable.name, mytable.description, "
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = "
":myid_1) AS anon_1 FROM mytable WHERE mytable.myid = "
- "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)")
+ "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)",
+ )
positional = s2.compile(dialect=sqlite.dialect())
pp = positional.params
assert [pp[k] for k in positional.positiontup] == [12, 12]
# check that conflicts with "unique" params are caught
- s = select([table1], or_(table1.c.myid == 7,
- table1.c.myid == bindparam('myid_1')))
- assert_raises_message(exc.CompileError,
- "conflicts with unique bind parameter "
- "of the same name",
- str, s)
-
- s = select([table1], or_(table1.c.myid == 7, table1.c.myid == 8,
- table1.c.myid == bindparam('myid_1')))
- assert_raises_message(exc.CompileError,
- "conflicts with unique bind parameter "
- "of the same name",
- str, s)
+ s = select(
+ [table1],
+ or_(table1.c.myid == 7, table1.c.myid == bindparam("myid_1")),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
+
+ s = select(
+ [table1],
+ or_(
+ table1.c.myid == 7,
+ table1.c.myid == 8,
+ table1.c.myid == bindparam("myid_1"),
+ ),
+ )
+ assert_raises_message(
+ exc.CompileError,
+ "conflicts with unique bind parameter " "of the same name",
+ str,
+ s,
+ )
def _test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict
@@ -2134,84 +2389,85 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
total_params = 100000
- in_clause = [':in%d' % i for i in range(total_params)]
- params = dict(('in%d' % i, i) for i in range(total_params))
- t = text('text clause %s' % ', '.join(in_clause))
+ in_clause = [":in%d" % i for i in range(total_params)]
+ params = dict(("in%d" % i, i) for i in range(total_params))
+ t = text("text clause %s" % ", ".join(in_clause))
eq_(len(t.bindparams), total_params)
c = t.compile()
pp = c.construct_params(params)
- eq_(len(set(pp)), total_params, '%s %s' % (len(set(pp)), len(pp)))
+ eq_(len(set(pp)), total_params, "%s %s" % (len(set(pp)), len(pp)))
eq_(len(set(pp.values())), total_params)
def test_bind_as_col(self):
- t = table('foo', column('id'))
+ t = table("foo", column("id"))
- s = select([t, literal('lala').label('hoho')])
+ s = select([t, literal("lala").label("hoho")])
self.assert_compile(s, "SELECT foo.id, :param_1 AS hoho FROM foo")
assert [str(c) for c in s.c] == ["id", "hoho"]
def test_bind_callable(self):
- expr = column('x') == bindparam("key", callable_=lambda: 12)
- self.assert_compile(
- expr,
- "x = :key",
- {'x': 12}
- )
+ expr = column("x") == bindparam("key", callable_=lambda: 12)
+ self.assert_compile(expr, "x = :key", {"x": 12})
def test_bind_params_missing(self):
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
- select(
- [table1]).where(
+ select([table1])
+ .where(
and_(
table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True)
+ table1.c.name == bindparam("y", required=True),
)
- ).compile().construct_params,
- params=dict(y=5)
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
)
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x'",
- select(
- [table1]).where(
- table1.c.myid == bindparam(
- "x",
- required=True)).compile().construct_params)
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ )
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
- select(
- [table1]).where(
+ select([table1])
+ .where(
and_(
table1.c.myid == bindparam("x", required=True),
- table1.c.name == bindparam("y", required=True)
+ table1.c.name == bindparam("y", required=True),
)
- ).compile().construct_params,
- params=dict(y=5), _group_number=2)
+ )
+ .compile()
+ .construct_params,
+ params=dict(y=5),
+ _group_number=2,
+ )
assert_raises_message(
exc.InvalidRequestError,
r"A value is required for bind parameter 'x', "
"in parameter group 2",
- select(
- [table1]).where(
- table1.c.myid == bindparam(
- "x",
- required=True)).compile().construct_params,
- _group_number=2)
+ select([table1])
+ .where(table1.c.myid == bindparam("x", required=True))
+ .compile()
+ .construct_params,
+ _group_number=2,
+ )
def test_tuple(self):
self.assert_compile(
- tuple_(table1.c.myid, table1.c.name).in_(
- [(1, 'foo'), (5, 'bar')]),
+ tuple_(table1.c.myid, table1.c.name).in_([(1, "foo"), (5, "bar")]),
"(mytable.myid, mytable.name) IN "
- "((:param_1, :param_2), (:param_3, :param_4))"
+ "((:param_1, :param_2), (:param_3, :param_4))",
)
self.assert_compile(
@@ -2219,7 +2475,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
[tuple_(table2.c.otherid, table2.c.othername)]
),
"(mytable.myid, mytable.name) IN "
- "((myothertable.otherid, myothertable.othername))"
+ "((myothertable.otherid, myothertable.othername))",
)
self.assert_compile(
@@ -2227,226 +2483,245 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
select([table2.c.otherid, table2.c.othername])
),
"(mytable.myid, mytable.name) IN (SELECT "
- "myothertable.otherid, myothertable.othername FROM myothertable)"
+ "myothertable.otherid, myothertable.othername FROM myothertable)",
)
def test_expanding_parameter(self):
self.assert_compile(
tuple_(table1.c.myid, table1.c.name).in_(
- bindparam('foo', expanding=True)),
- "(mytable.myid, mytable.name) IN ([EXPANDING_foo])"
+ bindparam("foo", expanding=True)
+ ),
+ "(mytable.myid, mytable.name) IN ([EXPANDING_foo])",
)
self.assert_compile(
- table1.c.myid.in_(bindparam('foo', expanding=True)),
- "mytable.myid IN ([EXPANDING_foo])"
+ table1.c.myid.in_(bindparam("foo", expanding=True)),
+ "mytable.myid IN ([EXPANDING_foo])",
)
def test_cast(self):
- tbl = table('casttest',
- column('id', Integer),
- column('v1', Float),
- column('v2', Float),
- column('ts', TIMESTAMP),
- )
+ tbl = table(
+ "casttest",
+ column("id", Integer),
+ column("v1", Float),
+ column("v2", Float),
+ column("ts", TIMESTAMP),
+ )
def check_results(dialect, expected_results, literal):
- eq_(len(expected_results), 5,
- 'Incorrect number of expected results')
- eq_(str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)),
- 'CAST(casttest.v1 AS %s)' % expected_results[0])
- eq_(str(tbl.c.v1.cast(Numeric).compile(dialect=dialect)),
- 'CAST(casttest.v1 AS %s)' % expected_results[0])
- eq_(str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)),
- 'CAST(casttest.v1 AS %s)' % expected_results[1])
- eq_(str(cast(tbl.c.ts, Date).compile(dialect=dialect)),
- 'CAST(casttest.ts AS %s)' % expected_results[2])
- eq_(str(cast(1234, Text).compile(dialect=dialect)),
- 'CAST(%s AS %s)' % (literal, expected_results[3]))
- eq_(str(cast('test', String(20)).compile(dialect=dialect)),
- 'CAST(%s AS %s)' % (literal, expected_results[4]))
+ eq_(
+ len(expected_results),
+ 5,
+ "Incorrect number of expected results",
+ )
+ eq_(
+ str(cast(tbl.c.v1, Numeric).compile(dialect=dialect)),
+ "CAST(casttest.v1 AS %s)" % expected_results[0],
+ )
+ eq_(
+ str(tbl.c.v1.cast(Numeric).compile(dialect=dialect)),
+ "CAST(casttest.v1 AS %s)" % expected_results[0],
+ )
+ eq_(
+ str(cast(tbl.c.v1, Numeric(12, 9)).compile(dialect=dialect)),
+ "CAST(casttest.v1 AS %s)" % expected_results[1],
+ )
+ eq_(
+ str(cast(tbl.c.ts, Date).compile(dialect=dialect)),
+ "CAST(casttest.ts AS %s)" % expected_results[2],
+ )
+ eq_(
+ str(cast(1234, Text).compile(dialect=dialect)),
+ "CAST(%s AS %s)" % (literal, expected_results[3]),
+ )
+ eq_(
+ str(cast("test", String(20)).compile(dialect=dialect)),
+ "CAST(%s AS %s)" % (literal, expected_results[4]),
+ )
# fixme: shoving all of this dialect-specific stuff in one test
# is now officially completely ridiculous AND non-obviously omits
# coverage on other dialects.
sel = select([tbl, cast(tbl.c.v1, Numeric)]).compile(
- dialect=dialect)
+ dialect=dialect
+ )
if isinstance(dialect, type(mysql.dialect())):
- eq_(str(sel),
+ eq_(
+ str(sel),
"SELECT casttest.id, casttest.v1, casttest.v2, "
"casttest.ts, "
- "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest")
+ "CAST(casttest.v1 AS DECIMAL) AS anon_1 \nFROM casttest",
+ )
else:
- eq_(str(sel),
+ eq_(
+ str(sel),
"SELECT casttest.id, casttest.v1, casttest.v2, "
"casttest.ts, CAST(casttest.v1 AS NUMERIC) AS "
- "anon_1 \nFROM casttest")
+ "anon_1 \nFROM casttest",
+ )
# first test with PostgreSQL engine
check_results(
- postgresql.dialect(), [
- 'NUMERIC', 'NUMERIC(12, 9)', 'DATE', 'TEXT', 'VARCHAR(20)'],
- '%(param_1)s')
+ postgresql.dialect(),
+ ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"],
+ "%(param_1)s",
+ )
# then the Oracle engine
check_results(
- oracle.dialect(), [
- 'NUMERIC', 'NUMERIC(12, 9)', 'DATE',
- 'CLOB', 'VARCHAR2(20 CHAR)'],
- ':param_1')
+ oracle.dialect(),
+ ["NUMERIC", "NUMERIC(12, 9)", "DATE", "CLOB", "VARCHAR2(20 CHAR)"],
+ ":param_1",
+ )
# then the sqlite engine
- check_results(sqlite.dialect(), ['NUMERIC', 'NUMERIC(12, 9)',
- 'DATE', 'TEXT', 'VARCHAR(20)'], '?')
+ check_results(
+ sqlite.dialect(),
+ ["NUMERIC", "NUMERIC(12, 9)", "DATE", "TEXT", "VARCHAR(20)"],
+ "?",
+ )
# then the MySQL engine
- check_results(mysql.dialect(), ['DECIMAL', 'DECIMAL(12, 9)',
- 'DATE', 'CHAR', 'CHAR(20)'], '%s')
-
- self.assert_compile(cast(text('NULL'), Integer),
- 'CAST(NULL AS INTEGER)',
- dialect=sqlite.dialect())
- self.assert_compile(cast(null(), Integer),
- 'CAST(NULL AS INTEGER)',
- dialect=sqlite.dialect())
- self.assert_compile(cast(literal_column('NULL'), Integer),
- 'CAST(NULL AS INTEGER)',
- dialect=sqlite.dialect())
+ check_results(
+ mysql.dialect(),
+ ["DECIMAL", "DECIMAL(12, 9)", "DATE", "CHAR", "CHAR(20)"],
+ "%s",
+ )
- def test_over(self):
self.assert_compile(
- func.row_number().over(),
- "row_number() OVER ()"
+ cast(text("NULL"), Integer),
+ "CAST(NULL AS INTEGER)",
+ dialect=sqlite.dialect(),
+ )
+ self.assert_compile(
+ cast(null(), Integer),
+ "CAST(NULL AS INTEGER)",
+ dialect=sqlite.dialect(),
+ )
+ self.assert_compile(
+ cast(literal_column("NULL"), Integer),
+ "CAST(NULL AS INTEGER)",
+ dialect=sqlite.dialect(),
)
+
+ def test_over(self):
+ self.assert_compile(func.row_number().over(), "row_number() OVER ()")
self.assert_compile(
func.row_number().over(
order_by=[table1.c.name, table1.c.description]
),
- "row_number() OVER (ORDER BY mytable.name, mytable.description)"
+ "row_number() OVER (ORDER BY mytable.name, mytable.description)",
)
self.assert_compile(
func.row_number().over(
partition_by=[table1.c.name, table1.c.description]
),
"row_number() OVER (PARTITION BY mytable.name, "
- "mytable.description)"
+ "mytable.description)",
)
self.assert_compile(
func.row_number().over(
- partition_by=[table1.c.name],
- order_by=[table1.c.description]
+ partition_by=[table1.c.name], order_by=[table1.c.description]
),
"row_number() OVER (PARTITION BY mytable.name "
- "ORDER BY mytable.description)"
+ "ORDER BY mytable.description)",
)
self.assert_compile(
func.row_number().over(
- partition_by=table1.c.name,
- order_by=table1.c.description
+ partition_by=table1.c.name, order_by=table1.c.description
),
"row_number() OVER (PARTITION BY mytable.name "
- "ORDER BY mytable.description)"
+ "ORDER BY mytable.description)",
)
self.assert_compile(
func.row_number().over(
partition_by=table1.c.name,
- order_by=[table1.c.name, table1.c.description]
+ order_by=[table1.c.name, table1.c.description],
),
"row_number() OVER (PARTITION BY mytable.name "
- "ORDER BY mytable.name, mytable.description)"
+ "ORDER BY mytable.name, mytable.description)",
)
self.assert_compile(
func.row_number().over(
- partition_by=[],
- order_by=[table1.c.name, table1.c.description]
+ partition_by=[], order_by=[table1.c.name, table1.c.description]
),
- "row_number() OVER (ORDER BY mytable.name, mytable.description)"
+ "row_number() OVER (ORDER BY mytable.name, mytable.description)",
)
self.assert_compile(
func.row_number().over(
- partition_by=[table1.c.name, table1.c.description],
- order_by=[]
+ partition_by=[table1.c.name, table1.c.description], order_by=[]
),
"row_number() OVER (PARTITION BY mytable.name, "
- "mytable.description)"
+ "mytable.description)",
)
self.assert_compile(
- func.row_number().over(
- partition_by=[],
- order_by=[]
- ),
- "row_number() OVER ()"
+ func.row_number().over(partition_by=[], order_by=[]),
+ "row_number() OVER ()",
)
self.assert_compile(
- select([func.row_number().over(
- order_by=table1.c.description
- ).label('foo')]),
+ select(
+ [
+ func.row_number()
+ .over(order_by=table1.c.description)
+ .label("foo")
+ ]
+ ),
"SELECT row_number() OVER (ORDER BY mytable.description) "
- "AS foo FROM mytable"
+ "AS foo FROM mytable",
)
# test from_obj generation.
# from func:
self.assert_compile(
- select([
- func.max(table1.c.name).over(
- partition_by=['description']
- )
- ]),
+ select(
+ [func.max(table1.c.name).over(partition_by=["description"])]
+ ),
"SELECT max(mytable.name) OVER (PARTITION BY mytable.description) "
- "AS anon_1 FROM mytable"
+ "AS anon_1 FROM mytable",
)
# from partition_by
self.assert_compile(
- select([
- func.row_number().over(
- partition_by=[table1.c.name]
- )
- ]),
+ select([func.row_number().over(partition_by=[table1.c.name])]),
"SELECT row_number() OVER (PARTITION BY mytable.name) "
- "AS anon_1 FROM mytable"
+ "AS anon_1 FROM mytable",
)
# from order_by
self.assert_compile(
- select([
- func.row_number().over(
- order_by=table1.c.name
- )
- ]),
+ select([func.row_number().over(order_by=table1.c.name)]),
"SELECT row_number() OVER (ORDER BY mytable.name) "
- "AS anon_1 FROM mytable"
+ "AS anon_1 FROM mytable",
)
# this tests that _from_objects
# concantenates OK
self.assert_compile(
select([column("x") + over(func.foo())]),
- "SELECT x + foo() OVER () AS anon_1"
+ "SELECT x + foo() OVER () AS anon_1",
)
# test a reference to a label that in the referecned selectable;
# this resolves
- expr = (table1.c.myid + 5).label('sum')
+ expr = (table1.c.myid + 5).label("sum")
stmt = select([expr]).alias()
self.assert_compile(
select([stmt.c.sum, func.row_number().over(order_by=stmt.c.sum)]),
"SELECT anon_1.sum, row_number() OVER (ORDER BY anon_1.sum) "
"AS anon_2 FROM (SELECT mytable.myid + :myid_1 AS sum "
- "FROM mytable) AS anon_1"
+ "FROM mytable) AS anon_1",
)
# test a reference to a label that's at the same level as the OVER
# in the columns clause; doesn't resolve
- expr = (table1.c.myid + 5).label('sum')
+ expr = (table1.c.myid + 5).label("sum")
self.assert_compile(
select([expr, func.row_number().over(order_by=expr)]),
"SELECT mytable.myid + :myid_1 AS sum, "
"row_number() OVER "
- "(ORDER BY mytable.myid + :myid_1) AS anon_1 FROM mytable"
+ "(ORDER BY mytable.myid + :myid_1) AS anon_1 FROM mytable",
)
def test_over_framespec(self):
@@ -2457,7 +2732,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT row_number() OVER "
"(ORDER BY mytable.myid ROWS BETWEEN CURRENT "
"ROW AND UNBOUNDED FOLLOWING)"
- " AS anon_1 FROM mytable"
+ " AS anon_1 FROM mytable",
)
self.assert_compile(
@@ -2465,7 +2740,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT row_number() OVER "
"(ORDER BY mytable.myid ROWS BETWEEN UNBOUNDED "
"PRECEDING AND UNBOUNDED FOLLOWING)"
- " AS anon_1 FROM mytable"
+ " AS anon_1 FROM mytable",
)
self.assert_compile(
@@ -2473,7 +2748,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"SELECT row_number() OVER "
"(ORDER BY mytable.myid RANGE BETWEEN "
"UNBOUNDED PRECEDING AND CURRENT ROW)"
- " AS anon_1 FROM mytable"
+ " AS anon_1 FROM mytable",
)
self.assert_compile(
@@ -2482,7 +2757,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"(ORDER BY mytable.myid RANGE BETWEEN "
":param_1 PRECEDING AND :param_2 FOLLOWING)"
" AS anon_1 FROM mytable",
- checkparams={'param_1': 5, 'param_2': 10}
+ checkparams={"param_1": 5, "param_2": 10},
)
self.assert_compile(
@@ -2491,7 +2766,7 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"(ORDER BY mytable.myid RANGE BETWEEN "
":param_1 FOLLOWING AND :param_2 FOLLOWING)"
" AS anon_1 FROM mytable",
- checkparams={'param_1': 1, 'param_2': 10}
+ checkparams={"param_1": 1, "param_2": 10},
)
self.assert_compile(
@@ -2500,89 +2775,108 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"(ORDER BY mytable.myid RANGE BETWEEN "
":param_1 PRECEDING AND :param_2 PRECEDING)"
" AS anon_1 FROM mytable",
- checkparams={'param_1': 10, 'param_2': 1}
+ checkparams={"param_1": 10, "param_2": 1},
)
def test_over_invalid_framespecs(self):
assert_raises_message(
exc.ArgumentError,
"Integer or None expected for range value",
- func.row_number().over, range_=("foo", 8)
+ func.row_number().over,
+ range_=("foo", 8),
)
assert_raises_message(
exc.ArgumentError,
"Integer or None expected for range value",
- func.row_number().over, range_=(-5, "foo")
+ func.row_number().over,
+ range_=(-5, "foo"),
)
assert_raises_message(
exc.ArgumentError,
"'range_' and 'rows' are mutually exclusive",
- func.row_number().over, range_=(-5, 8), rows=(-2, 5)
+ func.row_number().over,
+ range_=(-5, 8),
+ rows=(-2, 5),
)
def test_over_within_group(self):
from sqlalchemy import within_group
- stmt = select([
- table1.c.myid,
- within_group(
- func.percentile_cont(0.5),
- table1.c.name.desc()
- ).over(
- range_=(1, 2),
- partition_by=table1.c.name,
- order_by=table1.c.myid
- )
- ])
+
+ stmt = select(
+ [
+ table1.c.myid,
+ within_group(
+ func.percentile_cont(0.5), table1.c.name.desc()
+ ).over(
+ range_=(1, 2),
+ partition_by=table1.c.name,
+ order_by=table1.c.myid,
+ ),
+ ]
+ )
eq_ignore_whitespace(
str(stmt),
"SELECT mytable.myid, percentile_cont(:percentile_cont_1) "
"WITHIN GROUP (ORDER BY mytable.name DESC) "
"OVER (PARTITION BY mytable.name ORDER BY mytable.myid "
"RANGE BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) "
- "AS anon_1 FROM mytable"
+ "AS anon_1 FROM mytable",
+ )
+
+ stmt = select(
+ [
+ table1.c.myid,
+ within_group(
+ func.percentile_cont(0.5), table1.c.name.desc()
+ ).over(
+ rows=(1, 2),
+ partition_by=table1.c.name,
+ order_by=table1.c.myid,
+ ),
+ ]
)
-
- stmt = select([
- table1.c.myid,
- within_group(
- func.percentile_cont(0.5),
- table1.c.name.desc()
- ).over(
- rows=(1, 2),
- partition_by=table1.c.name,
- order_by=table1.c.myid
- )
- ])
eq_ignore_whitespace(
str(stmt),
"SELECT mytable.myid, percentile_cont(:percentile_cont_1) "
"WITHIN GROUP (ORDER BY mytable.name DESC) "
"OVER (PARTITION BY mytable.name ORDER BY mytable.myid "
"ROWS BETWEEN :param_1 FOLLOWING AND :param_2 FOLLOWING) "
- "AS anon_1 FROM mytable"
+ "AS anon_1 FROM mytable",
)
-
-
def test_date_between(self):
import datetime
- table = Table('dt', metadata,
- Column('date', Date))
+
+ table = Table("dt", metadata, Column("date", Date))
self.assert_compile(
- table.select(table.c.date.between(datetime.date(2006, 6, 1),
- datetime.date(2006, 6, 5))),
+ table.select(
+ table.c.date.between(
+ datetime.date(2006, 6, 1), datetime.date(2006, 6, 5)
+ )
+ ),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1': datetime.date(2006, 6, 1),
- 'date_2': datetime.date(2006, 6, 5)})
+ checkparams={
+ "date_1": datetime.date(2006, 6, 1),
+ "date_2": datetime.date(2006, 6, 5),
+ },
+ )
self.assert_compile(
- table.select(sql.between(table.c.date, datetime.date(2006, 6, 1),
- datetime.date(2006, 6, 5))),
+ table.select(
+ sql.between(
+ table.c.date,
+ datetime.date(2006, 6, 1),
+ datetime.date(2006, 6, 5),
+ )
+ ),
"SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
- checkparams={'date_1': datetime.date(2006, 6, 1),
- 'date_2': datetime.date(2006, 6, 5)})
+ checkparams={
+ "date_1": datetime.date(2006, 6, 1),
+ "date_2": datetime.date(2006, 6, 5),
+ },
+ )
def test_delayed_col_naming(self):
my_str = Column(String)
@@ -2592,18 +2886,18 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
assert_raises_message(
exc.InvalidRequestError,
"Cannot initialize a sub-selectable with this Column",
- lambda: sel1.c
+ lambda: sel1.c,
)
# calling label or as_scalar doesn't compile
# anything.
- sel2 = select([func.substr(my_str, 2, 3)]).label('my_substr')
+ sel2 = select([func.substr(my_str, 2, 3)]).label("my_substr")
assert_raises_message(
exc.CompileError,
"Cannot compile Column object until its 'name' is assigned.",
sel2.compile,
- dialect=default.DefaultDialect()
+ dialect=default.DefaultDialect(),
)
sel3 = select([my_str]).as_scalar()
@@ -2611,24 +2905,17 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
exc.CompileError,
"Cannot compile Column object until its 'name' is assigned.",
sel3.compile,
- dialect=default.DefaultDialect()
+ dialect=default.DefaultDialect(),
)
- my_str.name = 'foo'
+ my_str.name = "foo"
+ self.assert_compile(sel1, "SELECT foo")
self.assert_compile(
- sel1,
- "SELECT foo",
- )
- self.assert_compile(
- sel2,
- '(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)',
+ sel2, "(SELECT substr(foo, :substr_2, :substr_3) AS substr_1)"
)
- self.assert_compile(
- sel3,
- "(SELECT foo)"
- )
+ self.assert_compile(sel3, "(SELECT foo)")
def test_naming(self):
# TODO: the part where we check c.keys() are not "compile" tests, they
@@ -2636,36 +2923,46 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# version of that suite
f1 = func.hoho(table1.c.name)
- s1 = select([table1.c.myid, table1.c.myid.label('foobar'),
- f1,
- func.lala(table1.c.name).label('gg')])
-
- eq_(
- list(s1.c.keys()),
- ['myid', 'foobar', str(f1), 'gg']
+ s1 = select(
+ [
+ table1.c.myid,
+ table1.c.myid.label("foobar"),
+ f1,
+ func.lala(table1.c.name).label("gg"),
+ ]
)
+ eq_(list(s1.c.keys()), ["myid", "foobar", str(f1), "gg"])
+
meta = MetaData()
- t1 = Table('mytable', meta, Column('col1', Integer))
+ t1 = Table("mytable", meta, Column("col1", Integer))
exprs = (
table1.c.myid == 12,
func.hoho(table1.c.myid),
cast(table1.c.name, Numeric),
- literal('x'),
+ literal("x"),
)
for col, key, expr, lbl in (
- (table1.c.name, 'name', 'mytable.name', None),
- (exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'),
- (exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'),
- (exprs[2], str(exprs[2]),
- 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
- (t1.c.col1, 'col1', 'mytable.col1', None),
- (column('some wacky thing'), 'some wacky thing',
- '"some wacky thing"', ''),
- (exprs[3], exprs[3].key, ":param_1", "anon_1")
+ (table1.c.name, "name", "mytable.name", None),
+ (exprs[0], str(exprs[0]), "mytable.myid = :myid_1", "anon_1"),
+ (exprs[1], str(exprs[1]), "hoho(mytable.myid)", "hoho_1"),
+ (
+ exprs[2],
+ str(exprs[2]),
+ "CAST(mytable.name AS NUMERIC)",
+ "anon_1",
+ ),
+ (t1.c.col1, "col1", "mytable.col1", None),
+ (
+ column("some wacky thing"),
+ "some wacky thing",
+ '"some wacky thing"',
+ "",
+ ),
+ (exprs[3], exprs[3].key, ":param_1", "anon_1"),
):
- if getattr(col, 'table', None) is not None:
+ if getattr(col, "table", None) is not None:
t = col.table
else:
t = table1
@@ -2675,107 +2972,151 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
if lbl:
self.assert_compile(
- s1, "SELECT %s AS %s FROM mytable" %
- (expr, lbl))
+ s1, "SELECT %s AS %s FROM mytable" % (expr, lbl)
+ )
else:
self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,))
s1 = select([s1])
if lbl:
self.assert_compile(
- s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
- (lbl, expr, lbl))
+ s1,
+ "SELECT %s FROM (SELECT %s AS %s FROM mytable)"
+ % (lbl, expr, lbl),
+ )
elif col.table is not None:
# sqlite rule labels subquery columns
self.assert_compile(
- s1, "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
- (key, expr, key))
+ s1,
+ "SELECT %s FROM (SELECT %s AS %s FROM mytable)"
+ % (key, expr, key),
+ )
else:
- self.assert_compile(s1,
- "SELECT %s FROM (SELECT %s FROM mytable)" %
- (expr, expr))
+ self.assert_compile(
+ s1,
+ "SELECT %s FROM (SELECT %s FROM mytable)" % (expr, expr),
+ )
def test_hints(self):
s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")
- s2 = select([table1.c.myid]).\
- with_hint(table1, "index(%(name)s idx)", 'oracle').\
- with_hint(table1, "WITH HINT INDEX idx", 'sybase')
+ s2 = (
+ select([table1.c.myid])
+ .with_hint(table1, "index(%(name)s idx)", "oracle")
+ .with_hint(table1, "WITH HINT INDEX idx", "sybase")
+ )
a1 = table1.alias()
s3 = select([a1.c.myid]).with_hint(a1, "index(%(name)s hint)")
- subs4 = select([
- table1, table2
- ]).select_from(
- table1.join(table2, table1.c.myid == table2.c.otherid)).\
- with_hint(table1, 'hint1')
+ subs4 = (
+ select([table1, table2])
+ .select_from(
+ table1.join(table2, table1.c.myid == table2.c.otherid)
+ )
+ .with_hint(table1, "hint1")
+ )
- s4 = select([table3]).select_from(
- table3.join(
- subs4,
- subs4.c.othername == table3.c.otherstuff
+ s4 = (
+ select([table3])
+ .select_from(
+ table3.join(subs4, subs4.c.othername == table3.c.otherstuff)
)
- ).\
- with_hint(table3, 'hint3')
+ .with_hint(table3, "hint3")
+ )
- t1 = table('QuotedName', column('col1'))
- s6 = select([t1.c.col1]).where(t1.c.col1 > 10).\
- with_hint(t1, '%(name)s idx1')
- a2 = t1.alias('SomeName')
- s7 = select([a2.c.col1]).where(a2.c.col1 > 10).\
- with_hint(a2, '%(name)s idx1')
+ t1 = table("QuotedName", column("col1"))
+ s6 = (
+ select([t1.c.col1])
+ .where(t1.c.col1 > 10)
+ .with_hint(t1, "%(name)s idx1")
+ )
+ a2 = t1.alias("SomeName")
+ s7 = (
+ select([a2.c.col1])
+ .where(a2.c.col1 > 10)
+ .with_hint(a2, "%(name)s idx1")
+ )
- mysql_d, oracle_d, sybase_d = \
- mysql.dialect(), \
- oracle.dialect(), \
- sybase.dialect()
+ mysql_d, oracle_d, sybase_d = (
+ mysql.dialect(),
+ oracle.dialect(),
+ sybase.dialect(),
+ )
for stmt, dialect, expected in [
- (s, mysql_d,
- "SELECT mytable.myid FROM mytable test hint mytable"),
- (s, oracle_d,
- "SELECT /*+ test hint mytable */ mytable.myid FROM mytable"),
- (s, sybase_d,
- "SELECT mytable.myid FROM mytable test hint mytable"),
- (s2, mysql_d,
- "SELECT mytable.myid FROM mytable"),
- (s2, oracle_d,
- "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable"),
- (s2, sybase_d,
- "SELECT mytable.myid FROM mytable WITH HINT INDEX idx"),
- (s3, mysql_d,
+ (s, mysql_d, "SELECT mytable.myid FROM mytable test hint mytable"),
+ (
+ s,
+ oracle_d,
+ "SELECT /*+ test hint mytable */ mytable.myid FROM mytable",
+ ),
+ (
+ s,
+ sybase_d,
+ "SELECT mytable.myid FROM mytable test hint mytable",
+ ),
+ (s2, mysql_d, "SELECT mytable.myid FROM mytable"),
+ (
+ s2,
+ oracle_d,
+ "SELECT /*+ index(mytable idx) */ mytable.myid FROM mytable",
+ ),
+ (
+ s2,
+ sybase_d,
+ "SELECT mytable.myid FROM mytable WITH HINT INDEX idx",
+ ),
+ (
+ s3,
+ mysql_d,
"SELECT mytable_1.myid FROM mytable AS mytable_1 "
- "index(mytable_1 hint)"),
- (s3, oracle_d,
+ "index(mytable_1 hint)",
+ ),
+ (
+ s3,
+ oracle_d,
"SELECT /*+ index(mytable_1 hint) */ mytable_1.myid FROM "
- "mytable mytable_1"),
- (s3, sybase_d,
+ "mytable mytable_1",
+ ),
+ (
+ s3,
+ sybase_d,
"SELECT mytable_1.myid FROM mytable AS mytable_1 "
- "index(mytable_1 hint)"),
- (s4, mysql_d,
+ "index(mytable_1 hint)",
+ ),
+ (
+ s4,
+ mysql_d,
"SELECT thirdtable.userid, thirdtable.otherstuff "
"FROM thirdtable "
"hint3 INNER JOIN (SELECT mytable.myid, mytable.name, "
"mytable.description, myothertable.otherid, "
"myothertable.othername FROM mytable hint1 INNER "
"JOIN myothertable ON mytable.myid = myothertable.otherid) "
- "ON othername = thirdtable.otherstuff"),
- (s4, sybase_d,
+ "ON othername = thirdtable.otherstuff",
+ ),
+ (
+ s4,
+ sybase_d,
"SELECT thirdtable.userid, thirdtable.otherstuff "
"FROM thirdtable "
"hint3 JOIN (SELECT mytable.myid, mytable.name, "
"mytable.description, myothertable.otherid, "
"myothertable.othername FROM mytable hint1 "
"JOIN myothertable ON mytable.myid = myothertable.otherid) "
- "ON othername = thirdtable.otherstuff"),
- (s4, oracle_d,
+ "ON othername = thirdtable.otherstuff",
+ ),
+ (
+ s4,
+ oracle_d,
"SELECT /*+ hint3 */ thirdtable.userid, thirdtable.otherstuff "
"FROM thirdtable JOIN (SELECT /*+ hint1 */ mytable.myid,"
" mytable.name, mytable.description, myothertable.otherid,"
" myothertable.othername FROM mytable JOIN myothertable ON"
" mytable.myid = myothertable.otherid) ON othername ="
- " thirdtable.otherstuff"),
+ " thirdtable.otherstuff",
+ ),
# TODO: figure out dictionary ordering solution here
# (s5, oracle_d,
# "SELECT /*+ hint3 */ /*+ hint1 */ thirdtable.userid, "
@@ -2785,68 +3126,64 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
# " myothertable.othername FROM mytable JOIN myothertable ON"
# " mytable.myid = myothertable.otherid) ON othername ="
# " thirdtable.otherstuff"),
- (s6, oracle_d,
+ (
+ s6,
+ oracle_d,
"""SELECT /*+ "QuotedName" idx1 */ "QuotedName".col1 """
- """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1"""),
- (s7, oracle_d,
- """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """
- """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1"""),
+ """FROM "QuotedName" WHERE "QuotedName".col1 > :col1_1""",
+ ),
+ (
+ s7,
+ oracle_d,
+ """SELECT /*+ "SomeName" idx1 */ "SomeName".col1 FROM """
+ """"QuotedName" "SomeName" WHERE "SomeName".col1 > :col1_1""",
+ ),
]:
- self.assert_compile(
- stmt,
- expected,
- dialect=dialect
- )
+ self.assert_compile(stmt, expected, dialect=dialect)
def test_statement_hints(self):
- stmt = select([table1.c.myid]).\
- with_statement_hint("test hint one").\
- with_statement_hint("test hint two", 'mysql')
+ stmt = (
+ select([table1.c.myid])
+ .with_statement_hint("test hint one")
+ .with_statement_hint("test hint two", "mysql")
+ )
self.assert_compile(
- stmt,
- "SELECT mytable.myid FROM mytable test hint one",
+ stmt, "SELECT mytable.myid FROM mytable test hint one"
)
self.assert_compile(
stmt,
"SELECT mytable.myid FROM mytable test hint one test hint two",
- dialect='mysql'
+ dialect="mysql",
)
def test_literal_as_text_fromstring(self):
- self.assert_compile(
- and_(text("a"), text("b")),
- "a AND b"
- )
+ self.assert_compile(and_(text("a"), text("b")), "a AND b")
def test_literal_as_text_nonstring_raise(self):
- assert_raises(exc.ArgumentError,
- and_, ("a",), ("b",)
- )
+ assert_raises(exc.ArgumentError, and_, ("a",), ("b",))
class UnsupportedTest(fixtures.TestBase):
-
def test_unsupported_element_str_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement
class SomeElement(ClauseElement):
- __visit_name__ = 'some_element'
+ __visit_name__ = "some_element"
assert_raises_message(
exc.UnsupportedCompilationError,
r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <class '.*SomeElement'>",
- SomeElement().compile
+ SomeElement().compile,
)
def test_unsupported_element_meth_visit_name(self):
from sqlalchemy.sql.expression import ClauseElement
class SomeElement(ClauseElement):
-
@classmethod
def __visit_name__(cls):
return "some_element"
@@ -2855,7 +3192,7 @@ class UnsupportedTest(fixtures.TestBase):
exc.UnsupportedCompilationError,
r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <class '.*SomeElement'>",
- SomeElement().compile
+ SomeElement().compile,
)
def test_unsupported_operator(self):
@@ -2863,12 +3200,13 @@ class UnsupportedTest(fixtures.TestBase):
def myop(x, y):
pass
+
binary = BinaryExpression(column("foo"), column("bar"), myop)
assert_raises_message(
exc.UnsupportedCompilationError,
r"Compiler <sqlalchemy.sql.compiler.StrSQLCompiler .*"
r"can't render element of type <function.*",
- binary.compile
+ binary.compile,
)
@@ -2878,15 +3216,12 @@ class StringifySpecialTest(fixtures.TestBase):
eq_ignore_whitespace(
str(stmt),
"SELECT mytable.myid, mytable.name, mytable.description "
- "FROM mytable WHERE mytable.myid = :myid_1"
+ "FROM mytable WHERE mytable.myid = :myid_1",
)
def test_unnamed_column(self):
stmt = Column(Integer) == 5
- eq_ignore_whitespace(
- str(stmt),
- '"<name unknown>" = :param_1'
- )
+ eq_ignore_whitespace(str(stmt), '"<name unknown>" = :param_1')
def test_cte(self):
# stringify of these was supported anyway by defaultdialect.
@@ -2895,7 +3230,7 @@ class StringifySpecialTest(fixtures.TestBase):
eq_ignore_whitespace(
str(stmt),
"WITH anon_1 AS (SELECT mytable.myid AS myid FROM mytable) "
- "SELECT anon_1.myid FROM anon_1"
+ "SELECT anon_1.myid FROM anon_1",
)
def test_next_sequence_value(self):
@@ -2906,8 +3241,7 @@ class StringifySpecialTest(fixtures.TestBase):
seq = Sequence("my_sequence")
eq_ignore_whitespace(
- str(seq.next_value()),
- "<next sequence value: my_sequence>"
+ str(seq.next_value()), "<next sequence value: my_sequence>"
)
def test_returning(self):
@@ -2916,47 +3250,43 @@ class StringifySpecialTest(fixtures.TestBase):
eq_ignore_whitespace(
str(stmt),
"INSERT INTO mytable (myid, name, description) "
- "VALUES (:myid, :name, :description) RETURNING mytable.myid"
+ "VALUES (:myid, :name, :description) RETURNING mytable.myid",
)
def test_array_index(self):
- stmt = select([column('foo', types.ARRAY(Integer))[5]])
+ stmt = select([column("foo", types.ARRAY(Integer))[5]])
- eq_ignore_whitespace(
- str(stmt),
- "SELECT foo[:foo_1] AS anon_1"
- )
+ eq_ignore_whitespace(str(stmt), "SELECT foo[:foo_1] AS anon_1")
def test_unknown_type(self):
class MyType(types.TypeEngine):
- __visit_name__ = 'mytype'
+ __visit_name__ = "mytype"
stmt = select([cast(table1.c.myid, MyType)])
eq_ignore_whitespace(
str(stmt),
- "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable"
+ "SELECT CAST(mytable.myid AS MyType) AS anon_1 FROM mytable",
)
def test_within_group(self):
# stringify of these was supported anyway by defaultdialect.
from sqlalchemy import within_group
- stmt = select([
- table1.c.myid,
- within_group(
- func.percentile_cont(0.5),
- table1.c.name.desc()
- )
- ])
+
+ stmt = select(
+ [
+ table1.c.myid,
+ within_group(func.percentile_cont(0.5), table1.c.name.desc()),
+ ]
+ )
eq_ignore_whitespace(
str(stmt),
"SELECT mytable.myid, percentile_cont(:percentile_cont_1) "
- "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable"
+ "WITHIN GROUP (ORDER BY mytable.name DESC) AS anon_1 FROM mytable",
)
class KwargPropagationTest(fixtures.TestBase):
-
@classmethod
def setup_class(cls):
from sqlalchemy.sql.expression import ColumnClause, TableClause
@@ -2969,7 +3299,7 @@ class KwargPropagationTest(fixtures.TestBase):
cls.column = CatchCol("x")
cls.table = CatchTable("y")
- cls.criterion = cls.column == CatchCol('y')
+ cls.criterion = cls.column == CatchCol("y")
@compiles(CatchCol)
def compile_col(element, compiler, **kw):
@@ -2983,16 +3313,18 @@ class KwargPropagationTest(fixtures.TestBase):
def _do_test(self, element):
d = default.DefaultDialect()
- d.statement_compiler(d, element,
- compile_kwargs={"canary": True})
+ d.statement_compiler(d, element, compile_kwargs={"canary": True})
def test_binary(self):
self._do_test(self.column == 5)
def test_select(self):
- s = select([self.column]).select_from(self.table).\
- where(self.column == self.criterion).\
- order_by(self.column)
+ s = (
+ select([self.column])
+ .select_from(self.table)
+ .where(self.column == self.criterion)
+ .order_by(self.column)
+ )
self._do_test(s)
def test_case(self):
@@ -3029,8 +3361,11 @@ class ExecutionOptionsTest(fixtures.TestBase):
def test_embedded_element_true_to_false(self):
stmt = table1.insert().cte()
eq_(stmt._execution_options, {"autocommit": True})
- s2 = select([table1]).select_from(stmt).\
- execution_options(autocommit=False)
+ s2 = (
+ select([table1])
+ .select_from(stmt)
+ .execution_options(autocommit=False)
+ )
eq_(s2._execution_options, {"autocommit": False})
compiled = s2.compile()
@@ -3038,7 +3373,7 @@ class ExecutionOptionsTest(fixtures.TestBase):
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def _illegal_type_fixture(self):
class MyType(types.TypeEngine):
@@ -3047,195 +3382,197 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
@compiles(MyType)
def compile(element, compiler, **kw):
raise exc.CompileError("Couldn't compile type")
+
return MyType
def test_reraise_of_column_spec_issue(self):
MyType = self._illegal_type_fixture()
- t1 = Table('t', MetaData(),
- Column('x', MyType())
- )
+ t1 = Table("t", MetaData(), Column("x", MyType()))
assert_raises_message(
exc.CompileError,
r"\(in table 't', column 'x'\): Couldn't compile type",
- schema.CreateTable(t1).compile
+ schema.CreateTable(t1).compile,
)
def test_reraise_of_column_spec_issue_unicode(self):
MyType = self._illegal_type_fixture()
- t1 = Table('t', MetaData(),
- Column(u('méil'), MyType())
- )
+ t1 = Table("t", MetaData(), Column(u("méil"), MyType()))
assert_raises_message(
exc.CompileError,
u(r"\(in table 't', column 'méil'\): Couldn't compile type"),
- schema.CreateTable(t1).compile
+ schema.CreateTable(t1).compile,
)
def test_system_flag(self):
m = MetaData()
- t = Table('t', m, Column('x', Integer),
- Column('y', Integer, system=True),
- Column('z', Integer))
+ t = Table(
+ "t",
+ m,
+ Column("x", Integer),
+ Column("y", Integer, system=True),
+ Column("z", Integer),
+ )
self.assert_compile(
- schema.CreateTable(t),
- "CREATE TABLE t (x INTEGER, z INTEGER)"
+ schema.CreateTable(t), "CREATE TABLE t (x INTEGER, z INTEGER)"
)
m2 = MetaData()
t2 = t.tometadata(m2)
self.assert_compile(
- schema.CreateTable(t2),
- "CREATE TABLE t (x INTEGER, z INTEGER)"
+ schema.CreateTable(t2), "CREATE TABLE t (x INTEGER, z INTEGER)"
)
def test_composite_pk_constraint_autoinc_first_implicit(self):
m = MetaData()
t = Table(
- 't', m,
- Column('a', Integer, primary_key=True),
- Column('b', Integer, primary_key=True, autoincrement=True)
+ "t",
+ m,
+ Column("a", Integer, primary_key=True),
+ Column("b", Integer, primary_key=True, autoincrement=True),
)
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t ("
"a INTEGER NOT NULL, "
"b INTEGER NOT NULL, "
- "PRIMARY KEY (b, a))"
+ "PRIMARY KEY (b, a))",
)
def test_composite_pk_constraint_maintains_order_explicit(self):
m = MetaData()
t = Table(
- 't', m,
- Column('a', Integer),
- Column('b', Integer, autoincrement=True),
- schema.PrimaryKeyConstraint('a', 'b')
+ "t",
+ m,
+ Column("a", Integer),
+ Column("b", Integer, autoincrement=True),
+ schema.PrimaryKeyConstraint("a", "b"),
)
self.assert_compile(
schema.CreateTable(t),
"CREATE TABLE t ("
"a INTEGER NOT NULL, "
"b INTEGER NOT NULL, "
- "PRIMARY KEY (a, b))"
+ "PRIMARY KEY (a, b))",
)
def test_create_table_suffix(self):
class MyDialect(default.DefaultDialect):
class MyCompiler(compiler.DDLCompiler):
def create_table_suffix(self, table):
- return 'SOME SUFFIX'
+ return "SOME SUFFIX"
ddl_compiler = MyCompiler
m = MetaData()
- t1 = Table('t1', m, Column('q', Integer))
+ t1 = Table("t1", m, Column("q", Integer))
self.assert_compile(
schema.CreateTable(t1),
"CREATE TABLE t1 SOME SUFFIX (q INTEGER)",
- dialect=MyDialect()
+ dialect=MyDialect(),
)
def test_table_no_cols(self):
m = MetaData()
- t1 = Table('t1', m)
- self.assert_compile(
- schema.CreateTable(t1),
- "CREATE TABLE t1 ()"
- )
+ t1 = Table("t1", m)
+ self.assert_compile(schema.CreateTable(t1), "CREATE TABLE t1 ()")
def test_table_no_cols_w_constraint(self):
m = MetaData()
- t1 = Table('t1', m, CheckConstraint('a = 1'))
+ t1 = Table("t1", m, CheckConstraint("a = 1"))
self.assert_compile(
- schema.CreateTable(t1),
- "CREATE TABLE t1 (CHECK (a = 1))"
+ schema.CreateTable(t1), "CREATE TABLE t1 (CHECK (a = 1))"
)
def test_table_one_col_w_constraint(self):
m = MetaData()
- t1 = Table('t1', m, Column('q', Integer), CheckConstraint('a = 1'))
+ t1 = Table("t1", m, Column("q", Integer), CheckConstraint("a = 1"))
self.assert_compile(
schema.CreateTable(t1),
- "CREATE TABLE t1 (q INTEGER, CHECK (a = 1))"
+ "CREATE TABLE t1 (q INTEGER, CHECK (a = 1))",
)
def test_schema_translate_map_table(self):
m = MetaData()
- t1 = Table('t1', m, Column('q', Integer))
- t2 = Table('t2', m, Column('q', Integer), schema='foo')
- t3 = Table('t3', m, Column('q', Integer), schema='bar')
+ t1 = Table("t1", m, Column("q", Integer))
+ t2 = Table("t2", m, Column("q", Integer), schema="foo")
+ t3 = Table("t3", m, Column("q", Integer), schema="bar")
schema_translate_map = {None: "z", "bar": None, "foo": "bat"}
self.assert_compile(
schema.CreateTable(t1),
"CREATE TABLE z.t1 (q INTEGER)",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateTable(t2),
"CREATE TABLE bat.t2 (q INTEGER)",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateTable(t3),
"CREATE TABLE t3 (q INTEGER)",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
def test_schema_translate_map_sequence(self):
- s1 = schema.Sequence('s1')
- s2 = schema.Sequence('s2', schema='foo')
- s3 = schema.Sequence('s3', schema='bar')
+ s1 = schema.Sequence("s1")
+ s2 = schema.Sequence("s2", schema="foo")
+ s3 = schema.Sequence("s3", schema="bar")
schema_translate_map = {None: "z", "bar": None, "foo": "bat"}
self.assert_compile(
schema.CreateSequence(s1),
"CREATE SEQUENCE z.s1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s2),
"CREATE SEQUENCE bat.s2",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s3),
"CREATE SEQUENCE s3",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def test_select(self):
- self.assert_compile(table4.select(),
- "SELECT remote_owner.remotetable.rem_id, "
- "remote_owner.remotetable.datatype_id,"
- " remote_owner.remotetable.value "
- "FROM remote_owner.remotetable")
+ self.assert_compile(
+ table4.select(),
+ "SELECT remote_owner.remotetable.rem_id, "
+ "remote_owner.remotetable.datatype_id,"
+ " remote_owner.remotetable.value "
+ "FROM remote_owner.remotetable",
+ )
self.assert_compile(
table4.select(
- and_(
- table4.c.datatype_id == 7,
- table4.c.value == 'hi')),
+ and_(table4.c.datatype_id == 7, table4.c.value == "hi")
+ ),
"SELECT remote_owner.remotetable.rem_id, "
"remote_owner.remotetable.datatype_id,"
" remote_owner.remotetable.value "
"FROM remote_owner.remotetable WHERE "
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND"
- " remote_owner.remotetable.value = :value_1")
+ " remote_owner.remotetable.value = :value_1",
+ )
- s = table4.select(and_(table4.c.datatype_id == 7,
- table4.c.value == 'hi'), use_labels=True)
+ s = table4.select(
+ and_(table4.c.datatype_id == 7, table4.c.value == "hi"),
+ use_labels=True,
+ )
self.assert_compile(
- s, "SELECT remote_owner.remotetable.rem_id AS"
+ s,
+ "SELECT remote_owner.remotetable.rem_id AS"
" remote_owner_remotetable_rem_id, "
"remote_owner.remotetable.datatype_id AS"
" remote_owner_remotetable_datatype_id, "
@@ -3243,86 +3580,94 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
"AS remote_owner_remotetable_value FROM "
"remote_owner.remotetable WHERE "
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND "
- "remote_owner.remotetable.value = :value_1")
+ "remote_owner.remotetable.value = :value_1",
+ )
# multi-part schema name
- self.assert_compile(table5.select(),
- 'SELECT "dbo.remote_owner".remotetable.rem_id, '
- '"dbo.remote_owner".remotetable.datatype_id, '
- '"dbo.remote_owner".remotetable.value '
- 'FROM "dbo.remote_owner".remotetable'
- )
+ self.assert_compile(
+ table5.select(),
+ 'SELECT "dbo.remote_owner".remotetable.rem_id, '
+ '"dbo.remote_owner".remotetable.datatype_id, '
+ '"dbo.remote_owner".remotetable.value '
+ 'FROM "dbo.remote_owner".remotetable',
+ )
# multi-part schema name labels - convert '.' to '_'
- self.assert_compile(table5.select(use_labels=True),
- 'SELECT "dbo.remote_owner".remotetable.rem_id AS'
- ' dbo_remote_owner_remotetable_rem_id, '
- '"dbo.remote_owner".remotetable.datatype_id'
- ' AS dbo_remote_owner_remotetable_datatype_id,'
- ' "dbo.remote_owner".remotetable.value AS '
- 'dbo_remote_owner_remotetable_value FROM'
- ' "dbo.remote_owner".remotetable'
- )
+ self.assert_compile(
+ table5.select(use_labels=True),
+ 'SELECT "dbo.remote_owner".remotetable.rem_id AS'
+ " dbo_remote_owner_remotetable_rem_id, "
+ '"dbo.remote_owner".remotetable.datatype_id'
+ " AS dbo_remote_owner_remotetable_datatype_id,"
+ ' "dbo.remote_owner".remotetable.value AS '
+ "dbo_remote_owner_remotetable_value FROM"
+ ' "dbo.remote_owner".remotetable',
+ )
def test_schema_translate_select(self):
m = MetaData()
table1 = Table(
- 'mytable', m, Column('myid', Integer),
- Column('name', String),
- Column('description', String)
+ "mytable",
+ m,
+ Column("myid", Integer),
+ Column("name", String),
+ Column("description", String),
)
- schema_translate_map = {"remote_owner": "foob", None: 'bar'}
+ schema_translate_map = {"remote_owner": "foob", None: "bar"}
self.assert_compile(
- table1.select().where(table1.c.name == 'hi'),
+ table1.select().where(table1.c.name == "hi"),
"SELECT bar.mytable.myid, bar.mytable.name, "
"bar.mytable.description FROM bar.mytable "
"WHERE bar.mytable.name = :name_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table4.select().where(table4.c.value == 'hi'),
+ table4.select().where(table4.c.value == "hi"),
"SELECT foob.remotetable.rem_id, foob.remotetable.datatype_id, "
"foob.remotetable.value FROM foob.remotetable "
"WHERE foob.remotetable.value = :value_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
schema_translate_map = {"remote_owner": "foob"}
self.assert_compile(
- select([
- table1, table4
- ]).select_from(
+ select([table1, table4]).select_from(
join(table1, table4, table1.c.myid == table4.c.rem_id)
),
"SELECT mytable.myid, mytable.name, mytable.description, "
"foob.remotetable.rem_id, foob.remotetable.datatype_id, "
"foob.remotetable.value FROM mytable JOIN foob.remotetable "
"ON mytable.myid = foob.remotetable.rem_id",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
def test_schema_translate_aliases(self):
- schema_translate_map = {None: 'bar'}
+ schema_translate_map = {None: "bar"}
m = MetaData()
table1 = Table(
- 'mytable', m, Column('myid', Integer),
- Column('name', String),
- Column('description', String)
+ "mytable",
+ m,
+ Column("myid", Integer),
+ Column("name", String),
+ Column("description", String),
)
table2 = Table(
- 'myothertable', m, Column('otherid', Integer),
- Column('othername', String),
+ "myothertable",
+ m,
+ Column("otherid", Integer),
+ Column("othername", String),
)
alias = table1.alias()
- stmt = select([
- table2, alias
- ]).select_from(table2.join(alias, table2.c.otherid == alias.c.myid)).\
- where(alias.c.name == 'foo')
+ stmt = (
+ select([table2, alias])
+ .select_from(table2.join(alias, table2.c.otherid == alias.c.myid))
+ .where(alias.c.name == "foo")
+ )
self.assert_compile(
stmt,
@@ -3331,109 +3676,122 @@ class SchemaTest(fixtures.TestBase, AssertsCompiledSQL):
"FROM bar.myothertable JOIN bar.mytable AS mytable_1 "
"ON bar.myothertable.otherid = mytable_1.myid "
"WHERE mytable_1.name = :name_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
def test_schema_translate_crud(self):
- schema_translate_map = {"remote_owner": "foob", None: 'bar'}
+ schema_translate_map = {"remote_owner": "foob", None: "bar"}
m = MetaData()
table1 = Table(
- 'mytable', m,
- Column('myid', Integer), Column('name', String),
- Column('description', String)
+ "mytable",
+ m,
+ Column("myid", Integer),
+ Column("name", String),
+ Column("description", String),
)
self.assert_compile(
- table1.insert().values(description='foo'),
+ table1.insert().values(description="foo"),
"INSERT INTO bar.mytable (description) VALUES (:description)",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table1.update().where(table1.c.name == 'hi').
- values(description='foo'),
+ table1.update()
+ .where(table1.c.name == "hi")
+ .values(description="foo"),
"UPDATE bar.mytable SET description=:description "
"WHERE bar.mytable.name = :name_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table1.delete().where(table1.c.name == 'hi'),
+ table1.delete().where(table1.c.name == "hi"),
"DELETE FROM bar.mytable WHERE bar.mytable.name = :name_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table4.insert().values(value='there'),
+ table4.insert().values(value="there"),
"INSERT INTO foob.remotetable (value) VALUES (:value)",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table4.update().where(table4.c.value == 'hi').
- values(value='there'),
+ table4.update()
+ .where(table4.c.value == "hi")
+ .values(value="there"),
"UPDATE foob.remotetable SET value=:value "
"WHERE foob.remotetable.value = :value_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
self.assert_compile(
- table4.delete().where(table4.c.value == 'hi'),
+ table4.delete().where(table4.c.value == "hi"),
"DELETE FROM foob.remotetable WHERE "
"foob.remotetable.value = :value_1",
- schema_translate_map=schema_translate_map
+ schema_translate_map=schema_translate_map,
)
def test_alias(self):
- a = alias(table4, 'remtable')
- self.assert_compile(a.select(a.c.datatype_id == 7),
- "SELECT remtable.rem_id, remtable.datatype_id, "
- "remtable.value FROM"
- " remote_owner.remotetable AS remtable "
- "WHERE remtable.datatype_id = :datatype_id_1")
+ a = alias(table4, "remtable")
+ self.assert_compile(
+ a.select(a.c.datatype_id == 7),
+ "SELECT remtable.rem_id, remtable.datatype_id, "
+ "remtable.value FROM"
+ " remote_owner.remotetable AS remtable "
+ "WHERE remtable.datatype_id = :datatype_id_1",
+ )
def test_update(self):
self.assert_compile(
- table4.update(table4.c.value == 'test',
- values={table4.c.datatype_id: 12}),
+ table4.update(
+ table4.c.value == "test", values={table4.c.datatype_id: 12}
+ ),
"UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
- "WHERE remote_owner.remotetable.value = :value_1")
+ "WHERE remote_owner.remotetable.value = :value_1",
+ )
def test_insert(self):
- self.assert_compile(table4.insert(values=(2, 5, 'test')),
- "INSERT INTO remote_owner.remotetable "
- "(rem_id, datatype_id, value) VALUES "
- "(:rem_id, :datatype_id, :value)")
+ self.assert_compile(
+ table4.insert(values=(2, 5, "test")),
+ "INSERT INTO remote_owner.remotetable "
+ "(rem_id, datatype_id, value) VALUES "
+ "(:rem_id, :datatype_id, :value)",
+ )
class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
- __dialect__ = 'default'
+ __dialect__ = "default"
def test_dont_overcorrelate(self):
- self.assert_compile(select([table1], from_obj=[table1,
- table1.select()]),
- "SELECT mytable.myid, mytable.name, "
- "mytable.description FROM mytable, (SELECT "
- "mytable.myid AS myid, mytable.name AS "
- "name, mytable.description AS description "
- "FROM mytable)")
+ self.assert_compile(
+ select([table1], from_obj=[table1, table1.select()]),
+ "SELECT mytable.myid, mytable.name, "
+ "mytable.description FROM mytable, (SELECT "
+ "mytable.myid AS myid, mytable.name AS "
+ "name, mytable.description AS description "
+ "FROM mytable)",
+ )
def _fixture(self):
- t1 = table('t1', column('a'))
- t2 = table('t2', column('a'))
+ t1 = table("t1", column("a"))
+ t2 = table("t2", column("a"))
return t1, t2, select([t1]).where(t1.c.a == t2.c.a)
def _assert_where_correlated(self, stmt):
self.assert_compile(
stmt,
"SELECT t2.a FROM t2 WHERE t2.a = "
- "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
+ "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)",
+ )
def _assert_where_all_correlated(self, stmt):
self.assert_compile(
stmt,
"SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = "
- "(SELECT t1.a WHERE t1.a = t2.a)")
+ "(SELECT t1.a WHERE t1.a = t2.a)",
+ )
# note there's no more "backwards" correlation after
# we've done #2746
@@ -3452,171 +3810,197 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
stmt,
"SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) "
- "AS anon_1 FROM t2")
+ "AS anon_1 FROM t2",
+ )
def _assert_column_all_correlated(self, stmt):
self.assert_compile(
stmt,
"SELECT t1.a, t2.a, "
- "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2")
+ "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2",
+ )
def _assert_having_correlated(self, stmt):
- self.assert_compile(stmt,
- "SELECT t2.a FROM t2 HAVING t2.a = "
- "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a FROM t2 HAVING t2.a = "
+ "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)",
+ )
def _assert_from_uncorrelated(self, stmt):
self.assert_compile(
stmt,
"SELECT t2.a, anon_1.a FROM t2, "
- "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1",
+ )
def _assert_from_all_uncorrelated(self, stmt):
self.assert_compile(
stmt,
"SELECT t1.a, t2.a, anon_1.a FROM t1, t2, "
- "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1",
+ )
def _assert_where_uncorrelated(self, stmt):
- self.assert_compile(stmt,
- "SELECT t2.a FROM t2 WHERE t2.a = "
- "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a FROM t2 WHERE t2.a = "
+ "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)",
+ )
def _assert_column_uncorrelated(self, stmt):
- self.assert_compile(stmt,
- "SELECT t2.a, (SELECT t1.a FROM t1, t2 "
- "WHERE t1.a = t2.a) AS anon_1 FROM t2")
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a, (SELECT t1.a FROM t1, t2 "
+ "WHERE t1.a = t2.a) AS anon_1 FROM t2",
+ )
def _assert_having_uncorrelated(self, stmt):
- self.assert_compile(stmt,
- "SELECT t2.a FROM t2 HAVING t2.a = "
- "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
+ self.assert_compile(
+ stmt,
+ "SELECT t2.a FROM t2 HAVING t2.a = "
+ "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)",
+ )
def _assert_where_single_full_correlated(self, stmt):
- self.assert_compile(stmt,
- "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)")
+ self.assert_compile(
+ stmt, "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)"
+ )
def test_correlate_semiauto_where(self):
t1, t2, s1 = self._fixture()
self._assert_where_correlated(
- select([t2]).where(t2.c.a == s1.correlate(t2)))
+ select([t2]).where(t2.c.a == s1.correlate(t2))
+ )
def test_correlate_semiauto_column(self):
t1, t2, s1 = self._fixture()
self._assert_column_correlated(
- select([t2, s1.correlate(t2).as_scalar()]))
+ select([t2, s1.correlate(t2).as_scalar()])
+ )
def test_correlate_semiauto_from(self):
t1, t2, s1 = self._fixture()
- self._assert_from_uncorrelated(
- select([t2, s1.correlate(t2).alias()]))
+ self._assert_from_uncorrelated(select([t2, s1.correlate(t2).alias()]))
def test_correlate_semiauto_having(self):
t1, t2, s1 = self._fixture()
self._assert_having_correlated(
- select([t2]).having(t2.c.a == s1.correlate(t2)))
+ select([t2]).having(t2.c.a == s1.correlate(t2))
+ )
def test_correlate_except_inclusion_where(self):
t1, t2, s1 = self._fixture()
self._assert_where_correlated(
- select([t2]).where(t2.c.a == s1.correlate_except(t1)))
+ select([t2]).where(t2.c.a == s1.correlate_except(t1))
+ )
def test_correlate_except_exclusion_where(self):
t1, t2, s1 = self._fixture()
self._assert_where_uncorrelated(
- select([t2]).where(t2.c.a == s1.correlate_except(t2)))
+ select([t2]).where(t2.c.a == s1.correlate_except(t2))
+ )
def test_correlate_except_inclusion_column(self):
t1, t2, s1 = self._fixture()
self._assert_column_correlated(
- select([t2, s1.correlate_except(t1).as_scalar()]))
+ select([t2, s1.correlate_except(t1).as_scalar()])
+ )
def test_correlate_except_exclusion_column(self):
t1, t2, s1 = self._fixture()
self._assert_column_uncorrelated(
- select([t2, s1.correlate_except(t2).as_scalar()]))
+ select([t2, s1.correlate_except(t2).as_scalar()])
+ )
def test_correlate_except_inclusion_from(self):
t1, t2, s1 = self._fixture()
self._assert_from_uncorrelated(
- select([t2, s1.correlate_except(t1).alias()]))
+ select([t2, s1.correlate_except(t1).alias()])
+ )
def test_correlate_except_exclusion_from(self):
t1, t2, s1 = self._fixture()
self._assert_from_uncorrelated(
- select([t2, s1.correlate_except(t2).alias()]))
+ select([t2, s1.correlate_except(t2).alias()])
+ )
def test_correlate_except_none(self):
t1, t2, s1 = self._fixture()
self._assert_where_all_correlated(
- select([t1, t2]).where(t2.c.a == s1.correlate_except(None)))
+ select([t1, t2]).where(t2.c.a == s1.correlate_except(None))
+ )
def test_correlate_except_having(self):
t1, t2, s1 = self._fixture()
self._assert_having_correlated(
- select([t2]).having(t2.c.a == s1.correlate_except(t1)))
+ select([t2]).having(t2.c.a == s1.correlate_except(t1))
+ )
def test_correlate_auto_where(self):
t1, t2, s1 = self._fixture()
- self._assert_where_correlated(
- select([t2]).where(t2.c.a == s1))
+ self._assert_where_correlated(select([t2]).where(t2.c.a == s1))
def test_correlate_auto_column(self):
t1, t2, s1 = self._fixture()
- self._assert_column_correlated(
- select([t2, s1.as_scalar()]))
+ self._assert_column_correlated(select([t2, s1.as_scalar()]))
def test_correlate_auto_from(self):
t1, t2, s1 = self._fixture()
- self._assert_from_uncorrelated(
- select([t2, s1.alias()]))
+ self._assert_from_uncorrelated(select([t2, s1.alias()]))
def test_correlate_auto_having(self):
t1, t2, s1 = self._fixture()
- self._assert_having_correlated(
- select([t2]).having(t2.c.a == s1))
+ self._assert_having_correlated(select([t2]).having(t2.c.a == s1))
def test_correlate_disabled_where(self):
t1, t2, s1 = self._fixture()
self._assert_where_uncorrelated(
- select([t2]).where(t2.c.a == s1.correlate(None)))
+ select([t2]).where(t2.c.a == s1.correlate(None))
+ )
def test_correlate_disabled_column(self):
t1, t2, s1 = self._fixture()
self._assert_column_uncorrelated(
- select([t2, s1.correlate(None).as_scalar()]))
+ select([t2, s1.correlate(None).as_scalar()])
+ )
def test_correlate_disabled_from(self):
t1, t2, s1 = self._fixture()
self._assert_from_uncorrelated(
- select([t2, s1.correlate(None).alias()]))
+ select([t2, s1.correlate(None).alias()])
+ )
def test_correlate_disabled_having(self):
t1, t2, s1 = self._fixture()
self._assert_having_uncorrelated(
- select([t2]).having(t2.c.a == s1.correlate(None)))
+ select([t2]).having(t2.c.a == s1.correlate(None))
+ )
def test_correlate_all_where(self):
t1, t2, s1 = self._fixture()
self._assert_where_all_correlated(
- select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)))
+ select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2))
+ )
def test_correlate_all_column(self):
t1, t2, s1 = self._fixture()
self._assert_column_all_correlated(
- select([t1, t2, s1.correlate(t1, t2).as_scalar()]))
+ select([t1, t2, s1.correlate(t1, t2).as_scalar()])
+ )
def test_correlate_all_from(self):
t1, t2, s1 = self._fixture()
self._assert_from_all_uncorrelated(
- select([t1, t2, s1.correlate(t1, t2).alias()]))
+ select([t1, t2, s1.correlate(t1, t2).alias()])
+ )
def test_correlate_where_all_unintentional(self):
t1, t2, s1 = self._fixture()
assert_raises_message(
exc.InvalidRequestError,
"returned no FROM clauses due to auto-correlation",
- select([t1, t2]).where(t2.c.a == s1).compile
+ select([t1, t2]).where(t2.c.a == s1).compile,
)
def test_correlate_from_all_ok(self):
@@ -3624,16 +4008,16 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
self.assert_compile(
select([t1, t2, s1]),
"SELECT t1.a, t2.a, a FROM t1, t2, "
- "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)"
+ "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)",
)
def test_correlate_auto_where_singlefrom(self):
t1, t2, s1 = self._fixture()
s = select([t1.c.a])
s2 = select([t1]).where(t1.c.a == s)
- self.assert_compile(s2,
- "SELECT t1.a FROM t1 WHERE t1.a = "
- "(SELECT t1.a FROM t1)")
+ self.assert_compile(
+ s2, "SELECT t1.a FROM t1 WHERE t1.a = " "(SELECT t1.a FROM t1)"
+ )
def test_correlate_semiauto_where_singlefrom(self):
t1, t2, s1 = self._fixture()
@@ -3654,89 +4038,103 @@ class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
def test_correlate_alone_noeffect(self):
# new as of #2668
t1, t2, s1 = self._fixture()
- self.assert_compile(s1.correlate(t1, t2),
- "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a")
+ self.assert_compile(
+ s1.correlate(t1, t2), "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a"
+ )
def test_correlate_except_froms(self):
# new as of #2748
- t1 = table('t1', column('a'))
- t2 = table('t2', column('a'), column('b'))
+ t1 = table("t1", column("a"))
+ t2 = table("t2", column("a"), column("b"))
s = select([t2.c.b]).where(t1.c.a == t2.c.a)
- s = s.correlate_except(t2).alias('s')
+ s = s.correlate_except(t2).alias("s")
s2 = select([func.foo(s.c.b)]).as_scalar()
s3 = select([t1], order_by=s2)
self.assert_compile(
- s3, "SELECT t1.a FROM t1 ORDER BY "
+ s3,
+ "SELECT t1.a FROM t1 ORDER BY "
"(SELECT foo(s.b) AS foo_1 FROM "
- "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)")
+ "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)",
+ )
def test_multilevel_froms_correlation(self):
# new as of #2748
- p = table('parent', column('id'))
- c = table('child', column('id'), column('parent_id'), column('pos'))
+ p = table("parent", column("id"))
+ c = table("child", column("id"), column("parent_id"), column("pos"))
- s = c.select().where(
- c.c.parent_id == p.c.id).order_by(
- c.c.pos).limit(1)
+ s = (
+ c.select()
+ .where(c.c.parent_id == p.c.id)
+ .order_by(c.c.pos)
+ .limit(1)
+ )
s = s.correlate(p)
s = exists().select_from(s).where(s.c.id == 1)
s = select([p]).where(s)
self.assert_compile(
- s, "SELECT parent.id FROM parent WHERE EXISTS (SELECT * "
+ s,
+ "SELECT parent.id FROM parent WHERE EXISTS (SELECT * "
"FROM (SELECT child.id AS id, child.parent_id AS parent_id, "
"child.pos AS pos FROM child WHERE child.parent_id = parent.id "
- "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)")
+ "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)",
+ )
def test_no_contextless_correlate_except(self):
# new as of #2748
- t1 = table('t1', column('x'))
- t2 = table('t2', column('y'))
- t3 = table('t3', column('z'))
+ t1 = table("t1", column("x"))
+ t2 = table("t2", column("y"))
+ t3 = table("t3", column("z"))
- s = select([t1]).where(t1.c.x == t2.c.y).\
- where(t2.c.y == t3.c.z).correlate_except(t1)
+ s = (
+ select([t1])
+ .where(t1.c.x == t2.c.y)
+ .where(t2.c.y == t3.c.z)
+ .correlate_except(t1)
+ )
self.assert_compile(
- s,
- "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z")
+ s, "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z"
+ )
def test_multilevel_implicit_correlation_disabled(self):
# test that implicit correlation with multilevel WHERE correlation
# behaves like 0.8.1, 0.7 (i.e. doesn't happen)
- t1 = table('t1', column('x'))
- t2 = table('t2', column('y'))
- t3 = table('t3', column('z'))
+ t1 = table("t1", column("x"))
+ t2 = table("t2", column("y"))
+ t3 = table("t3", column("z"))
s = select([t1.c.x]).where(t1.c.x == t2.c.y)
s2 = select([t3.c.z]).where(t3.c.z == s.as_scalar())
s3 = select([t1]).where(t1.c.x == s2.as_scalar())
- self.assert_compile(s3,
- "SELECT t1.x FROM t1 "
- "WHERE t1.x = (SELECT t3.z "
- "FROM t3 "
- "WHERE t3.z = (SELECT t1.x "
- "FROM t1, t2 "
- "WHERE t1.x = t2.y))"
- )
+ self.assert_compile(
+ s3,
+ "SELECT t1.x FROM t1 "
+ "WHERE t1.x = (SELECT t3.z "
+ "FROM t3 "
+ "WHERE t3.z = (SELECT t1.x "
+ "FROM t1, t2 "
+ "WHERE t1.x = t2.y))",
+ )
def test_from_implicit_correlation_disabled(self):
# test that implicit correlation with immediate and
# multilevel FROM clauses behaves like 0.8.1 (i.e. doesn't happen)
- t1 = table('t1', column('x'))
- t2 = table('t2', column('y'))
+ t1 = table("t1", column("x"))
+ t2 = table("t2", column("y"))
s = select([t1.c.x]).where(t1.c.x == t2.c.y)
s2 = select([t2, s])
s3 = select([t1, s2])
- self.assert_compile(s3,
- "SELECT t1.x, y, x FROM t1, "
- "(SELECT t2.y AS y, x FROM t2, "
- "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))"
- )
+ self.assert_compile(
+ s3,
+ "SELECT t1.x, y, x FROM t1, "
+ "(SELECT t2.y AS y, x FROM t2, "
+ "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))",
+ )
class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
@@ -3744,28 +4142,27 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
def _fixture(self):
m = MetaData()
- return Table('foo', m,
- Column('id', Integer))
+ return Table("foo", m, Column("id", Integer))
- bool_table = table('t', column('x', Boolean))
+ bool_table = table("t", column("x", Boolean))
def test_coerce_bool_where(self):
self.assert_compile(
select([self.bool_table]).where(self.bool_table.c.x),
- "SELECT t.x FROM t WHERE t.x"
+ "SELECT t.x FROM t WHERE t.x",
)
def test_coerce_bool_where_non_native(self):
self.assert_compile(
select([self.bool_table]).where(self.bool_table.c.x),
"SELECT t.x FROM t WHERE t.x = 1",
- dialect=default.DefaultDialect(supports_native_boolean=False)
+ dialect=default.DefaultDialect(supports_native_boolean=False),
)
self.assert_compile(
select([self.bool_table]).where(~self.bool_table.c.x),
"SELECT t.x FROM t WHERE t.x = 0",
- dialect=default.DefaultDialect(supports_native_boolean=False)
+ dialect=default.DefaultDialect(supports_native_boolean=False),
)
def test_null_constant(self):
@@ -3779,40 +4176,35 @@ class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
def test_val_and_false(self):
t = self._fixture()
- self.assert_compile(and_(t.c.id == 1, False),
- "false")
+ self.assert_compile(and_(t.c.id == 1, False), "false")
def test_val_and_true_coerced(self):
t = self._fixture()
- self.assert_compile(and_(t.c.id == 1, True),
- "foo.id = :id_1")
+ self.assert_compile(and_(t.c.id == 1, True), "foo.id = :id_1")
def test_val_is_null_coerced(self):
t = self._fixture()
- self.assert_compile(and_(t.c.id == None), # noqa
- "foo.id IS NULL")
+ self.assert_compile(and_(t.c.id == None), "foo.id IS NULL") # noqa
def test_val_and_None(self):
t = self._fixture()
- self.assert_compile(and_(t.c.id == 1, None),
- "foo.id = :id_1 AND NULL")
+ self.assert_compile(and_(t.c.id == 1, None), "foo.id = :id_1 AND NULL")
def test_None_and_val(self):
t = self._fixture()
- self.assert_compile(and_(None, t.c.id == 1),
- "NULL AND foo.id = :id_1")
+ self.assert_compile(and_(None, t.c.id == 1), "NULL AND foo.id = :id_1")
def test_None_and_nothing(self):
# current convention is None in and_()
# returns None May want
# to revise this at some point.
- self.assert_compile(
- and_(None), "NULL")
+ self.assert_compile(and_(None), "NULL")
def test_val_and_null(self):
t = self._fixture()
- self.assert_compile(and_(t.c.id == 1, null()),
- "foo.id = :id_1 AND NULL")
+ self.assert_compile(
+ and_(t.c.id == 1, null()), "foo.id = :id_1 AND NULL"
+ )
class ResultMapTest(fixtures.TestBase):
@@ -3823,101 +4215,109 @@ class ResultMapTest(fixtures.TestBase):
"""
def test_compound_populates(self):
- t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+ t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer))
stmt = select([t]).union(select([t]))
comp = stmt.compile()
eq_(
comp._create_result_map(),
- {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
- 'b': ('b', (t.c.b, 'b', 'b'), t.c.b.type)}
+ {
+ "a": ("a", (t.c.a, "a", "a"), t.c.a.type),
+ "b": ("b", (t.c.b, "b", "b"), t.c.b.type),
+ },
)
def test_compound_not_toplevel_doesnt_populate(self):
- t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+ t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer))
subq = select([t]).union(select([t]))
stmt = select([t.c.a]).select_from(t.join(subq, t.c.a == subq.c.a))
comp = stmt.compile()
eq_(
comp._create_result_map(),
- {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)}
+ {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)},
)
def test_compound_only_top_populates(self):
- t = Table('t', MetaData(), Column('a', Integer), Column('b', Integer))
+ t = Table("t", MetaData(), Column("a", Integer), Column("b", Integer))
stmt = select([t.c.a]).union(select([t.c.b]))
comp = stmt.compile()
eq_(
comp._create_result_map(),
- {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)},
+ {"a": ("a", (t.c.a, "a", "a"), t.c.a.type)},
)
def test_label_plus_element(self):
- t = Table('t', MetaData(), Column('a', Integer))
- l1 = t.c.a.label('bar')
+ t = Table("t", MetaData(), Column("a", Integer))
+ l1 = t.c.a.label("bar")
tc = type_coerce(t.c.a, String)
stmt = select([t.c.a, l1, tc])
comp = stmt.compile()
- tc_anon_label = comp._create_result_map()['anon_1'][1][0]
+ tc_anon_label = comp._create_result_map()["anon_1"][1][0]
eq_(
comp._create_result_map(),
{
- 'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
- 'bar': ('bar', (l1, 'bar'), l1.type),
- 'anon_1': (
- '%%(%d anon)s' % id(tc),
- (tc_anon_label, 'anon_1', tc), tc.type),
+ "a": ("a", (t.c.a, "a", "a"), t.c.a.type),
+ "bar": ("bar", (l1, "bar"), l1.type),
+ "anon_1": (
+ "%%(%d anon)s" % id(tc),
+ (tc_anon_label, "anon_1", tc),
+ tc.type,
+ ),
},
)
def test_label_conflict_union(self):
- t1 = Table('t1', MetaData(), Column('a', Integer),
- Column('b', Integer))
- t2 = Table('t2', MetaData(), Column('t1_a', Integer))
+ t1 = Table(
+ "t1", MetaData(), Column("a", Integer), Column("b", Integer)
+ )
+ t2 = Table("t2", MetaData(), Column("t1_a", Integer))
union = select([t2]).union(select([t2])).alias()
t1_alias = t1.alias()
- stmt = select([t1, t1_alias]).select_from(
- t1.join(union, t1.c.a == union.c.t1_a)).apply_labels()
+ stmt = (
+ select([t1, t1_alias])
+ .select_from(t1.join(union, t1.c.a == union.c.t1_a))
+ .apply_labels()
+ )
comp = stmt.compile()
eq_(
set(comp._create_result_map()),
- set(['t1_1_b', 't1_1_a', 't1_a', 't1_b'])
- )
- is_(
- comp._create_result_map()['t1_a'][1][2], t1.c.a
+ set(["t1_1_b", "t1_1_a", "t1_a", "t1_b"]),
)
+ is_(comp._create_result_map()["t1_a"][1][2], t1.c.a)
def test_insert_with_select_values(self):
- astring = Column('a', String)
- aint = Column('a', Integer)
+ astring = Column("a", String)
+ aint = Column("a", Integer)
m = MetaData()
- Table('t1', m, astring)
- t2 = Table('t2', m, aint)
+ Table("t1", m, astring)
+ t2 = Table("t2", m, aint)
stmt = t2.insert().values(a=select([astring])).returning(aint)
comp = stmt.compile(dialect=postgresql.dialect())
eq_(
comp._create_result_map(),
- {'a': ('a', (aint, 'a', 'a'), aint.type)}
+ {"a": ("a", (aint, "a", "a"), aint.type)},
)
def test_insert_from_select(self):
- astring = Column('a', String)
- aint = Column('a', Integer)
+ astring = Column("a", String)
+ aint = Column("a", Integer)
m = MetaData()
- Table('t1', m, astring)
- t2 = Table('t2', m, aint)
+ Table("t1", m, astring)
+ t2 = Table("t2", m, aint)
- stmt = t2.insert().from_select(['a'], select([astring])).\
- returning(aint)
+ stmt = (
+ t2.insert().from_select(["a"], select([astring])).returning(aint)
+ )
comp = stmt.compile(dialect=postgresql.dialect())
eq_(
comp._create_result_map(),
- {'a': ('a', (aint, 'a', 'a'), aint.type)}
+ {"a": ("a", (aint, "a", "a"), aint.type)},
)
def test_nested_api(self):
from sqlalchemy.engine.result import ResultMetaData
+
stmt2 = select([table2])
stmt1 = select([table1]).select_from(stmt2)
@@ -3936,7 +4336,8 @@ class ResultMapTest(fixtures.TestBase):
self._add_to_result_map("k1", "k1", (1, 2, 3), int_)
else:
text = super(MyCompiler, self).visit_select(
- stmt, *arg, **kw)
+ stmt, *arg, **kw
+ )
self._add_to_result_map("k2", "k2", (3, 4, 5), int_)
return text
@@ -3945,62 +4346,68 @@ class ResultMapTest(fixtures.TestBase):
eq_(
ResultMetaData._create_result_map(contexts[stmt2][0]),
{
- 'otherid': (
- 'otherid',
- (table2.c.otherid, 'otherid', 'otherid'),
- table2.c.otherid.type),
- 'othername': (
- 'othername',
- (table2.c.othername, 'othername', 'othername'),
- table2.c.othername.type),
- 'k1': ('k1', (1, 2, 3), int_)
- }
+ "otherid": (
+ "otherid",
+ (table2.c.otherid, "otherid", "otherid"),
+ table2.c.otherid.type,
+ ),
+ "othername": (
+ "othername",
+ (table2.c.othername, "othername", "othername"),
+ table2.c.othername.type,
+ ),
+ "k1": ("k1", (1, 2, 3), int_),
+ },
)
eq_(
comp._create_result_map(),
{
- 'myid': (
- 'myid',
- (table1.c.myid, 'myid', 'myid'), table1.c.myid.type
+ "myid": (
+ "myid",
+ (table1.c.myid, "myid", "myid"),
+ table1.c.myid.type,
+ ),
+ "k2": ("k2", (3, 4, 5), int_),
+ "name": (
+ "name",
+ (table1.c.name, "name", "name"),
+ table1.c.name.type,
),
- 'k2': ('k2', (3, 4, 5), int_),
- 'name': (
- 'name', (table1.c.name, 'name', 'name'),
- table1.c.name.type),
- 'description': (
- 'description',
- (table1.c.description, 'description', 'description'),
- table1.c.description.type)}
+ "description": (
+ "description",
+ (table1.c.description, "description", "description"),
+ table1.c.description.type,
+ ),
+ },
)
def test_select_wraps_for_translate_ambiguity(self):
# test for issue #3657
- t = table('a', column('x'), column('y'), column('z'))
+ t = table("a", column("x"), column("y"), column("z"))
- l1, l2, l3 = t.c.z.label('a'), t.c.x.label('b'), t.c.x.label('c')
+ l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c")
orig = [t.c.x, t.c.y, l1, l2, l3]
stmt = select(orig)
wrapped = stmt._generate()
wrapped = wrapped.column(
- func.ROW_NUMBER().over(order_by=t.c.z)).alias()
+ func.ROW_NUMBER().over(order_by=t.c.z)
+ ).alias()
wrapped_again = select([c for c in wrapped.c])
compiled = wrapped_again.compile(
- compile_kwargs={'select_wraps_for': stmt})
+ compile_kwargs={"select_wraps_for": stmt}
+ )
proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns]
- for orig_obj, proxied_obj in zip(
- orig,
- proxied
- ):
+ for orig_obj, proxied_obj in zip(orig, proxied):
is_(orig_obj, proxied_obj)
def test_select_wraps_for_translate_ambiguity_dupe_cols(self):
# test for issue #3657
- t = table('a', column('x'), column('y'), column('z'))
+ t = table("a", column("x"), column("y"), column("z"))
- l1, l2, l3 = t.c.z.label('a'), t.c.x.label('b'), t.c.x.label('c')
+ l1, l2, l3 = t.c.z.label("a"), t.c.x.label("b"), t.c.x.label("c")
orig = [t.c.x, t.c.y, l1, l2, l3]
# create the statement with some duplicate columns. right now
@@ -4018,7 +4425,8 @@ class ResultMapTest(fixtures.TestBase):
wrapped = stmt._generate()
wrapped = wrapped.column(
- func.ROW_NUMBER().over(order_by=t.c.z)).alias()
+ func.ROW_NUMBER().over(order_by=t.c.z)
+ ).alias()
# so when we wrap here we're going to have only 5 columns
wrapped_again = select([c for c in wrapped.c])
@@ -4027,11 +4435,9 @@ class ResultMapTest(fixtures.TestBase):
# "select_wraps_for" can't use inner_columns to match because
# these collections are not the same
compiled = wrapped_again.compile(
- compile_kwargs={'select_wraps_for': stmt})
+ compile_kwargs={"select_wraps_for": stmt}
+ )
proxied = [obj[0] for (k, n, obj, type_) in compiled._result_columns]
- for orig_obj, proxied_obj in zip(
- orig,
- proxied
- ):
+ for orig_obj, proxied_obj in zip(orig, proxied):
is_(orig_obj, proxied_obj)