diff options
Diffstat (limited to 'test/sql/test_generative.py')
| -rw-r--r-- | test/sql/test_generative.py | 1542 |
1 files changed, 787 insertions, 755 deletions
diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 8b1436879..1d064dd3a 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -1,21 +1,45 @@ from sqlalchemy.sql import table, column, ClauseElement, operators from sqlalchemy.sql.expression import _clone, _from_objects -from sqlalchemy import func, select, Integer, Table, \ - Column, MetaData, extract, String, bindparam, tuple_, and_, union, text,\ - case, ForeignKey, literal_column -from sqlalchemy.testing import fixtures, AssertsExecutionResults, \ - AssertsCompiledSQL +from sqlalchemy import ( + func, + select, + Integer, + Table, + Column, + MetaData, + extract, + String, + bindparam, + tuple_, + and_, + union, + text, + case, + ForeignKey, + literal_column, +) +from sqlalchemy.testing import ( + fixtures, + AssertsExecutionResults, + AssertsCompiledSQL, +) from sqlalchemy import testing -from sqlalchemy.sql.visitors import ClauseVisitor, CloningVisitor, \ - cloned_traverse, ReplacingCloningVisitor +from sqlalchemy.sql.visitors import ( + ClauseVisitor, + CloningVisitor, + cloned_traverse, + ReplacingCloningVisitor, +) from sqlalchemy.sql import visitors from sqlalchemy import exc from sqlalchemy.sql import util as sql_util -from sqlalchemy.testing import (eq_, - is_, - is_not_, - assert_raises, - assert_raises_message) +from sqlalchemy.testing import ( + eq_, + is_, + is_not_, + assert_raises, + assert_raises_message, +) A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None @@ -33,7 +57,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): # define deep equality semantics as well as deep # identity semantics. class A(ClauseElement): - __visit_name__ = 'a' + __visit_name__ = "a" def __init__(self, expr): self.expr = expr @@ -53,7 +77,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): return "A(%s)" % repr(self.expr) class B(ClauseElement): - __visit_name__ = 'b' + __visit_name__ = "b" def __init__(self, *items): self.items = items @@ -93,8 +117,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): a1 = A("expr1") struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) - struct3 = B(a1, A("expr2"), B(A("expr1b"), - A("expr2bmodified")), A("expr3")) + struct3 = B( + a1, A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3") + ) assert a1.is_other(a1) assert struct.is_other(struct) @@ -104,11 +129,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): assert not struct.is_other(struct3) def test_clone(self): - struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) + struct = B( + A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") + ) class Vis(CloningVisitor): - def visit_a(self, a): pass @@ -121,11 +146,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): assert not struct.is_other(s2) def test_no_clone(self): - struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) + struct = B( + A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") + ) class Vis(ClauseVisitor): - def visit_a(self, a): pass @@ -139,7 +164,8 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): def test_clone_anon_label(self): from sqlalchemy.sql.elements import Grouping - c1 = Grouping(literal_column('q')) + + c1 = Grouping(literal_column("q")) s1 = select([c1]) class Vis(CloningVisitor): @@ -151,15 +177,23 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): eq_(list(s2.inner_columns)[0].anon_label, c1.anon_label) def test_change_in_place(self): - struct = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2b")), A("expr3")) - struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), - A("expr2b")), A("expr3")) - struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"), - A("expr2bmodified")), A("expr3")) + struct = B( + A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") + ) + struct2 = B( + A("expr1"), + A("expr2modified"), + B(A("expr1b"), A("expr2b")), + A("expr3"), + ) + struct3 = B( + A("expr1"), + A("expr2"), + B(A("expr1b"), A("expr2bmodified")), + A("expr3"), + ) class Vis(CloningVisitor): - def visit_a(self, a): if a.expr == "expr2": a.expr = "expr2modified" @@ -174,7 +208,6 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): assert struct2 == s2 class Vis2(CloningVisitor): - def visit_a(self, a): if a.expr == "expr2b": a.expr = "expr2bmodified" @@ -194,9 +227,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults): class CustomObj(Column): pass - assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column' + assert CustomObj.__visit_name__ == Column.__visit_name__ == "column" - foo, bar = CustomObj('foo', String), CustomObj('bar', String) + foo, bar = CustomObj("foo", String), CustomObj("bar", String) bin = foo == bar set(ClauseVisitor().iterate(bin)) assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin]) @@ -212,19 +245,13 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): def visit(binary, l, r): canary.append((binary.operator, l, r)) print(binary.operator, l, r) + sql_util.visit_binary_product(visit, expr) - eq_( - canary, expected - ) + eq_(canary, expected) def test_basic(self): a, b = column("a"), column("b") - self._assert_traversal( - a == b, - [ - (operators.eq, a, b) - ] - ) + self._assert_traversal(a == b, [(operators.eq, a, b)]) def test_with_tuples(self): a, b, c, d, b1, b1a, b1b, e, f = ( @@ -236,11 +263,9 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): column("b1a"), column("b1b"), column("e"), - column("f") + column("f"), ) - expr = tuple_( - a, b, b1 == tuple_(b1a, b1b == d), c - ) > tuple_( + expr = tuple_(a, b, b1 == tuple_(b1a, b1b == d), c) > tuple_( func.go(e + f) ) self._assert_traversal( @@ -253,8 +278,8 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): (operators.eq, b1, b1a), (operators.eq, b1b, d), (operators.gt, c, e), - (operators.gt, c, f) - ] + (operators.gt, c, f), + ], ) def test_composed(self): @@ -267,13 +292,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): column("j"), column("r"), ) - expr = and_( - (a + b) == q + func.sum(e + f), - and_( - j == r, - f == q - ) - ) + expr = and_((a + b) == q + func.sum(e + f), and_(j == r, f == q)) self._assert_traversal( expr, [ @@ -285,7 +304,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): (operators.eq, b, f), (operators.eq, j, r), (operators.eq, f, q), - ] + ], ) def test_subquery(self): @@ -293,11 +312,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase): subq = select([c]).where(c == a).as_scalar() expr = and_(a == b, b == subq) self._assert_traversal( - expr, - [ - (operators.eq, a, b), - (operators.eq, b, subq), - ] + expr, [(operators.eq, a, b), (operators.eq, b, subq)] ) @@ -305,36 +320,31 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): """test copy-in-place behavior of various ClauseElements.""" - __dialect__ = 'default' + __dialect__ = "default" @classmethod def setup_class(cls): global t1, t2, t3 - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) - t3 = Table('table3', MetaData(), - Column('col1', Integer), - Column('col2', Integer) - ) + t1 = table("table1", column("col1"), column("col2"), column("col3")) + t2 = table("table2", column("col1"), column("col2"), column("col3")) + t3 = Table( + "table3", + MetaData(), + Column("col1", Integer), + Column("col2", Integer), + ) def test_binary(self): clause = t1.c.col2 == t2.c.col2 eq_(str(clause), str(CloningVisitor().traverse(clause))) def test_binary_anon_label_quirk(self): - t = table('t1', column('col1')) + t = table("t1", column("col1")) f = t.c.col1 * 5 - self.assert_compile(select([f]), - "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1") + self.assert_compile( + select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1" + ) f.anon_label @@ -342,9 +352,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): f = sql_util.ClauseAdapter(a).traverse(f) self.assert_compile( - select( - [f]), - "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") + select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1" + ) def test_join(self): clause = t1.join(t2, t1.c.col2 == t2.c.col2) @@ -352,7 +361,6 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): assert str(clause) == str(CloningVisitor().traverse(clause)) class Vis(CloningVisitor): - def visit_binary(self, binary): binary.right = t2.c.col3 @@ -368,24 +376,23 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): adapter = sql_util.ColumnAdapter(aliased) - f = select([ - adapter.columns[c] - for c in aliased2.c - ]).select_from(aliased) + f = select([adapter.columns[c] for c in aliased2.c]).select_from( + aliased + ) s = select([aliased2]).select_from(aliased) eq_(str(s), str(f)) - f = select([ - adapter.columns[func.count(aliased2.c.col1)] - ]).select_from(aliased) + f = select([adapter.columns[func.count(aliased2.c.col1)]]).select_from( + aliased + ) eq_( str(select([func.count(aliased2.c.col1)]).select_from(aliased)), - str(f) + str(f), ) def test_aliased_cloned_column_adapt_inner(self): - clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')]) + clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")]) aliased1 = select([clause.c.col1, clause.c.foo]) aliased2 = clause @@ -397,20 +404,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): # aliased2. corresponding_column checks these # now. adapter = sql_util.ColumnAdapter(aliased1) - f1 = select([ - adapter.columns[c] - for c in aliased2._raw_columns - ]) - f2 = select([ - adapter.columns[c] - for c in aliased3._raw_columns - ]) - eq_( - str(f1), str(f2) - ) + f1 = select([adapter.columns[c] for c in aliased2._raw_columns]) + f2 = select([adapter.columns[c] for c in aliased3._raw_columns]) + eq_(str(f1), str(f2)) def test_aliased_cloned_column_adapt_exported(self): - clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')]) + clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")]) aliased1 = select([clause.c.col1, clause.c.foo]) aliased2 = clause @@ -422,20 +421,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): # have an _is_clone_of pointer. But we now modified _make_proxy # to assign this. adapter = sql_util.ColumnAdapter(aliased1) - f1 = select([ - adapter.columns[c] - for c in aliased2.c - ]) - f2 = select([ - adapter.columns[c] - for c in aliased3.c - ]) - eq_( - str(f1), str(f2) - ) + f1 = select([adapter.columns[c] for c in aliased2.c]) + f2 = select([adapter.columns[c] for c in aliased3.c]) + eq_(str(f1), str(f2)) def test_aliased_cloned_schema_column_adapt_exported(self): - clause = select([t3.c.col1, func.foo(t3.c.col2).label('foo')]) + clause = select([t3.c.col1, func.foo(t3.c.col2).label("foo")]) aliased1 = select([clause.c.col1, clause.c.foo]) aliased2 = clause @@ -447,20 +438,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): # have an _is_clone_of pointer. But we now modified _make_proxy # to assign this. adapter = sql_util.ColumnAdapter(aliased1) - f1 = select([ - adapter.columns[c] - for c in aliased2.c - ]) - f2 = select([ - adapter.columns[c] - for c in aliased3.c - ]) - eq_( - str(f1), str(f2) - ) + f1 = select([adapter.columns[c] for c in aliased2.c]) + f2 = select([adapter.columns[c] for c in aliased3.c]) + eq_(str(f1), str(f2)) def test_labeled_expression_adapt(self): - lbl_x = (t3.c.col1 == 1).label('x') + lbl_x = (t3.c.col1 == 1).label("x") t3_alias = t3.alias() adapter = sql_util.ColumnAdapter(t3_alias) @@ -471,13 +454,13 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): lblx_adapted = adapter.traverse(lbl_x) self.assert_compile( select([lblx_adapted.self_group()]), - "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1" + "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1", ) self.assert_compile( select([lblx_adapted.is_(True)]), "SELECT (table3_1.col1 = :col1_1) IS 1 AS anon_1 " - "FROM table3 AS table3_1" + "FROM table3 AS table3_1", ) def test_cte_w_union(self): @@ -486,50 +469,55 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s = select([func.sum(t.c.n)]) from sqlalchemy.sql.visitors import cloned_traverse + cloned = cloned_traverse(s, {}, {}) - self.assert_compile(cloned, - "WITH RECURSIVE t(n) AS " - "(SELECT values(:values_1) AS n " - "UNION ALL SELECT t.n + :n_1 AS anon_1 " - "FROM t " - "WHERE t.n < :n_2) " - "SELECT sum(t.n) AS sum_1 FROM t" - ) + self.assert_compile( + cloned, + "WITH RECURSIVE t(n) AS " + "(SELECT values(:values_1) AS n " + "UNION ALL SELECT t.n + :n_1 AS anon_1 " + "FROM t " + "WHERE t.n < :n_2) " + "SELECT sum(t.n) AS sum_1 FROM t", + ) def test_aliased_cte_w_union(self): - t = select([func.values(1).label("n")]).\ - cte("t", recursive=True).alias('foo') + t = ( + select([func.values(1).label("n")]) + .cte("t", recursive=True) + .alias("foo") + ) t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100)) s = select([func.sum(t.c.n)]) from sqlalchemy.sql.visitors import cloned_traverse + cloned = cloned_traverse(s, {}, {}) self.assert_compile( cloned, "WITH RECURSIVE foo(n) AS (SELECT values(:values_1) AS n " "UNION ALL SELECT foo.n + :n_1 AS anon_1 FROM foo " - "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo" + "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo", ) def test_text(self): clause = text( - "select * from table where foo=:bar", - bindparams=[bindparam('bar')]) + "select * from table where foo=:bar", bindparams=[bindparam("bar")] + ) c1 = str(clause) class Vis(CloningVisitor): - def visit_textclause(self, text): text.text = text.text + " SOME MODIFIER=:lala" - text._bindparams['lala'] = bindparam('lala') + text._bindparams["lala"] = bindparam("lala") clause2 = Vis().traverse(clause) assert c1 == str(clause) assert str(clause2) == c1 + " SOME MODIFIER=:lala" - assert list(clause._bindparams.keys()) == ['bar'] - assert set(clause2._bindparams.keys()) == set(['bar', 'lala']) + assert list(clause._bindparams.keys()) == ["bar"] + assert set(clause2._bindparams.keys()) == set(["bar", "lala"]) def test_select(self): s2 = select([t1]) @@ -537,9 +525,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s3_assert = str(select([t1], t1.c.col2 == 7)) class Vis(CloningVisitor): - def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) + s3 = Vis().traverse(s2) assert str(s3) == s3_assert assert str(s2) == s2_assert @@ -547,18 +535,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): print(str(s3)) class Vis(ClauseVisitor): - def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) + Vis().traverse(s2) assert str(s2) == s3_assert s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9))) class Vis(CloningVisitor): - def visit_select(self, select): select.append_whereclause(t1.c.col3 == 9) + s4 = Vis().traverse(s3) print(str(s3)) print(str(s4)) @@ -568,11 +556,11 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9))) class Vis(CloningVisitor): - def visit_binary(self, binary): if binary.left is t1.c.col3: binary.left = t1.c.col1 binary.right = bindparam("col1", unique=True) + s5 = Vis().traverse(s4) print(str(s4)) print(str(s5)) @@ -591,18 +579,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): assert str(u) == str(u2) assert [str(c) for c in u2.c] == cols - s1 = select([t1], t1.c.col1 == bindparam('id_param')) + s1 = select([t1], t1.c.col1 == bindparam("id_param")) s2 = select([t2]) u = union(s1, s2) u2 = u.params(id_param=7) u3 = u.params(id_param=10) assert str(u) == str(u2) == str(u3) - assert u2.compile().params == {'id_param': 7} - assert u3.compile().params == {'id_param': 10} + assert u2.compile().params == {"id_param": 7} + assert u3.compile().params == {"id_param": 10} def test_in(self): - expr = t1.c.col1.in_(['foo', 'bar']) + expr = t1.c.col1.in_(["foo", "bar"]) expr2 = CloningVisitor().traverse(expr) assert str(expr) == str(expr2) @@ -628,7 +616,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): def test_adapt_union(self): u = union( t1.select().where(t1.c.col1 == 4), - t1.select().where(t1.c.col1 == 5) + t1.select().where(t1.c.col1 == 5), ).alias() assert sql_util.ClauseAdapter(u).traverse(t1) is u @@ -642,48 +630,54 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s3 = select([s], s.c.col2 == s2.c.col2) self.assert_compile( - s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " + s3, + "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) " "AS anon_1, " "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 " "AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 " - "WHERE anon_1.col2 = anon_2.col2") + "WHERE anon_1.col2 = anon_2.col2", + ) s = select([t1], t1.c.col1 == 4).alias() s2 = CloningVisitor().traverse(s).alias() s3 = select([s], s.c.col2 == s2.c.col2) self.assert_compile( - s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " + s3, + "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) " "AS anon_1, " "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 " "AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 " - "WHERE anon_1.col2 = anon_2.col2") + "WHERE anon_1.col2 = anon_2.col2", + ) def test_extract(self): - s = select([extract('foo', t1.c.col1).label('col1')]) + s = select([extract("foo", t1.c.col1).label("col1")]) self.assert_compile( - s, - "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") + s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" + ) s2 = CloningVisitor().traverse(s).alias() s3 = select([s2.c.col1]) self.assert_compile( - s, - "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") - self.assert_compile(s3, - "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " - "table1.col1) AS col1 FROM table1) AS anon_1") + s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" + ) + self.assert_compile( + s3, + "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " + "table1.col1) AS col1 FROM table1) AS anon_1", + ) - @testing.emits_warning('.*replaced by another column with the same key') + @testing.emits_warning(".*replaced by another column with the same key") def test_alias(self): - subq = t2.select().alias('subq') - s = select([t1.c.col1, subq.c.col1], - from_obj=[t1, subq, - t1.join(subq, t1.c.col1 == subq.c.col2)] - ) + subq = t2.select().alias("subq") + s = select( + [t1.c.col1, subq.c.col1], + from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], + ) orig = str(s) s2 = CloningVisitor().traverse(s) assert orig == str(s) == str(s2) @@ -691,26 +685,26 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s4 = CloningVisitor().traverse(s2) assert orig == str(s) == str(s2) == str(s4) - s3 = sql_util.ClauseAdapter(table('foo')).traverse(s) + s3 = sql_util.ClauseAdapter(table("foo")).traverse(s) assert orig == str(s) == str(s3) - s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3) + s4 = sql_util.ClauseAdapter(table("foo")).traverse(s3) assert orig == str(s) == str(s3) == str(s4) - subq = subq.alias('subq') - s = select([t1.c.col1, subq.c.col1], - from_obj=[t1, subq, - t1.join(subq, t1.c.col1 == subq.c.col2)] - ) + subq = subq.alias("subq") + s = select( + [t1.c.col1, subq.c.col1], + from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], + ) s5 = CloningVisitor().traverse(s) assert orig == str(s) == str(s5) def test_correlated_select(self): - s = select([literal_column('*')], t1.c.col1 == t2.c.col1, - from_obj=[t1, t2]).correlate(t2) + s = select( + [literal_column("*")], t1.c.col1 == t2.c.col1, from_obj=[t1, t2] + ).correlate(t2) class Vis(CloningVisitor): - def visit_select(self, select): select.append_whereclause(t1.c.col2 == 7) @@ -719,26 +713,30 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): "SELECT table2.col1, table2.col2, table2.col3 " "FROM table2 WHERE table2.col1 = " "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " - "AND table1.col2 = :col2_1)" + "AND table1.col2 = :col2_1)", ) def test_this_thing(self): - s = select([t1]).where(t1.c.col1 == 'foo').alias() + s = select([t1]).where(t1.c.col1 == "foo").alias() s2 = select([s.c.col1]) - self.assert_compile(s2, - 'SELECT anon_1.col1 FROM (SELECT ' - 'table1.col1 AS col1, table1.col2 AS col2, ' - 'table1.col3 AS col3 FROM table1 WHERE ' - 'table1.col1 = :col1_1) AS anon_1') + self.assert_compile( + s2, + "SELECT anon_1.col1 FROM (SELECT " + "table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1 WHERE " + "table1.col1 = :col1_1) AS anon_1", + ) t1a = t1.alias() s2 = sql_util.ClauseAdapter(t1a).traverse(s2) - self.assert_compile(s2, - 'SELECT anon_1.col1 FROM (SELECT ' - 'table1_1.col1 AS col1, table1_1.col2 AS ' - 'col2, table1_1.col3 AS col3 FROM table1 ' - 'AS table1_1 WHERE table1_1.col1 = ' - ':col1_1) AS anon_1') + self.assert_compile( + s2, + "SELECT anon_1.col1 FROM (SELECT " + "table1_1.col1 AS col1, table1_1.col2 AS " + "col2, table1_1.col3 AS col3 FROM table1 " + "AS table1_1 WHERE table1_1.col1 = " + ":col1_1) AS anon_1", + ) def test_select_fromtwice_one(self): t1a = t1.alias() @@ -746,95 +744,91 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL): s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a) s = select([t1]).where(t1.c.col1 == s) self.assert_compile( - s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + s, + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " "WHERE table1.col1 = " "(SELECT 1 FROM table1, table1 AS table1_1 " - "WHERE table1.col1 = table1_1.col1)") + "WHERE table1.col1 = table1_1.col1)", + ) s = CloningVisitor().traverse(s) self.assert_compile( - s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " + s, + "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " "WHERE table1.col1 = " "(SELECT 1 FROM table1, table1 AS table1_1 " - "WHERE table1.col1 = table1_1.col1)") + "WHERE table1.col1 = table1_1.col1)", + ) def test_select_fromtwice_two(self): - s = select([t1]).where(t1.c.col1 == 'foo').alias() + s = select([t1]).where(t1.c.col1 == "foo").alias() s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1) s3 = select([t1]).where(t1.c.col1 == s2) self.assert_compile( - s3, "SELECT table1.col1, table1.col2, table1.col3 " + s3, + "SELECT table1.col1, table1.col2, table1.col3 " "FROM table1 WHERE table1.col1 = " "(SELECT 1 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 " "WHERE table1.col1 = :col1_1) " - "AS anon_1 WHERE table1.col1 = anon_1.col1)") + "AS anon_1 WHERE table1.col1 = anon_1.col1)", + ) s4 = ReplacingCloningVisitor().traverse(s3) self.assert_compile( - s4, "SELECT table1.col1, table1.col2, table1.col3 " + s4, + "SELECT table1.col1, table1.col2, table1.col3 " "FROM table1 WHERE table1.col1 = " "(SELECT 1 FROM " "(SELECT table1.col1 AS col1, table1.col2 AS col2, " "table1.col3 AS col3 FROM table1 " "WHERE table1.col1 = :col1_1) " - "AS anon_1 WHERE table1.col1 = anon_1.col1)") + "AS anon_1 WHERE table1.col1 = anon_1.col1)", + ) class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" @classmethod def setup_class(cls): global t1, t2 - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - column("col4") - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + t1 = table( + "table1", + column("col1"), + column("col2"), + column("col3"), + column("col4"), + ) + t2 = table("table2", column("col1"), column("col2"), column("col3")) def test_traverse_memoizes_w_columns(self): t1a = t1.alias() adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) - expr = select([t1a.c.col1]).label('x') + expr = select([t1a.c.col1]).label("x") expr_adapted = adapter.traverse(expr) is_not_(expr, expr_adapted) - is_( - adapter.columns[expr], - expr_adapted - ) + is_(adapter.columns[expr], expr_adapted) def test_traverse_memoizes_w_itself(self): t1a = t1.alias() adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) - expr = select([t1a.c.col1]).label('x') + expr = select([t1a.c.col1]).label("x") expr_adapted = adapter.traverse(expr) is_not_(expr, expr_adapted) - is_( - adapter.traverse(expr), - expr_adapted - ) + is_(adapter.traverse(expr), expr_adapted) def test_columns_memoizes_w_itself(self): t1a = t1.alias() adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) - expr = select([t1a.c.col1]).label('x') + expr = select([t1a.c.col1]).label("x") expr_adapted = adapter.columns[expr] is_not_(expr, expr_adapted) - is_( - adapter.columns[expr], - expr_adapted - ) + is_(adapter.columns[expr], expr_adapted) def test_wrapping_fallthrough(self): t1a = t1.alias(name="t1a") @@ -850,57 +844,35 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # t1.c.col1 -> s1.c.t1a_col1 # adapted by a2 - is_( - a3.columns[t1.c.col1], s1.c.t1a_col1 - ) - is_( - a4.columns[t1.c.col1], s1.c.t1a_col1 - ) + is_(a3.columns[t1.c.col1], s1.c.t1a_col1) + is_(a4.columns[t1.c.col1], s1.c.t1a_col1) # chaining can't fall through because a1 grabs it # first - is_( - a5.columns[t1.c.col1], t1a.c.col1 - ) + is_(a5.columns[t1.c.col1], t1a.c.col1) # t2.c.col1 -> s1.c.t2a_col1 # adapted by a2 - is_( - a3.columns[t2.c.col1], s1.c.t2a_col1 - ) - is_( - a4.columns[t2.c.col1], s1.c.t2a_col1 - ) + is_(a3.columns[t2.c.col1], s1.c.t2a_col1) + is_(a4.columns[t2.c.col1], s1.c.t2a_col1) # chaining, t2 hits s1 - is_( - a5.columns[t2.c.col1], s1.c.t2a_col1 - ) + is_(a5.columns[t2.c.col1], s1.c.t2a_col1) # t1.c.col2 -> t1a.c.col2 # fallthrough to a1 - is_( - a3.columns[t1.c.col2], t1a.c.col2 - ) - is_( - a4.columns[t1.c.col2], t1a.c.col2 - ) + is_(a3.columns[t1.c.col2], t1a.c.col2) + is_(a4.columns[t1.c.col2], t1a.c.col2) # chaining hits a1 - is_( - a5.columns[t1.c.col2], t1a.c.col2 - ) + is_(a5.columns[t1.c.col2], t1a.c.col2) # t2.c.col2 -> t2.c.col2 # fallthrough to no adaption - is_( - a3.columns[t2.c.col2], t2.c.col2 - ) - is_( - a4.columns[t2.c.col2], t2.c.col2 - ) + is_(a3.columns[t2.c.col2], t2.c.col2) + is_(a4.columns[t2.c.col2], t2.c.col2) def test_wrapping_ordering(self): """illustrate an example where order of wrappers matters. @@ -926,25 +898,15 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # in different contexts, order of wrapping matters # t2.c.col1 via a2 is stmt2.c.col1; then ignored by a1 - is_( - a2_to_a1.columns[t2.c.col1], stmt2.c.col1 - ) + is_(a2_to_a1.columns[t2.c.col1], stmt2.c.col1) # t2.c.col1 via a1 is stmt.c.table2_col1; a2 then # sends this to stmt2.c.table2_col1 - is_( - a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1 - ) + is_(a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1) # for mutually exclusive columns, order doesn't matter - is_( - a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1 - ) - is_( - a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1 - ) - is_( - a2_to_a1.columns[t2.c.col2], stmt2.c.col2 - ) + is_(a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1) + is_(a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1) + is_(a2_to_a1.columns[t2.c.col2], stmt2.c.col2) def test_wrapping_multiple(self): """illustrate that wrapping runs both adapters""" @@ -959,7 +921,7 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): self.assert_compile( a3.traverse(stmt), - "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" + "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a", ) # chaining does too because these adapters don't share any @@ -967,7 +929,7 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): a4 = a2.chain(a1) self.assert_compile( a4.traverse(stmt), - "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" + "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a", ) def test_wrapping_inclusions(self): @@ -977,13 +939,13 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): t1a = t1.alias(name="t1a") t2a = t2.alias(name="t2a") a1 = sql_util.ColumnAdapter( - t1a, - include_fn=lambda col: "a1" in col._annotations) + t1a, include_fn=lambda col: "a1" in col._annotations + ) s1 = select([t1a, t2a]).apply_labels().alias() a2 = sql_util.ColumnAdapter( - s1, - include_fn=lambda col: "a2" in col._annotations) + s1, include_fn=lambda col: "a2" in col._annotations + ) a3 = a2.wrap(a1) c1a1 = t1.c.col1._annotate(dict(a1=True)) @@ -994,62 +956,45 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): c2a2 = t2.c.col1._annotate(dict(a2=True)) c2aa = t2.c.col1._annotate(dict(a1=True, a2=True)) - is_( - a3.columns[c1a1], t1a.c.col1 - ) - is_( - a3.columns[c1a2], s1.c.t1a_col1 - ) - is_( - a3.columns[c1aa], s1.c.t1a_col1 - ) + is_(a3.columns[c1a1], t1a.c.col1) + is_(a3.columns[c1a2], s1.c.t1a_col1) + is_(a3.columns[c1aa], s1.c.t1a_col1) # not covered by a1, accepted by a2 - is_( - a3.columns[c2aa], s1.c.t2a_col1 - ) + is_(a3.columns[c2aa], s1.c.t2a_col1) # not covered by a1, accepted by a2 - is_( - a3.columns[c2a2], s1.c.t2a_col1 - ) + is_(a3.columns[c2a2], s1.c.t2a_col1) # not covered by a1, rejected by a2 - is_( - a3.columns[c2a1], c2a1 - ) + is_(a3.columns[c2a1], c2a1) class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" @classmethod def setup_class(cls): global t1, t2 - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + t1 = table("table1", column("col1"), column("col2"), column("col3")) + t2 = table("table2", column("col1"), column("col2"), column("col3")) def test_correlation_on_clone(self): - t1alias = t1.alias('t1alias') - t2alias = t2.alias('t2alias') + t1alias = t1.alias("t1alias") + t2alias = t2.alias("t2alias") vis = sql_util.ClauseAdapter(t1alias) - s = select([literal_column('*')], - from_obj=[t1alias, t2alias]).as_scalar() + s = select( + [literal_column("*")], from_obj=[t1alias, t2alias] + ).as_scalar() assert t2alias in s._froms assert t1alias in s._froms - self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't2alias.col1 = (SELECT * FROM table1 AS ' - 't1alias)') + self.assert_compile( + select([literal_column("*")], t2alias.c.col1 == s), + "SELECT * FROM table2 AS t2alias WHERE " + "t2alias.col1 = (SELECT * FROM table1 AS " + "t1alias)", + ) s = vis.traverse(s) assert t2alias not in s._froms # not present because it's been @@ -1060,61 +1005,91 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # correlate list on "s" needs to take into account the full # _cloned_set for each element in _froms when correlating - self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't2alias.col1 = (SELECT * FROM table1 AS ' - 't1alias)') - s = select([literal_column('*')], - from_obj=[t1alias, t2alias]).correlate(t2alias).as_scalar() - self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't2alias.col1 = (SELECT * FROM table1 AS ' - 't1alias)') + self.assert_compile( + select([literal_column("*")], t2alias.c.col1 == s), + "SELECT * FROM table2 AS t2alias WHERE " + "t2alias.col1 = (SELECT * FROM table1 AS " + "t1alias)", + ) + s = ( + select([literal_column("*")], from_obj=[t1alias, t2alias]) + .correlate(t2alias) + .as_scalar() + ) + self.assert_compile( + select([literal_column("*")], t2alias.c.col1 == s), + "SELECT * FROM table2 AS t2alias WHERE " + "t2alias.col1 = (SELECT * FROM table1 AS " + "t1alias)", + ) s = vis.traverse(s) - self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't2alias.col1 = (SELECT * FROM table1 AS ' - 't1alias)') + self.assert_compile( + select([literal_column("*")], t2alias.c.col1 == s), + "SELECT * FROM table2 AS t2alias WHERE " + "t2alias.col1 = (SELECT * FROM table1 AS " + "t1alias)", + ) s = CloningVisitor().traverse(s) - self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), - 'SELECT * FROM table2 AS t2alias WHERE ' - 't2alias.col1 = (SELECT * FROM table1 AS ' - 't1alias)') + self.assert_compile( + select([literal_column("*")], t2alias.c.col1 == s), + "SELECT * FROM table2 AS t2alias WHERE " + "t2alias.col1 = (SELECT * FROM table1 AS " + "t1alias)", + ) - s = select([literal_column('*')]).where(t1.c.col1 == t2.c.col1) \ + s = ( + select([literal_column("*")]) + .where(t1.c.col1 == t2.c.col1) .as_scalar() - self.assert_compile(select([t1.c.col1, s]), - 'SELECT table1.col1, (SELECT * FROM table2 ' - 'WHERE table1.col1 = table2.col1) AS ' - 'anon_1 FROM table1') + ) + self.assert_compile( + select([t1.c.col1, s]), + "SELECT table1.col1, (SELECT * FROM table2 " + "WHERE table1.col1 = table2.col1) AS " + "anon_1 FROM table1", + ) vis = sql_util.ClauseAdapter(t1alias) s = vis.traverse(s) - self.assert_compile(select([t1alias.c.col1, s]), - 'SELECT t1alias.col1, (SELECT * FROM ' - 'table2 WHERE t1alias.col1 = table2.col1) ' - 'AS anon_1 FROM table1 AS t1alias') + self.assert_compile( + select([t1alias.c.col1, s]), + "SELECT t1alias.col1, (SELECT * FROM " + "table2 WHERE t1alias.col1 = table2.col1) " + "AS anon_1 FROM table1 AS t1alias", + ) s = CloningVisitor().traverse(s) - self.assert_compile(select([t1alias.c.col1, s]), - 'SELECT t1alias.col1, (SELECT * FROM ' - 'table2 WHERE t1alias.col1 = table2.col1) ' - 'AS anon_1 FROM table1 AS t1alias') - s = select([literal_column('*')]).where(t1.c.col1 == t2.c.col1) \ - .correlate(t1).as_scalar() - self.assert_compile(select([t1.c.col1, s]), - 'SELECT table1.col1, (SELECT * FROM table2 ' - 'WHERE table1.col1 = table2.col1) AS ' - 'anon_1 FROM table1') + self.assert_compile( + select([t1alias.c.col1, s]), + "SELECT t1alias.col1, (SELECT * FROM " + "table2 WHERE t1alias.col1 = table2.col1) " + "AS anon_1 FROM table1 AS t1alias", + ) + s = ( + select([literal_column("*")]) + .where(t1.c.col1 == t2.c.col1) + .correlate(t1) + .as_scalar() + ) + self.assert_compile( + select([t1.c.col1, s]), + "SELECT table1.col1, (SELECT * FROM table2 " + "WHERE table1.col1 = table2.col1) AS " + "anon_1 FROM table1", + ) vis = sql_util.ClauseAdapter(t1alias) s = vis.traverse(s) - self.assert_compile(select([t1alias.c.col1, s]), - 'SELECT t1alias.col1, (SELECT * FROM ' - 'table2 WHERE t1alias.col1 = table2.col1) ' - 'AS anon_1 FROM table1 AS t1alias') + self.assert_compile( + select([t1alias.c.col1, s]), + "SELECT t1alias.col1, (SELECT * FROM " + "table2 WHERE t1alias.col1 = table2.col1) " + "AS anon_1 FROM table1 AS t1alias", + ) s = CloningVisitor().traverse(s) - self.assert_compile(select([t1alias.c.col1, s]), - 'SELECT t1alias.col1, (SELECT * FROM ' - 'table2 WHERE t1alias.col1 = table2.col1) ' - 'AS anon_1 FROM table1 AS t1alias') + self.assert_compile( + select([t1alias.c.col1, s]), + "SELECT t1alias.col1, (SELECT * FROM " + "table2 WHERE t1alias.col1 = table2.col1) " + "AS anon_1 FROM table1 AS t1alias", + ) @testing.fails_on_everything_except() def test_joins_dont_adapt(self): @@ -1122,284 +1097,319 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # make much sense. ClauseAdapter doesn't make any changes if # it's against a straight join. - users = table('users', column('id')) - addresses = table('addresses', column('id'), column('user_id')) + users = table("users", column("id")) + addresses = table("addresses", column("id"), column("user_id")) ualias = users.alias() - s = select([func.count(addresses.c.id)], users.c.id - == addresses.c.user_id).correlate(users) + s = select( + [func.count(addresses.c.id)], users.c.id == addresses.c.user_id + ).correlate(users) s = sql_util.ClauseAdapter(ualias).traverse(s) j1 = addresses.join(ualias, addresses.c.user_id == ualias.c.id) - self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), - 'SELECT count(addresses.id) AS count_1 ' - 'FROM addresses WHERE users_1.id = ' - 'addresses.user_id') + self.assert_compile( + sql_util.ClauseAdapter(j1).traverse(s), + "SELECT count(addresses.id) AS count_1 " + "FROM addresses WHERE users_1.id = " + "addresses.user_id", + ) def test_table_to_alias_1(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - ff = vis.traverse(func.count(t1.c.col1).label('foo')) + ff = vis.traverse(func.count(t1.c.col1).label("foo")) assert list(_from_objects(ff)) == [t1alias] def test_table_to_alias_2(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( - vis.traverse(select([literal_column('*')], from_obj=[t1])), - 'SELECT * FROM table1 AS t1alias') + vis.traverse(select([literal_column("*")], from_obj=[t1])), + "SELECT * FROM table1 AS t1alias", + ) def test_table_to_alias_3(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( - select([literal_column('*')], t1.c.col1 == t2.c.col2), - 'SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2') + select([literal_column("*")], t1.c.col1 == t2.c.col2), + "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2", + ) def test_table_to_alias_4(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(vis.traverse(select([literal_column('*')], - t1.c.col1 == t2.c.col2)), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'WHERE t1alias.col1 = table2.col2') + self.assert_compile( + vis.traverse( + select([literal_column("*")], t1.c.col1 == t2.c.col2) + ), + "SELECT * FROM table1 AS t1alias, table2 " + "WHERE t1alias.col1 = table2.col2", + ) def test_table_to_alias_5(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse( select( - [literal_column('*')], + [literal_column("*")], t1.c.col1 == t2.c.col2, - from_obj=[ - t1, - t2])), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'WHERE t1alias.col1 = table2.col2') + from_obj=[t1, t2], + ) + ), + "SELECT * FROM table1 AS t1alias, table2 " + "WHERE t1alias.col1 = table2.col2", + ) def test_table_to_alias_6(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(select([t1alias, t2]).where( - t1alias.c.col1 == vis.traverse( - select([literal_column('*')], - t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).correlate(t1) - ) - ), + self.assert_compile( + select([t1alias, t2]).where( + t1alias.c.col1 + == vis.traverse( + select( + [literal_column("*")], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2], + ).correlate(t1) + ) + ), "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " "table2.col1, table2.col2, table2.col3 " "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " - "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)" + "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)", ) def test_table_to_alias_7(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( - select([t1alias, t2]). - where(t1alias.c.col1 == vis.traverse( - select([literal_column('*')], - t1.c.col1 == t2.c.col2, from_obj=[t1, t2]). - correlate(t2))), + select([t1alias, t2]).where( + t1alias.c.col1 + == vis.traverse( + select( + [literal_column("*")], + t1.c.col1 == t2.c.col2, + from_obj=[t1, t2], + ).correlate(t2) + ) + ), "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " "table2.col1, table2.col2, table2.col3 " "FROM table1 AS t1alias, table2 " "WHERE t1alias.col1 = " "(SELECT * FROM table1 AS t1alias " - "WHERE t1alias.col1 = table2.col2)") + "WHERE t1alias.col1 = table2.col2)", + ) def test_table_to_alias_8(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)), - 'CASE WHEN (t1alias.col1 = :col1_1) THEN ' - 't1alias.col2 ELSE t1alias.col1 END') + "CASE WHEN (t1alias.col1 = :col1_1) THEN " + "t1alias.col2 ELSE t1alias.col1 END", + ) def test_table_to_alias_9(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) self.assert_compile( vis.traverse( - case( - [ - (5, - t1.c.col2)], - value=t1.c.col1, - else_=t1.c.col1)), - 'CASE t1alias.col1 WHEN :param_1 THEN ' - 't1alias.col2 ELSE t1alias.col1 END') + case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1) + ), + "CASE t1alias.col1 WHEN :param_1 THEN " + "t1alias.col2 ELSE t1alias.col1 END", + ) def test_table_to_alias_10(self): - s = select([literal_column('*')], from_obj=[t1]).alias('foo') - self.assert_compile(s.select(), - 'SELECT foo.* FROM (SELECT * FROM table1) ' - 'AS foo') + s = select([literal_column("*")], from_obj=[t1]).alias("foo") + self.assert_compile( + s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" + ) def test_table_to_alias_11(self): - s = select([literal_column('*')], from_obj=[t1]).alias('foo') - t1alias = t1.alias('t1alias') + s = select([literal_column("*")], from_obj=[t1]).alias("foo") + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - self.assert_compile(vis.traverse(s.select()), - 'SELECT foo.* FROM (SELECT * FROM table1 ' - 'AS t1alias) AS foo') + self.assert_compile( + vis.traverse(s.select()), + "SELECT foo.* FROM (SELECT * FROM table1 " "AS t1alias) AS foo", + ) def test_table_to_alias_12(self): - s = select([literal_column('*')], from_obj=[t1]).alias('foo') - self.assert_compile(s.select(), - 'SELECT foo.* FROM (SELECT * FROM table1) ' - 'AS foo') + s = select([literal_column("*")], from_obj=[t1]).alias("foo") + self.assert_compile( + s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" + ) def test_table_to_alias_13(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - ff = vis.traverse(func.count(t1.c.col1).label('foo')) - self.assert_compile(select([ff]), - 'SELECT count(t1alias.col1) AS foo FROM ' - 'table1 AS t1alias') + ff = vis.traverse(func.count(t1.c.col1).label("foo")) + self.assert_compile( + select([ff]), + "SELECT count(t1alias.col1) AS foo FROM " "table1 AS t1alias", + ) assert list(_from_objects(ff)) == [t1alias] # def test_table_to_alias_2(self): - # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c - # .col1).l abel('foo')]), clone=True), "SELECT - # count(t1alias.col1) AS foo FROM table1 AS t1alias") + # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c + # .col1).l abel('foo')]), clone=True), "SELECT + # count(t1alias.col1) AS foo FROM table1 AS t1alias") def test_table_to_alias_14(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - t2alias = t2.alias('t2alias') + t2alias = t2.alias("t2alias") vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( vis.traverse( - select([literal_column('*')], t1.c.col1 == t2.c.col2)), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'AS t2alias WHERE t1alias.col1 = ' - 't2alias.col2') + select([literal_column("*")], t1.c.col1 == t2.c.col2) + ), + "SELECT * FROM table1 AS t1alias, table2 " + "AS t2alias WHERE t1alias.col1 = " + "t2alias.col2", + ) def test_table_to_alias_15(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - t2alias = t2.alias('t2alias') + t2alias = t2.alias("t2alias") vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( vis.traverse( - select( - ['*'], - t1.c.col1 == t2.c.col2, - from_obj=[ - t1, - t2])), - 'SELECT * FROM table1 AS t1alias, table2 ' - 'AS t2alias WHERE t1alias.col1 = ' - 't2alias.col2') + select(["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]) + ), + "SELECT * FROM table1 AS t1alias, table2 " + "AS t2alias WHERE t1alias.col1 = " + "t2alias.col2", + ) def test_table_to_alias_16(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - t2alias = t2.alias('t2alias') + t2alias = t2.alias("t2alias") vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( select([t1alias, t2alias]).where( - t1alias.c.col1 == - vis.traverse(select(['*'], - t1.c.col1 == t2.c.col2, - from_obj=[t1, t2]).correlate(t1)) + t1alias.c.col1 + == vis.traverse( + select( + ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] + ).correlate(t1) + ) ), "SELECT t1alias.col1, t1alias.col2, t1alias.col3, " "t2alias.col1, t2alias.col2, t2alias.col3 " "FROM table1 AS t1alias, table2 AS t2alias " "WHERE t1alias.col1 = " "(SELECT * FROM table2 AS t2alias " - "WHERE t1alias.col1 = t2alias.col2)" + "WHERE t1alias.col1 = t2alias.col2)", ) def test_table_to_alias_17(self): - t1alias = t1.alias('t1alias') + t1alias = t1.alias("t1alias") vis = sql_util.ClauseAdapter(t1alias) - t2alias = t2.alias('t2alias') + t2alias = t2.alias("t2alias") vis.chain(sql_util.ClauseAdapter(t2alias)) self.assert_compile( t2alias.select().where( - t2alias.c.col2 == vis.traverse( + t2alias.c.col2 + == vis.traverse( select( - ['*'], - t1.c.col1 == t2.c.col2, - from_obj=[ - t1, - t2]).correlate(t2))), - 'SELECT t2alias.col1, t2alias.col2, t2alias.col3 ' - 'FROM table2 AS t2alias WHERE t2alias.col2 = ' - '(SELECT * FROM table1 AS t1alias WHERE ' - 't1alias.col1 = t2alias.col2)') + ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] + ).correlate(t2) + ) + ), + "SELECT t2alias.col1, t2alias.col2, t2alias.col3 " + "FROM table2 AS t2alias WHERE t2alias.col2 = " + "(SELECT * FROM table1 AS t1alias WHERE " + "t1alias.col1 = t2alias.col2)", + ) def test_include_exclude(self): m = MetaData() - a = Table('a', m, - Column('id', Integer, primary_key=True), - Column('xxx_id', Integer, - ForeignKey('a.id', name='adf', use_alter=True) - ) - ) - - e = (a.c.id == a.c.xxx_id) + a = Table( + "a", + m, + Column("id", Integer, primary_key=True), + Column( + "xxx_id", + Integer, + ForeignKey("a.id", name="adf", use_alter=True), + ), + ) + + e = a.c.id == a.c.xxx_id assert str(e) == "a.id = a.xxx_id" b = a.alias() - e = sql_util.ClauseAdapter(b, include_fn=lambda x: x in set([a.c.id]), - equivalents={a.c.id: set([a.c.id])} - ).traverse(e) + e = sql_util.ClauseAdapter( + b, + include_fn=lambda x: x in set([a.c.id]), + equivalents={a.c.id: set([a.c.id])}, + ).traverse(e) assert str(e) == "a_1.id = a.xxx_id" def test_recursive_equivalents(self): m = MetaData() - a = Table('a', m, Column('x', Integer), Column('y', Integer)) - b = Table('b', m, Column('x', Integer), Column('y', Integer)) - c = Table('c', m, Column('x', Integer), Column('y', Integer)) + a = Table("a", m, Column("x", Integer), Column("y", Integer)) + b = Table("b", m, Column("x", Integer), Column("y", Integer)) + c = Table("c", m, Column("x", Integer), Column("y", Integer)) # force a recursion overflow, by linking a.c.x<->c.c.x, and # asking for a nonexistent col. corresponding_column should prevent # endless depth. adapt = sql_util.ClauseAdapter( - b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])}) + b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])} + ) assert adapt._corresponding_column(a.c.x, False) is None def test_multilevel_equivalents(self): m = MetaData() - a = Table('a', m, Column('x', Integer), Column('y', Integer)) - b = Table('b', m, Column('x', Integer), Column('y', Integer)) - c = Table('c', m, Column('x', Integer), Column('y', Integer)) + a = Table("a", m, Column("x", Integer), Column("y", Integer)) + b = Table("b", m, Column("x", Integer), Column("y", Integer)) + c = Table("c", m, Column("x", Integer), Column("y", Integer)) alias = select([a]).select_from(a.join(b, a.c.x == b.c.x)).alias() # two levels of indirection from c.x->b.x->a.x, requires recursive # corresponding_column call adapt = sql_util.ClauseAdapter( - alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])}) + alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])} + ) assert adapt._corresponding_column(a.c.x, False) is alias.c.x assert adapt._corresponding_column(c.c.x, False) is alias.c.x def test_join_to_alias(self): metadata = MetaData() - a = Table('a', metadata, - Column('id', Integer, primary_key=True)) - b = Table('b', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) - c = Table('c', metadata, - Column('id', Integer, primary_key=True), - Column('bid', Integer, ForeignKey('b.id')), - ) - - d = Table('d', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + a = Table("a", metadata, Column("id", Integer, primary_key=True)) + b = Table( + "b", + metadata, + Column("id", Integer, primary_key=True), + Column("aid", Integer, ForeignKey("a.id")), + ) + c = Table( + "c", + metadata, + Column("id", Integer, primary_key=True), + Column("bid", Integer, ForeignKey("b.id")), + ) + + d = Table( + "d", + metadata, + Column("id", Integer, primary_key=True), + Column("aid", Integer, ForeignKey("a.id")), + ) j1 = a.outerjoin(b) j2 = select([j1], use_labels=True) @@ -1407,12 +1417,14 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): j3 = c.join(j2, j2.c.b_id == c.c.bid) j4 = j3.outerjoin(d) - self.assert_compile(j4, - 'c JOIN (SELECT a.id AS a_id, b.id AS ' - 'b_id, b.aid AS b_aid FROM a LEFT OUTER ' - 'JOIN b ON a.id = b.aid) ON b_id = c.bid ' - 'LEFT OUTER JOIN d ON a_id = d.aid') - j5 = j3.alias('foo') + self.assert_compile( + j4, + "c JOIN (SELECT a.id AS a_id, b.id AS " + "b_id, b.aid AS b_aid FROM a LEFT OUTER " + "JOIN b ON a.id = b.aid) ON b_id = c.bid " + "LEFT OUTER JOIN d ON a_id = d.aid", + ) + j5 = j3.alias("foo") j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0] # this statement takes c join(a join b), wraps it inside an @@ -1420,14 +1432,16 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # right side "left outer join d" stays the same, except "d" # joins against foo.a_id instead of plain "a_id" - self.assert_compile(j6, - '(SELECT c.id AS c_id, c.bid AS c_bid, ' - 'a_id AS a_id, b_id AS b_id, b_aid AS ' - 'b_aid FROM c JOIN (SELECT a.id AS a_id, ' - 'b.id AS b_id, b.aid AS b_aid FROM a LEFT ' - 'OUTER JOIN b ON a.id = b.aid) ON b_id = ' - 'c.bid) AS foo LEFT OUTER JOIN d ON ' - 'foo.a_id = d.aid') + self.assert_compile( + j6, + "(SELECT c.id AS c_id, c.bid AS c_bid, " + "a_id AS a_id, b_id AS b_id, b_aid AS " + "b_aid FROM c JOIN (SELECT a.id AS a_id, " + "b.id AS b_id, b.aid AS b_aid FROM a LEFT " + "OUTER JOIN b ON a.id = b.aid) ON b_id = " + "c.bid) AS foo LEFT OUTER JOIN d ON " + "foo.a_id = d.aid", + ) def test_derived_from(self): assert select([t1]).is_derived_from(t1) @@ -1435,7 +1449,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): assert not t1.is_derived_from(select([t1])) assert t1.alias().is_derived_from(t1) - s1 = select([t1, t2]).alias('foo') + s1 = select([t1, t2]).alias("foo") s2 = select([s1]).limit(5).offset(10).alias() assert s2.is_derived_from(s1) s2 = s2._clone() @@ -1445,111 +1459,117 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): # original issue from ticket #904 - s1 = select([t1]).alias('foo') + s1 = select([t1]).alias("foo") s2 = select([s1]).limit(5).offset(10).alias() - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), - 'SELECT foo.col1, foo.col2, foo.col3 FROM ' - '(SELECT table1.col1 AS col1, table1.col2 ' - 'AS col2, table1.col3 AS col3 FROM table1) ' - 'AS foo LIMIT :param_1 OFFSET :param_2', - {'param_1': 5, 'param_2': 10}) + self.assert_compile( + sql_util.ClauseAdapter(s2).traverse(s1), + "SELECT foo.col1, foo.col2, foo.col3 FROM " + "(SELECT table1.col1 AS col1, table1.col2 " + "AS col2, table1.col3 AS col3 FROM table1) " + "AS foo LIMIT :param_1 OFFSET :param_2", + {"param_1": 5, "param_2": 10}, + ) def test_aliasedselect_to_aliasedselect_join(self): - s1 = select([t1]).alias('foo') + s1 = select([t1]).alias("foo") s2 = select([s1]).limit(5).offset(10).alias() j = s1.outerjoin(t2, s1.c.col1 == t2.c.col1) - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), - 'SELECT anon_1.col1, anon_1.col2, ' - 'anon_1.col3, table2.col1, table2.col2, ' - 'table2.col3 FROM (SELECT foo.col1 AS ' - 'col1, foo.col2 AS col2, foo.col3 AS col3 ' - 'FROM (SELECT table1.col1 AS col1, ' - 'table1.col2 AS col2, table1.col3 AS col3 ' - 'FROM table1) AS foo LIMIT :param_1 OFFSET ' - ':param_2) AS anon_1 LEFT OUTER JOIN ' - 'table2 ON anon_1.col1 = table2.col1', - {'param_1': 5, 'param_2': 10}) + self.assert_compile( + sql_util.ClauseAdapter(s2).traverse(j).select(), + "SELECT anon_1.col1, anon_1.col2, " + "anon_1.col3, table2.col1, table2.col2, " + "table2.col3 FROM (SELECT foo.col1 AS " + "col1, foo.col2 AS col2, foo.col3 AS col3 " + "FROM (SELECT table1.col1 AS col1, " + "table1.col2 AS col2, table1.col3 AS col3 " + "FROM table1) AS foo LIMIT :param_1 OFFSET " + ":param_2) AS anon_1 LEFT OUTER JOIN " + "table2 ON anon_1.col1 = table2.col1", + {"param_1": 5, "param_2": 10}, + ) def test_aliasedselect_to_aliasedselect_join_nested_table(self): - s1 = select([t1]).alias('foo') + s1 = select([t1]).alias("foo") s2 = select([s1]).limit(5).offset(10).alias() - talias = t1.alias('bar') + talias = t1.alias("bar") assert not s2.is_derived_from(talias) j = s1.outerjoin(talias, s1.c.col1 == talias.c.col1) - self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), - 'SELECT anon_1.col1, anon_1.col2, ' - 'anon_1.col3, bar.col1, bar.col2, bar.col3 ' - 'FROM (SELECT foo.col1 AS col1, foo.col2 ' - 'AS col2, foo.col3 AS col3 FROM (SELECT ' - 'table1.col1 AS col1, table1.col2 AS col2, ' - 'table1.col3 AS col3 FROM table1) AS foo ' - 'LIMIT :param_1 OFFSET :param_2) AS anon_1 ' - 'LEFT OUTER JOIN table1 AS bar ON ' - 'anon_1.col1 = bar.col1', {'param_1': 5, - 'param_2': 10}) + self.assert_compile( + sql_util.ClauseAdapter(s2).traverse(j).select(), + "SELECT anon_1.col1, anon_1.col2, " + "anon_1.col3, bar.col1, bar.col2, bar.col3 " + "FROM (SELECT foo.col1 AS col1, foo.col2 " + "AS col2, foo.col3 AS col3 FROM (SELECT " + "table1.col1 AS col1, table1.col2 AS col2, " + "table1.col3 AS col3 FROM table1) AS foo " + "LIMIT :param_1 OFFSET :param_2) AS anon_1 " + "LEFT OUTER JOIN table1 AS bar ON " + "anon_1.col1 = bar.col1", + {"param_1": 5, "param_2": 10}, + ) def test_functions(self): self.assert_compile( - sql_util.ClauseAdapter(t1.alias()). - traverse(func.count(t1.c.col1)), - 'count(table1_1.col1)') + sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), + "count(table1_1.col1)", + ) s = select([func.count(t1.c.col1)]) - self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), - 'SELECT count(table1_1.col1) AS count_1 ' - 'FROM table1 AS table1_1') + self.assert_compile( + sql_util.ClauseAdapter(t1.alias()).traverse(s), + "SELECT count(table1_1.col1) AS count_1 " + "FROM table1 AS table1_1", + ) def test_recursive(self): metadata = MetaData() - a = Table('a', metadata, - Column('id', Integer, primary_key=True)) - b = Table('b', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) - c = Table('c', metadata, - Column('id', Integer, primary_key=True), - Column('bid', Integer, ForeignKey('b.id')), - ) - - d = Table('d', metadata, - Column('id', Integer, primary_key=True), - Column('aid', Integer, ForeignKey('a.id')), - ) + a = Table("a", metadata, Column("id", Integer, primary_key=True)) + b = Table( + "b", + metadata, + Column("id", Integer, primary_key=True), + Column("aid", Integer, ForeignKey("a.id")), + ) + c = Table( + "c", + metadata, + Column("id", Integer, primary_key=True), + Column("bid", Integer, ForeignKey("b.id")), + ) + + d = Table( + "d", + metadata, + Column("id", Integer, primary_key=True), + Column("aid", Integer, ForeignKey("a.id")), + ) u = union( a.join(b).select().apply_labels(), - a.join(d).select().apply_labels() + a.join(d).select().apply_labels(), ).alias() self.assert_compile( - sql_util.ClauseAdapter(u). - traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)), + sql_util.ClauseAdapter(u).traverse( + select([c.c.bid]).where(c.c.bid == u.c.b_aid) + ), "SELECT c.bid " "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid " "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id " "AS d_id, d.aid AS d_aid " "FROM a JOIN d ON a.id = d.aid) AS anon_1 " - "WHERE c.bid = anon_1.b_aid" + "WHERE c.bid = anon_1.b_aid", ) - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + t1 = table("table1", column("col1"), column("col2"), column("col3")) + t2 = table("table2", column("col1"), column("col2"), column("col3")) def test_label_anonymize_one(self): t1a = t1.alias() adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True) - expr = select([t1.c.col2]).where(t1.c.col3 == 5).label('expr') + expr = select([t1.c.col2]).where(t1.c.col3 == 5).label("expr") expr_adapted = adapter.traverse(expr) stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted) @@ -1560,7 +1580,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "AS expr, " "(SELECT table1_1.col2 FROM table1 AS table1_1 " "WHERE table1_1.col3 = :col3_2) AS anon_1 " - "ORDER BY expr, anon_1" + "ORDER BY expr, anon_1", ) def test_label_anonymize_two(self): @@ -1578,14 +1598,14 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): "AS anon_1, " "(SELECT table1_1.col2 FROM table1 AS table1_1 " "WHERE table1_1.col3 = :col3_2) AS anon_2 " - "ORDER BY anon_1, anon_2" + "ORDER BY anon_1, anon_2", ) def test_label_anonymize_three(self): t1a = t1.alias() adapter = sql_util.ColumnAdapter( - t1a, anonymize_labels=True, - allow_label_resolve=False) + t1a, anonymize_labels=True, allow_label_resolve=False + ) expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None) l1 = expr @@ -1603,236 +1623,235 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = 'default' + __dialect__ = "default" @classmethod def setup_class(cls): global table1, table2, table3, table4 def _table(name): - return table(name, column('col1'), column('col2'), - column('col3')) + return table(name, column("col1"), column("col2"), column("col3")) table1, table2, table3, table4 = [ - _table(name) for name in ( - 'table1', 'table2', 'table3', 'table4')] + _table(name) for name in ("table1", "table2", "table3", "table4") + ] def test_splice(self): t1, t2, t3, t4 = table1, table2, table1.alias(), table2.alias() - j = t1.join( - t2, - t1.c.col1 == t2.c.col1).join( - t3, - t2.c.col1 == t3.c.col1).join( - t4, - t4.c.col1 == t1.c.col1) + j = ( + t1.join(t2, t1.c.col1 == t2.c.col1) + .join(t3, t2.c.col1 == t3.c.col1) + .join(t4, t4.c.col1 == t1.c.col1) + ) s = select([t1]).where(t1.c.col2 < 5).alias() - self.assert_compile(sql_util.splice_joins(s, j), - '(SELECT table1.col1 AS col1, table1.col2 ' - 'AS col2, table1.col3 AS col3 FROM table1 ' - 'WHERE table1.col2 < :col2_1) AS anon_1 ' - 'JOIN table2 ON anon_1.col1 = table2.col1 ' - 'JOIN table1 AS table1_1 ON table2.col1 = ' - 'table1_1.col1 JOIN table2 AS table2_1 ON ' - 'table2_1.col1 = anon_1.col1') + self.assert_compile( + sql_util.splice_joins(s, j), + "(SELECT table1.col1 AS col1, table1.col2 " + "AS col2, table1.col3 AS col3 FROM table1 " + "WHERE table1.col2 < :col2_1) AS anon_1 " + "JOIN table2 ON anon_1.col1 = table2.col1 " + "JOIN table1 AS table1_1 ON table2.col1 = " + "table1_1.col1 JOIN table2 AS table2_1 ON " + "table2_1.col1 = anon_1.col1", + ) def test_stop_on(self): t1, t2, t3 = table1, table2, table3 j1 = t1.join(t2, t1.c.col1 == t2.c.col1) j2 = j1.join(t3, t2.c.col1 == t3.c.col1) s = select([t1]).select_from(j1).alias() - self.assert_compile(sql_util.splice_joins(s, j2), - '(SELECT table1.col1 AS col1, table1.col2 ' - 'AS col2, table1.col3 AS col3 FROM table1 ' - 'JOIN table2 ON table1.col1 = table2.col1) ' - 'AS anon_1 JOIN table2 ON anon_1.col1 = ' - 'table2.col1 JOIN table3 ON table2.col1 = ' - 'table3.col1') - self.assert_compile(sql_util.splice_joins(s, j2, j1), - '(SELECT table1.col1 AS col1, table1.col2 ' - 'AS col2, table1.col3 AS col3 FROM table1 ' - 'JOIN table2 ON table1.col1 = table2.col1) ' - 'AS anon_1 JOIN table3 ON table2.col1 = ' - 'table3.col1') + self.assert_compile( + sql_util.splice_joins(s, j2), + "(SELECT table1.col1 AS col1, table1.col2 " + "AS col2, table1.col3 AS col3 FROM table1 " + "JOIN table2 ON table1.col1 = table2.col1) " + "AS anon_1 JOIN table2 ON anon_1.col1 = " + "table2.col1 JOIN table3 ON table2.col1 = " + "table3.col1", + ) + self.assert_compile( + sql_util.splice_joins(s, j2, j1), + "(SELECT table1.col1 AS col1, table1.col2 " + "AS col2, table1.col3 AS col3 FROM table1 " + "JOIN table2 ON table1.col1 = table2.col1) " + "AS anon_1 JOIN table3 ON table2.col1 = " + "table3.col1", + ) def test_splice_2(self): t2a = table2.alias() t3a = table3.alias() - j1 = table1.join( - t2a, - table1.c.col1 == t2a.c.col1).join( - t3a, - t2a.c.col2 == t3a.c.col2) + j1 = table1.join(t2a, table1.c.col1 == t2a.c.col1).join( + t3a, t2a.c.col2 == t3a.c.col2 + ) t2b = table4.alias() j2 = table1.join(t2b, table1.c.col3 == t2b.c.col3) - self.assert_compile(sql_util.splice_joins(table1, j1), - 'table1 JOIN table2 AS table2_1 ON ' - 'table1.col1 = table2_1.col1 JOIN table3 ' - 'AS table3_1 ON table2_1.col2 = ' - 'table3_1.col2') - self.assert_compile(sql_util.splice_joins(table1, j2), - 'table1 JOIN table4 AS table4_1 ON ' - 'table1.col3 = table4_1.col3') - self.assert_compile( - sql_util.splice_joins( - sql_util.splice_joins( - table1, - j1), - j2), - 'table1 JOIN table2 AS table2_1 ON ' - 'table1.col1 = table2_1.col1 JOIN table3 ' - 'AS table3_1 ON table2_1.col2 = ' - 'table3_1.col2 JOIN table4 AS table4_1 ON ' - 'table1.col3 = table4_1.col3') + self.assert_compile( + sql_util.splice_joins(table1, j1), + "table1 JOIN table2 AS table2_1 ON " + "table1.col1 = table2_1.col1 JOIN table3 " + "AS table3_1 ON table2_1.col2 = " + "table3_1.col2", + ) + self.assert_compile( + sql_util.splice_joins(table1, j2), + "table1 JOIN table4 AS table4_1 ON " "table1.col3 = table4_1.col3", + ) + self.assert_compile( + sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), + "table1 JOIN table2 AS table2_1 ON " + "table1.col1 = table2_1.col1 JOIN table3 " + "AS table3_1 ON table2_1.col2 = " + "table3_1.col2 JOIN table4 AS table4_1 ON " + "table1.col3 = table4_1.col3", + ) class SelectTest(fixtures.TestBase, AssertsCompiledSQL): """tests the generative capability of Select""" - __dialect__ = 'default' + __dialect__ = "default" @classmethod def setup_class(cls): global t1, t2 - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + t1 = table("table1", column("col1"), column("col2"), column("col3")) + t2 = table("table2", column("col1"), column("col2"), column("col3")) def test_columns(self): s = t1.select() - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') - select_copy = s.column(column('yyy')) - self.assert_compile(select_copy, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3, yyy FROM table1') + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) + select_copy = s.column(column("yyy")) + self.assert_compile( + select_copy, + "SELECT table1.col1, table1.col2, " "table1.col3, yyy FROM table1", + ) assert s.columns is not select_copy.columns assert s._columns is not select_copy._columns assert s._raw_columns is not select_copy._raw_columns - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) def test_froms(self): s = t1.select() - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) select_copy = s.select_from(t2) - self.assert_compile(select_copy, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1, table2') + self.assert_compile( + select_copy, + "SELECT table1.col1, table1.col2, " + "table1.col3 FROM table1, table2", + ) assert s._froms is not select_copy._froms - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) def test_prefixes(self): s = t1.select() - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') - select_copy = s.prefix_with('FOOBER') - self.assert_compile(select_copy, - 'SELECT FOOBER table1.col1, table1.col2, ' - 'table1.col3 FROM table1') - self.assert_compile(s, - 'SELECT table1.col1, table1.col2, ' - 'table1.col3 FROM table1') + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) + select_copy = s.prefix_with("FOOBER") + self.assert_compile( + select_copy, + "SELECT FOOBER table1.col1, table1.col2, " + "table1.col3 FROM table1", + ) + self.assert_compile( + s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" + ) def test_execution_options(self): - s = select().execution_options(foo='bar') - s2 = s.execution_options(bar='baz') - s3 = s.execution_options(foo='not bar') + s = select().execution_options(foo="bar") + s2 = s.execution_options(bar="baz") + s3 = s.execution_options(foo="not bar") # The original select should not be modified. - assert s._execution_options == dict(foo='bar') + assert s._execution_options == dict(foo="bar") # s2 should have its execution_options based on s, though. - assert s2._execution_options == dict(foo='bar', bar='baz') - assert s3._execution_options == dict(foo='not bar') + assert s2._execution_options == dict(foo="bar", bar="baz") + assert s3._execution_options == dict(foo="not bar") def test_invalid_options(self): assert_raises( - exc.ArgumentError, - select().execution_options, compiled_cache={} + exc.ArgumentError, select().execution_options, compiled_cache={} ) assert_raises( exc.ArgumentError, select().execution_options, - isolation_level='READ_COMMITTED' + isolation_level="READ_COMMITTED", ) # this feature not available yet def _NOTYET_test_execution_options_in_kwargs(self): - s = select(execution_options=dict(foo='bar')) - s2 = s.execution_options(bar='baz') + s = select(execution_options=dict(foo="bar")) + s2 = s.execution_options(bar="baz") # The original select should not be modified. - assert s._execution_options == dict(foo='bar') + assert s._execution_options == dict(foo="bar") # s2 should have its execution_options based on s, though. - assert s2._execution_options == dict(foo='bar', bar='baz') + assert s2._execution_options == dict(foo="bar", bar="baz") # this feature not available yet def _NOTYET_test_execution_options_in_text(self): - s = text('select 42', execution_options=dict(foo='bar')) - assert s._execution_options == dict(foo='bar') + s = text("select 42", execution_options=dict(foo="bar")) + assert s._execution_options == dict(foo="bar") class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): """Tests the generative capability of Insert, Update""" - __dialect__ = 'default' + __dialect__ = "default" # fixme: consolidate converage from elsewhere here and expand @classmethod def setup_class(cls): global t1, t2 - t1 = table("table1", - column("col1"), - column("col2"), - column("col3"), - ) - t2 = table("table2", - column("col1"), - column("col2"), - column("col3"), - ) + t1 = table("table1", column("col1"), column("col2"), column("col3")) + t2 = table("table2", column("col1"), column("col2"), column("col3")) def test_prefixes(self): i = t1.insert() - self.assert_compile(i, - "INSERT INTO table1 (col1, col2, col3) " - "VALUES (:col1, :col2, :col3)") + self.assert_compile( + i, + "INSERT INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)", + ) gen = i.prefix_with("foober") - self.assert_compile(gen, - "INSERT foober INTO table1 (col1, col2, col3) " - "VALUES (:col1, :col2, :col3)") + self.assert_compile( + gen, + "INSERT foober INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)", + ) - self.assert_compile(i, - "INSERT INTO table1 (col1, col2, col3) " - "VALUES (:col1, :col2, :col3)") + self.assert_compile( + i, + "INSERT INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)", + ) - i2 = t1.insert(prefixes=['squiznart']) - self.assert_compile(i2, - "INSERT squiznart INTO table1 (col1, col2, col3) " - "VALUES (:col1, :col2, :col3)") + i2 = t1.insert(prefixes=["squiznart"]) + self.assert_compile( + i2, + "INSERT squiznart INTO table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)", + ) gen2 = i2.prefix_with("quux") - self.assert_compile(gen2, - "INSERT squiznart quux INTO " - "table1 (col1, col2, col3) " - "VALUES (:col1, :col2, :col3)") + self.assert_compile( + gen2, + "INSERT squiznart quux INTO " + "table1 (col1, col2, col3) " + "VALUES (:col1, :col2, :col3)", + ) def test_add_kwarg(self): i = t1.insert() @@ -1857,11 +1876,13 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): i = t1.insert() eq_(i.parameters, None) i = i.values([(5, 6, 7), (8, 9, 10)]) - eq_(i.parameters, [ - {"col1": 5, "col2": 6, "col3": 7}, - {"col1": 8, "col2": 9, "col3": 10}, - ] - ) + eq_( + i.parameters, + [ + {"col1": 5, "col2": 6, "col3": 7}, + {"col1": 8, "col2": 9, "col3": 10}, + ], + ) def test_inline_values_single(self): i = t1.insert(values={"col1": 5}) @@ -1895,7 +1916,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.InvalidRequestError, "This construct already has multiple parameter sets.", - i.values, col2=7 + i.values, + col2=7, ) def test_cant_mix_single_multi_formats_dict_to_list(self): @@ -1904,7 +1926,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): exc.ArgumentError, "Can't mix single-values and multiple values " "formats in one statement", - i.values, [{"col1": 6}] + i.values, + [{"col1": 6}], ) def test_cant_mix_single_multi_formats_list_to_dict(self): @@ -1913,7 +1936,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): exc.ArgumentError, "Can't mix single-values and multiple values " "formats in one statement", - i.values, {"col1": 5} + i.values, + {"col1": 5}, ) def test_erroneous_multi_args_dicts(self): @@ -1922,7 +1946,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): exc.ArgumentError, "Only a single dictionary/tuple or list of " "dictionaries/tuples is accepted positionally.", - i.values, {"col1": 5}, {"col1": 7} + i.values, + {"col1": 5}, + {"col1": 7}, ) def test_erroneous_multi_args_tuples(self): @@ -1931,7 +1957,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): exc.ArgumentError, "Only a single dictionary/tuple or list of " "dictionaries/tuples is accepted positionally.", - i.values, (5, 6, 7), (8, 9, 10) + i.values, + (5, 6, 7), + (8, 9, 10), ) def test_erroneous_multi_args_plus_kw(self): @@ -1939,7 +1967,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.ArgumentError, "Can't pass kwargs and multiple parameter sets simultaneously", - i.values, [{"col1": 5}], col2=7 + i.values, + [{"col1": 5}], + col2=7, ) def test_update_no_support_multi_values(self): @@ -1947,12 +1977,14 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL): assert_raises_message( exc.InvalidRequestError, "This construct does not support multiple parameter sets.", - u.values, [{"col1": 5}, {"col1": 7}] + u.values, + [{"col1": 5}, {"col1": 7}], ) def test_update_no_support_multi_constructor(self): assert_raises_message( exc.InvalidRequestError, "This construct does not support multiple parameter sets.", - t1.update, values=[{"col1": 5}, {"col1": 7}] + t1.update, + values=[{"col1": 5}, {"col1": 7}], ) |
