diff options
Diffstat (limited to 'test/sql/test_generative.py')
| -rw-r--r-- | test/sql/test_generative.py | 1542 | 
1 files changed, 787 insertions, 755 deletions
| diff --git a/test/sql/test_generative.py b/test/sql/test_generative.py index 8b1436879..1d064dd3a 100644 --- a/test/sql/test_generative.py +++ b/test/sql/test_generative.py @@ -1,21 +1,45 @@  from sqlalchemy.sql import table, column, ClauseElement, operators  from sqlalchemy.sql.expression import _clone, _from_objects -from sqlalchemy import func, select, Integer, Table, \ -    Column, MetaData, extract, String, bindparam, tuple_, and_, union, text,\ -    case, ForeignKey, literal_column -from sqlalchemy.testing import fixtures, AssertsExecutionResults, \ -    AssertsCompiledSQL +from sqlalchemy import ( +    func, +    select, +    Integer, +    Table, +    Column, +    MetaData, +    extract, +    String, +    bindparam, +    tuple_, +    and_, +    union, +    text, +    case, +    ForeignKey, +    literal_column, +) +from sqlalchemy.testing import ( +    fixtures, +    AssertsExecutionResults, +    AssertsCompiledSQL, +)  from sqlalchemy import testing -from sqlalchemy.sql.visitors import ClauseVisitor, CloningVisitor, \ -    cloned_traverse, ReplacingCloningVisitor +from sqlalchemy.sql.visitors import ( +    ClauseVisitor, +    CloningVisitor, +    cloned_traverse, +    ReplacingCloningVisitor, +)  from sqlalchemy.sql import visitors  from sqlalchemy import exc  from sqlalchemy.sql import util as sql_util -from sqlalchemy.testing import (eq_, -                                is_, -                                is_not_, -                                assert_raises, -                                assert_raises_message) +from sqlalchemy.testing import ( +    eq_, +    is_, +    is_not_, +    assert_raises, +    assert_raises_message, +)  A = B = t1 = t2 = t3 = table1 = table2 = table3 = table4 = None @@ -33,7 +57,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          # define deep equality semantics as well as deep          # identity semantics.          class A(ClauseElement): -            __visit_name__ = 'a' +            __visit_name__ = "a"              def __init__(self, expr):                  self.expr = expr @@ -53,7 +77,7 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):                  return "A(%s)" % repr(self.expr)          class B(ClauseElement): -            __visit_name__ = 'b' +            __visit_name__ = "b"              def __init__(self, *items):                  self.items = items @@ -93,8 +117,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          a1 = A("expr1")          struct = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3"))          struct2 = B(a1, A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3")) -        struct3 = B(a1, A("expr2"), B(A("expr1b"), -                                      A("expr2bmodified")), A("expr3")) +        struct3 = B( +            a1, A("expr2"), B(A("expr1b"), A("expr2bmodified")), A("expr3") +        )          assert a1.is_other(a1)          assert struct.is_other(struct) @@ -104,11 +129,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          assert not struct.is_other(struct3)      def test_clone(self): -        struct = B(A("expr1"), A("expr2"), B(A("expr1b"), -                                             A("expr2b")), A("expr3")) +        struct = B( +            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") +        )          class Vis(CloningVisitor): -              def visit_a(self, a):                  pass @@ -121,11 +146,11 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          assert not struct.is_other(s2)      def test_no_clone(self): -        struct = B(A("expr1"), A("expr2"), B(A("expr1b"), -                                             A("expr2b")), A("expr3")) +        struct = B( +            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") +        )          class Vis(ClauseVisitor): -              def visit_a(self, a):                  pass @@ -139,7 +164,8 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):      def test_clone_anon_label(self):          from sqlalchemy.sql.elements import Grouping -        c1 = Grouping(literal_column('q')) + +        c1 = Grouping(literal_column("q"))          s1 = select([c1])          class Vis(CloningVisitor): @@ -151,15 +177,23 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          eq_(list(s2.inner_columns)[0].anon_label, c1.anon_label)      def test_change_in_place(self): -        struct = B(A("expr1"), A("expr2"), B(A("expr1b"), -                                             A("expr2b")), A("expr3")) -        struct2 = B(A("expr1"), A("expr2modified"), B(A("expr1b"), -                                                      A("expr2b")), A("expr3")) -        struct3 = B(A("expr1"), A("expr2"), B(A("expr1b"), -                                              A("expr2bmodified")), A("expr3")) +        struct = B( +            A("expr1"), A("expr2"), B(A("expr1b"), A("expr2b")), A("expr3") +        ) +        struct2 = B( +            A("expr1"), +            A("expr2modified"), +            B(A("expr1b"), A("expr2b")), +            A("expr3"), +        ) +        struct3 = B( +            A("expr1"), +            A("expr2"), +            B(A("expr1b"), A("expr2bmodified")), +            A("expr3"), +        )          class Vis(CloningVisitor): -              def visit_a(self, a):                  if a.expr == "expr2":                      a.expr = "expr2modified" @@ -174,7 +208,6 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          assert struct2 == s2          class Vis2(CloningVisitor): -              def visit_a(self, a):                  if a.expr == "expr2b":                      a.expr = "expr2bmodified" @@ -194,9 +227,9 @@ class TraversalTest(fixtures.TestBase, AssertsExecutionResults):          class CustomObj(Column):              pass -        assert CustomObj.__visit_name__ == Column.__visit_name__ == 'column' +        assert CustomObj.__visit_name__ == Column.__visit_name__ == "column" -        foo, bar = CustomObj('foo', String), CustomObj('bar', String) +        foo, bar = CustomObj("foo", String), CustomObj("bar", String)          bin = foo == bar          set(ClauseVisitor().iterate(bin))          assert set(ClauseVisitor().iterate(bin)) == set([foo, bar, bin]) @@ -212,19 +245,13 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):          def visit(binary, l, r):              canary.append((binary.operator, l, r))              print(binary.operator, l, r) +          sql_util.visit_binary_product(visit, expr) -        eq_( -            canary, expected -        ) +        eq_(canary, expected)      def test_basic(self):          a, b = column("a"), column("b") -        self._assert_traversal( -            a == b, -            [ -                (operators.eq, a, b) -            ] -        ) +        self._assert_traversal(a == b, [(operators.eq, a, b)])      def test_with_tuples(self):          a, b, c, d, b1, b1a, b1b, e, f = ( @@ -236,11 +263,9 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):              column("b1a"),              column("b1b"),              column("e"), -            column("f") +            column("f"),          ) -        expr = tuple_( -            a, b, b1 == tuple_(b1a, b1b == d), c -        ) > tuple_( +        expr = tuple_(a, b, b1 == tuple_(b1a, b1b == d), c) > tuple_(              func.go(e + f)          )          self._assert_traversal( @@ -253,8 +278,8 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):                  (operators.eq, b1, b1a),                  (operators.eq, b1b, d),                  (operators.gt, c, e), -                (operators.gt, c, f) -            ] +                (operators.gt, c, f), +            ],          )      def test_composed(self): @@ -267,13 +292,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):              column("j"),              column("r"),          ) -        expr = and_( -            (a + b) == q + func.sum(e + f), -            and_( -                j == r, -                f == q -            ) -        ) +        expr = and_((a + b) == q + func.sum(e + f), and_(j == r, f == q))          self._assert_traversal(              expr,              [ @@ -285,7 +304,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):                  (operators.eq, b, f),                  (operators.eq, j, r),                  (operators.eq, f, q), -            ] +            ],          )      def test_subquery(self): @@ -293,11 +312,7 @@ class BinaryEndpointTraversalTest(fixtures.TestBase):          subq = select([c]).where(c == a).as_scalar()          expr = and_(a == b, b == subq)          self._assert_traversal( -            expr, -            [ -                (operators.eq, a, b), -                (operators.eq, b, subq), -            ] +            expr, [(operators.eq, a, b), (operators.eq, b, subq)]          ) @@ -305,36 +320,31 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):      """test copy-in-place behavior of various ClauseElements.""" -    __dialect__ = 'default' +    __dialect__ = "default"      @classmethod      def setup_class(cls):          global t1, t2, t3 -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t3 = Table('table3', MetaData(), -                   Column('col1', Integer), -                   Column('col2', Integer) -                   ) +        t1 = table("table1", column("col1"), column("col2"), column("col3")) +        t2 = table("table2", column("col1"), column("col2"), column("col3")) +        t3 = Table( +            "table3", +            MetaData(), +            Column("col1", Integer), +            Column("col2", Integer), +        )      def test_binary(self):          clause = t1.c.col2 == t2.c.col2          eq_(str(clause), str(CloningVisitor().traverse(clause)))      def test_binary_anon_label_quirk(self): -        t = table('t1', column('col1')) +        t = table("t1", column("col1"))          f = t.c.col1 * 5 -        self.assert_compile(select([f]), -                            "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1") +        self.assert_compile( +            select([f]), "SELECT t1.col1 * :col1_1 AS anon_1 FROM t1" +        )          f.anon_label @@ -342,9 +352,8 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          f = sql_util.ClauseAdapter(a).traverse(f)          self.assert_compile( -            select( -                [f]), -            "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1") +            select([f]), "SELECT t1_1.col1 * :col1_1 AS anon_1 FROM t1 AS t1_1" +        )      def test_join(self):          clause = t1.join(t2, t1.c.col2 == t2.c.col2) @@ -352,7 +361,6 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          assert str(clause) == str(CloningVisitor().traverse(clause))          class Vis(CloningVisitor): -              def visit_binary(self, binary):                  binary.right = t2.c.col3 @@ -368,24 +376,23 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          adapter = sql_util.ColumnAdapter(aliased) -        f = select([ -            adapter.columns[c] -            for c in aliased2.c -        ]).select_from(aliased) +        f = select([adapter.columns[c] for c in aliased2.c]).select_from( +            aliased +        )          s = select([aliased2]).select_from(aliased)          eq_(str(s), str(f)) -        f = select([ -            adapter.columns[func.count(aliased2.c.col1)] -        ]).select_from(aliased) +        f = select([adapter.columns[func.count(aliased2.c.col1)]]).select_from( +            aliased +        )          eq_(              str(select([func.count(aliased2.c.col1)]).select_from(aliased)), -            str(f) +            str(f),          )      def test_aliased_cloned_column_adapt_inner(self): -        clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')]) +        clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")])          aliased1 = select([clause.c.col1, clause.c.foo])          aliased2 = clause @@ -397,20 +404,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          # aliased2.  corresponding_column checks these          # now.          adapter = sql_util.ColumnAdapter(aliased1) -        f1 = select([ -            adapter.columns[c] -            for c in aliased2._raw_columns -        ]) -        f2 = select([ -            adapter.columns[c] -            for c in aliased3._raw_columns -        ]) -        eq_( -            str(f1), str(f2) -        ) +        f1 = select([adapter.columns[c] for c in aliased2._raw_columns]) +        f2 = select([adapter.columns[c] for c in aliased3._raw_columns]) +        eq_(str(f1), str(f2))      def test_aliased_cloned_column_adapt_exported(self): -        clause = select([t1.c.col1, func.foo(t1.c.col2).label('foo')]) +        clause = select([t1.c.col1, func.foo(t1.c.col2).label("foo")])          aliased1 = select([clause.c.col1, clause.c.foo])          aliased2 = clause @@ -422,20 +421,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          # have an _is_clone_of pointer.   But we now modified _make_proxy          # to assign this.          adapter = sql_util.ColumnAdapter(aliased1) -        f1 = select([ -            adapter.columns[c] -            for c in aliased2.c -        ]) -        f2 = select([ -            adapter.columns[c] -            for c in aliased3.c -        ]) -        eq_( -            str(f1), str(f2) -        ) +        f1 = select([adapter.columns[c] for c in aliased2.c]) +        f2 = select([adapter.columns[c] for c in aliased3.c]) +        eq_(str(f1), str(f2))      def test_aliased_cloned_schema_column_adapt_exported(self): -        clause = select([t3.c.col1, func.foo(t3.c.col2).label('foo')]) +        clause = select([t3.c.col1, func.foo(t3.c.col2).label("foo")])          aliased1 = select([clause.c.col1, clause.c.foo])          aliased2 = clause @@ -447,20 +438,12 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          # have an _is_clone_of pointer.   But we now modified _make_proxy          # to assign this.          adapter = sql_util.ColumnAdapter(aliased1) -        f1 = select([ -            adapter.columns[c] -            for c in aliased2.c -        ]) -        f2 = select([ -            adapter.columns[c] -            for c in aliased3.c -        ]) -        eq_( -            str(f1), str(f2) -        ) +        f1 = select([adapter.columns[c] for c in aliased2.c]) +        f2 = select([adapter.columns[c] for c in aliased3.c]) +        eq_(str(f1), str(f2))      def test_labeled_expression_adapt(self): -        lbl_x = (t3.c.col1 == 1).label('x') +        lbl_x = (t3.c.col1 == 1).label("x")          t3_alias = t3.alias()          adapter = sql_util.ColumnAdapter(t3_alias) @@ -471,13 +454,13 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          lblx_adapted = adapter.traverse(lbl_x)          self.assert_compile(              select([lblx_adapted.self_group()]), -            "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1" +            "SELECT (table3_1.col1 = :col1_1) AS x FROM table3 AS table3_1",          )          self.assert_compile(              select([lblx_adapted.is_(True)]),              "SELECT (table3_1.col1 = :col1_1) IS 1 AS anon_1 " -            "FROM table3 AS table3_1" +            "FROM table3 AS table3_1",          )      def test_cte_w_union(self): @@ -486,50 +469,55 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s = select([func.sum(t.c.n)])          from sqlalchemy.sql.visitors import cloned_traverse +          cloned = cloned_traverse(s, {}, {}) -        self.assert_compile(cloned, -                            "WITH RECURSIVE t(n) AS " -                            "(SELECT values(:values_1) AS n " -                            "UNION ALL SELECT t.n + :n_1 AS anon_1 " -                            "FROM t " -                            "WHERE t.n < :n_2) " -                            "SELECT sum(t.n) AS sum_1 FROM t" -                            ) +        self.assert_compile( +            cloned, +            "WITH RECURSIVE t(n) AS " +            "(SELECT values(:values_1) AS n " +            "UNION ALL SELECT t.n + :n_1 AS anon_1 " +            "FROM t " +            "WHERE t.n < :n_2) " +            "SELECT sum(t.n) AS sum_1 FROM t", +        )      def test_aliased_cte_w_union(self): -        t = select([func.values(1).label("n")]).\ -            cte("t", recursive=True).alias('foo') +        t = ( +            select([func.values(1).label("n")]) +            .cte("t", recursive=True) +            .alias("foo") +        )          t = t.union_all(select([t.c.n + 1]).where(t.c.n < 100))          s = select([func.sum(t.c.n)])          from sqlalchemy.sql.visitors import cloned_traverse +          cloned = cloned_traverse(s, {}, {})          self.assert_compile(              cloned,              "WITH RECURSIVE foo(n) AS (SELECT values(:values_1) AS n "              "UNION ALL SELECT foo.n + :n_1 AS anon_1 FROM foo " -            "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo" +            "WHERE foo.n < :n_2) SELECT sum(foo.n) AS sum_1 FROM foo",          )      def test_text(self):          clause = text( -            "select * from table where foo=:bar", -            bindparams=[bindparam('bar')]) +            "select * from table where foo=:bar", bindparams=[bindparam("bar")] +        )          c1 = str(clause)          class Vis(CloningVisitor): -              def visit_textclause(self, text):                  text.text = text.text + " SOME MODIFIER=:lala" -                text._bindparams['lala'] = bindparam('lala') +                text._bindparams["lala"] = bindparam("lala")          clause2 = Vis().traverse(clause)          assert c1 == str(clause)          assert str(clause2) == c1 + " SOME MODIFIER=:lala" -        assert list(clause._bindparams.keys()) == ['bar'] -        assert set(clause2._bindparams.keys()) == set(['bar', 'lala']) +        assert list(clause._bindparams.keys()) == ["bar"] +        assert set(clause2._bindparams.keys()) == set(["bar", "lala"])      def test_select(self):          s2 = select([t1]) @@ -537,9 +525,9 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s3_assert = str(select([t1], t1.c.col2 == 7))          class Vis(CloningVisitor): -              def visit_select(self, select):                  select.append_whereclause(t1.c.col2 == 7) +          s3 = Vis().traverse(s2)          assert str(s3) == s3_assert          assert str(s2) == s2_assert @@ -547,18 +535,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          print(str(s3))          class Vis(ClauseVisitor): -              def visit_select(self, select):                  select.append_whereclause(t1.c.col2 == 7) +          Vis().traverse(s2)          assert str(s2) == s3_assert          s4_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col3 == 9)))          class Vis(CloningVisitor): -              def visit_select(self, select):                  select.append_whereclause(t1.c.col3 == 9) +          s4 = Vis().traverse(s3)          print(str(s3))          print(str(s4)) @@ -568,11 +556,11 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s5_assert = str(select([t1], and_(t1.c.col2 == 7, t1.c.col1 == 9)))          class Vis(CloningVisitor): -              def visit_binary(self, binary):                  if binary.left is t1.c.col3:                      binary.left = t1.c.col1                      binary.right = bindparam("col1", unique=True) +          s5 = Vis().traverse(s4)          print(str(s4))          print(str(s5)) @@ -591,18 +579,18 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          assert str(u) == str(u2)          assert [str(c) for c in u2.c] == cols -        s1 = select([t1], t1.c.col1 == bindparam('id_param')) +        s1 = select([t1], t1.c.col1 == bindparam("id_param"))          s2 = select([t2])          u = union(s1, s2)          u2 = u.params(id_param=7)          u3 = u.params(id_param=10)          assert str(u) == str(u2) == str(u3) -        assert u2.compile().params == {'id_param': 7} -        assert u3.compile().params == {'id_param': 10} +        assert u2.compile().params == {"id_param": 7} +        assert u3.compile().params == {"id_param": 10}      def test_in(self): -        expr = t1.c.col1.in_(['foo', 'bar']) +        expr = t1.c.col1.in_(["foo", "bar"])          expr2 = CloningVisitor().traverse(expr)          assert str(expr) == str(expr2) @@ -628,7 +616,7 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):      def test_adapt_union(self):          u = union(              t1.select().where(t1.c.col1 == 4), -            t1.select().where(t1.c.col1 == 5) +            t1.select().where(t1.c.col1 == 5),          ).alias()          assert sql_util.ClauseAdapter(u).traverse(t1) is u @@ -642,48 +630,54 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s3 = select([s], s.c.col2 == s2.c.col2)          self.assert_compile( -            s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " +            s3, +            "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, "              "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :param_1) "              "AS anon_1, "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 "              "AS col3 FROM table1 WHERE table1.col1 = :param_2) AS anon_2 " -            "WHERE anon_1.col2 = anon_2.col2") +            "WHERE anon_1.col2 = anon_2.col2", +        )          s = select([t1], t1.c.col1 == 4).alias()          s2 = CloningVisitor().traverse(s).alias()          s3 = select([s], s.c.col2 == s2.c.col2)          self.assert_compile( -            s3, "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM " +            s3, +            "SELECT anon_1.col1, anon_1.col2, anon_1.col3 FROM "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, "              "table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_1) "              "AS anon_1, "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 "              "AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 " -            "WHERE anon_1.col2 = anon_2.col2") +            "WHERE anon_1.col2 = anon_2.col2", +        )      def test_extract(self): -        s = select([extract('foo', t1.c.col1).label('col1')]) +        s = select([extract("foo", t1.c.col1).label("col1")])          self.assert_compile( -            s, -            "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") +            s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" +        )          s2 = CloningVisitor().traverse(s).alias()          s3 = select([s2.c.col1])          self.assert_compile( -            s, -            "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1") -        self.assert_compile(s3, -                            "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " -                            "table1.col1) AS col1 FROM table1) AS anon_1") +            s, "SELECT EXTRACT(foo FROM table1.col1) AS col1 FROM table1" +        ) +        self.assert_compile( +            s3, +            "SELECT anon_1.col1 FROM (SELECT EXTRACT(foo FROM " +            "table1.col1) AS col1 FROM table1) AS anon_1", +        ) -    @testing.emits_warning('.*replaced by another column with the same key') +    @testing.emits_warning(".*replaced by another column with the same key")      def test_alias(self): -        subq = t2.select().alias('subq') -        s = select([t1.c.col1, subq.c.col1], -                   from_obj=[t1, subq, -                             t1.join(subq, t1.c.col1 == subq.c.col2)] -                   ) +        subq = t2.select().alias("subq") +        s = select( +            [t1.c.col1, subq.c.col1], +            from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], +        )          orig = str(s)          s2 = CloningVisitor().traverse(s)          assert orig == str(s) == str(s2) @@ -691,26 +685,26 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s4 = CloningVisitor().traverse(s2)          assert orig == str(s) == str(s2) == str(s4) -        s3 = sql_util.ClauseAdapter(table('foo')).traverse(s) +        s3 = sql_util.ClauseAdapter(table("foo")).traverse(s)          assert orig == str(s) == str(s3) -        s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3) +        s4 = sql_util.ClauseAdapter(table("foo")).traverse(s3)          assert orig == str(s) == str(s3) == str(s4) -        subq = subq.alias('subq') -        s = select([t1.c.col1, subq.c.col1], -                   from_obj=[t1, subq, -                             t1.join(subq, t1.c.col1 == subq.c.col2)] -                   ) +        subq = subq.alias("subq") +        s = select( +            [t1.c.col1, subq.c.col1], +            from_obj=[t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2)], +        )          s5 = CloningVisitor().traverse(s)          assert orig == str(s) == str(s5)      def test_correlated_select(self): -        s = select([literal_column('*')], t1.c.col1 == t2.c.col1, -                   from_obj=[t1, t2]).correlate(t2) +        s = select( +            [literal_column("*")], t1.c.col1 == t2.c.col1, from_obj=[t1, t2] +        ).correlate(t2)          class Vis(CloningVisitor): -              def visit_select(self, select):                  select.append_whereclause(t1.c.col2 == 7) @@ -719,26 +713,30 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):              "SELECT table2.col1, table2.col2, table2.col3 "              "FROM table2 WHERE table2.col1 = "              "(SELECT * FROM table1 WHERE table1.col1 = table2.col1 " -            "AND table1.col2 = :col2_1)" +            "AND table1.col2 = :col2_1)",          )      def test_this_thing(self): -        s = select([t1]).where(t1.c.col1 == 'foo').alias() +        s = select([t1]).where(t1.c.col1 == "foo").alias()          s2 = select([s.c.col1]) -        self.assert_compile(s2, -                            'SELECT anon_1.col1 FROM (SELECT ' -                            'table1.col1 AS col1, table1.col2 AS col2, ' -                            'table1.col3 AS col3 FROM table1 WHERE ' -                            'table1.col1 = :col1_1) AS anon_1') +        self.assert_compile( +            s2, +            "SELECT anon_1.col1 FROM (SELECT " +            "table1.col1 AS col1, table1.col2 AS col2, " +            "table1.col3 AS col3 FROM table1 WHERE " +            "table1.col1 = :col1_1) AS anon_1", +        )          t1a = t1.alias()          s2 = sql_util.ClauseAdapter(t1a).traverse(s2) -        self.assert_compile(s2, -                            'SELECT anon_1.col1 FROM (SELECT ' -                            'table1_1.col1 AS col1, table1_1.col2 AS ' -                            'col2, table1_1.col3 AS col3 FROM table1 ' -                            'AS table1_1 WHERE table1_1.col1 = ' -                            ':col1_1) AS anon_1') +        self.assert_compile( +            s2, +            "SELECT anon_1.col1 FROM (SELECT " +            "table1_1.col1 AS col1, table1_1.col2 AS " +            "col2, table1_1.col3 AS col3 FROM table1 " +            "AS table1_1 WHERE table1_1.col1 = " +            ":col1_1) AS anon_1", +        )      def test_select_fromtwice_one(self):          t1a = t1.alias() @@ -746,95 +744,91 @@ class ClauseTest(fixtures.TestBase, AssertsCompiledSQL):          s = select([1], t1.c.col1 == t1a.c.col1, from_obj=t1a).correlate(t1a)          s = select([t1]).where(t1.c.col1 == s)          self.assert_compile( -            s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " +            s, +            "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "              "WHERE table1.col1 = "              "(SELECT 1 FROM table1, table1 AS table1_1 " -            "WHERE table1.col1 = table1_1.col1)") +            "WHERE table1.col1 = table1_1.col1)", +        )          s = CloningVisitor().traverse(s)          self.assert_compile( -            s, "SELECT table1.col1, table1.col2, table1.col3 FROM table1 " +            s, +            "SELECT table1.col1, table1.col2, table1.col3 FROM table1 "              "WHERE table1.col1 = "              "(SELECT 1 FROM table1, table1 AS table1_1 " -            "WHERE table1.col1 = table1_1.col1)") +            "WHERE table1.col1 = table1_1.col1)", +        )      def test_select_fromtwice_two(self): -        s = select([t1]).where(t1.c.col1 == 'foo').alias() +        s = select([t1]).where(t1.c.col1 == "foo").alias()          s2 = select([1], t1.c.col1 == s.c.col1, from_obj=s).correlate(t1)          s3 = select([t1]).where(t1.c.col1 == s2)          self.assert_compile( -            s3, "SELECT table1.col1, table1.col2, table1.col3 " +            s3, +            "SELECT table1.col1, table1.col2, table1.col3 "              "FROM table1 WHERE table1.col1 = "              "(SELECT 1 FROM "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, "              "table1.col3 AS col3 FROM table1 "              "WHERE table1.col1 = :col1_1) " -            "AS anon_1 WHERE table1.col1 = anon_1.col1)") +            "AS anon_1 WHERE table1.col1 = anon_1.col1)", +        )          s4 = ReplacingCloningVisitor().traverse(s3)          self.assert_compile( -            s4, "SELECT table1.col1, table1.col2, table1.col3 " +            s4, +            "SELECT table1.col1, table1.col2, table1.col3 "              "FROM table1 WHERE table1.col1 = "              "(SELECT 1 FROM "              "(SELECT table1.col1 AS col1, table1.col2 AS col2, "              "table1.col3 AS col3 FROM table1 "              "WHERE table1.col1 = :col1_1) " -            "AS anon_1 WHERE table1.col1 = anon_1.col1)") +            "AS anon_1 WHERE table1.col1 = anon_1.col1)", +        )  class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL): -    __dialect__ = 'default' +    __dialect__ = "default"      @classmethod      def setup_class(cls):          global t1, t2 -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   column("col4") -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) +        t1 = table( +            "table1", +            column("col1"), +            column("col2"), +            column("col3"), +            column("col4"), +        ) +        t2 = table("table2", column("col1"), column("col2"), column("col3"))      def test_traverse_memoizes_w_columns(self):          t1a = t1.alias()          adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) -        expr = select([t1a.c.col1]).label('x') +        expr = select([t1a.c.col1]).label("x")          expr_adapted = adapter.traverse(expr)          is_not_(expr, expr_adapted) -        is_( -            adapter.columns[expr], -            expr_adapted -        ) +        is_(adapter.columns[expr], expr_adapted)      def test_traverse_memoizes_w_itself(self):          t1a = t1.alias()          adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) -        expr = select([t1a.c.col1]).label('x') +        expr = select([t1a.c.col1]).label("x")          expr_adapted = adapter.traverse(expr)          is_not_(expr, expr_adapted) -        is_( -            adapter.traverse(expr), -            expr_adapted -        ) +        is_(adapter.traverse(expr), expr_adapted)      def test_columns_memoizes_w_itself(self):          t1a = t1.alias()          adapter = sql_util.ColumnAdapter(t1a, anonymize_labels=True) -        expr = select([t1a.c.col1]).label('x') +        expr = select([t1a.c.col1]).label("x")          expr_adapted = adapter.columns[expr]          is_not_(expr, expr_adapted) -        is_( -            adapter.columns[expr], -            expr_adapted -        ) +        is_(adapter.columns[expr], expr_adapted)      def test_wrapping_fallthrough(self):          t1a = t1.alias(name="t1a") @@ -850,57 +844,35 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # t1.c.col1 -> s1.c.t1a_col1          # adapted by a2 -        is_( -            a3.columns[t1.c.col1], s1.c.t1a_col1 -        ) -        is_( -            a4.columns[t1.c.col1], s1.c.t1a_col1 -        ) +        is_(a3.columns[t1.c.col1], s1.c.t1a_col1) +        is_(a4.columns[t1.c.col1], s1.c.t1a_col1)          # chaining can't fall through because a1 grabs it          # first -        is_( -            a5.columns[t1.c.col1], t1a.c.col1 -        ) +        is_(a5.columns[t1.c.col1], t1a.c.col1)          # t2.c.col1 -> s1.c.t2a_col1          # adapted by a2 -        is_( -            a3.columns[t2.c.col1], s1.c.t2a_col1 -        ) -        is_( -            a4.columns[t2.c.col1], s1.c.t2a_col1 -        ) +        is_(a3.columns[t2.c.col1], s1.c.t2a_col1) +        is_(a4.columns[t2.c.col1], s1.c.t2a_col1)          # chaining, t2 hits s1 -        is_( -            a5.columns[t2.c.col1], s1.c.t2a_col1 -        ) +        is_(a5.columns[t2.c.col1], s1.c.t2a_col1)          # t1.c.col2 -> t1a.c.col2          # fallthrough to a1 -        is_( -            a3.columns[t1.c.col2], t1a.c.col2 -        ) -        is_( -            a4.columns[t1.c.col2], t1a.c.col2 -        ) +        is_(a3.columns[t1.c.col2], t1a.c.col2) +        is_(a4.columns[t1.c.col2], t1a.c.col2)          # chaining hits a1 -        is_( -            a5.columns[t1.c.col2], t1a.c.col2 -        ) +        is_(a5.columns[t1.c.col2], t1a.c.col2)          # t2.c.col2 -> t2.c.col2          # fallthrough to no adaption -        is_( -            a3.columns[t2.c.col2], t2.c.col2 -        ) -        is_( -            a4.columns[t2.c.col2], t2.c.col2 -        ) +        is_(a3.columns[t2.c.col2], t2.c.col2) +        is_(a4.columns[t2.c.col2], t2.c.col2)      def test_wrapping_ordering(self):          """illustrate an example where order of wrappers matters. @@ -926,25 +898,15 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # in different contexts, order of wrapping matters          # t2.c.col1 via a2 is stmt2.c.col1; then ignored by a1 -        is_( -            a2_to_a1.columns[t2.c.col1], stmt2.c.col1 -        ) +        is_(a2_to_a1.columns[t2.c.col1], stmt2.c.col1)          # t2.c.col1 via a1 is stmt.c.table2_col1; a2 then          # sends this to stmt2.c.table2_col1 -        is_( -            a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1 -        ) +        is_(a1_to_a2.columns[t2.c.col1], stmt2.c.table2_col1)          # for mutually exclusive columns, order doesn't matter -        is_( -            a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1 -        ) -        is_( -            a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1 -        ) -        is_( -            a2_to_a1.columns[t2.c.col2], stmt2.c.col2 -        ) +        is_(a2_to_a1.columns[t1.c.col1], stmt2.c.table1_col1) +        is_(a1_to_a2.columns[t1.c.col1], stmt2.c.table1_col1) +        is_(a2_to_a1.columns[t2.c.col2], stmt2.c.col2)      def test_wrapping_multiple(self):          """illustrate that wrapping runs both adapters""" @@ -959,7 +921,7 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          self.assert_compile(              a3.traverse(stmt), -            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" +            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a",          )          # chaining does too because these adapters don't share any @@ -967,7 +929,7 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          a4 = a2.chain(a1)          self.assert_compile(              a4.traverse(stmt), -            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a" +            "SELECT t1a.col1, t2a.col2 FROM table1 AS t1a, table2 AS t2a",          )      def test_wrapping_inclusions(self): @@ -977,13 +939,13 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          t1a = t1.alias(name="t1a")          t2a = t2.alias(name="t2a")          a1 = sql_util.ColumnAdapter( -            t1a, -            include_fn=lambda col: "a1" in col._annotations) +            t1a, include_fn=lambda col: "a1" in col._annotations +        )          s1 = select([t1a, t2a]).apply_labels().alias()          a2 = sql_util.ColumnAdapter( -            s1, -            include_fn=lambda col: "a2" in col._annotations) +            s1, include_fn=lambda col: "a2" in col._annotations +        )          a3 = a2.wrap(a1)          c1a1 = t1.c.col1._annotate(dict(a1=True)) @@ -994,62 +956,45 @@ class ColumnAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          c2a2 = t2.c.col1._annotate(dict(a2=True))          c2aa = t2.c.col1._annotate(dict(a1=True, a2=True)) -        is_( -            a3.columns[c1a1], t1a.c.col1 -        ) -        is_( -            a3.columns[c1a2], s1.c.t1a_col1 -        ) -        is_( -            a3.columns[c1aa], s1.c.t1a_col1 -        ) +        is_(a3.columns[c1a1], t1a.c.col1) +        is_(a3.columns[c1a2], s1.c.t1a_col1) +        is_(a3.columns[c1aa], s1.c.t1a_col1)          # not covered by a1, accepted by a2 -        is_( -            a3.columns[c2aa], s1.c.t2a_col1 -        ) +        is_(a3.columns[c2aa], s1.c.t2a_col1)          # not covered by a1, accepted by a2 -        is_( -            a3.columns[c2a2], s1.c.t2a_col1 -        ) +        is_(a3.columns[c2a2], s1.c.t2a_col1)          # not covered by a1, rejected by a2 -        is_( -            a3.columns[c2a1], c2a1 -        ) +        is_(a3.columns[c2a1], c2a1)  class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL): -    __dialect__ = 'default' +    __dialect__ = "default"      @classmethod      def setup_class(cls):          global t1, t2 -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) +        t1 = table("table1", column("col1"), column("col2"), column("col3")) +        t2 = table("table2", column("col1"), column("col2"), column("col3"))      def test_correlation_on_clone(self): -        t1alias = t1.alias('t1alias') -        t2alias = t2.alias('t2alias') +        t1alias = t1.alias("t1alias") +        t2alias = t2.alias("t2alias")          vis = sql_util.ClauseAdapter(t1alias) -        s = select([literal_column('*')], -                   from_obj=[t1alias, t2alias]).as_scalar() +        s = select( +            [literal_column("*")], from_obj=[t1alias, t2alias] +        ).as_scalar()          assert t2alias in s._froms          assert t1alias in s._froms -        self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), -                            'SELECT * FROM table2 AS t2alias WHERE ' -                            't2alias.col1 = (SELECT * FROM table1 AS ' -                            't1alias)') +        self.assert_compile( +            select([literal_column("*")], t2alias.c.col1 == s), +            "SELECT * FROM table2 AS t2alias WHERE " +            "t2alias.col1 = (SELECT * FROM table1 AS " +            "t1alias)", +        )          s = vis.traverse(s)          assert t2alias not in s._froms  # not present because it's been @@ -1060,61 +1005,91 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # correlate list on "s" needs to take into account the full          # _cloned_set for each element in _froms when correlating -        self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), -                            'SELECT * FROM table2 AS t2alias WHERE ' -                            't2alias.col1 = (SELECT * FROM table1 AS ' -                            't1alias)') -        s = select([literal_column('*')], -                   from_obj=[t1alias, t2alias]).correlate(t2alias).as_scalar() -        self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), -                            'SELECT * FROM table2 AS t2alias WHERE ' -                            't2alias.col1 = (SELECT * FROM table1 AS ' -                            't1alias)') +        self.assert_compile( +            select([literal_column("*")], t2alias.c.col1 == s), +            "SELECT * FROM table2 AS t2alias WHERE " +            "t2alias.col1 = (SELECT * FROM table1 AS " +            "t1alias)", +        ) +        s = ( +            select([literal_column("*")], from_obj=[t1alias, t2alias]) +            .correlate(t2alias) +            .as_scalar() +        ) +        self.assert_compile( +            select([literal_column("*")], t2alias.c.col1 == s), +            "SELECT * FROM table2 AS t2alias WHERE " +            "t2alias.col1 = (SELECT * FROM table1 AS " +            "t1alias)", +        )          s = vis.traverse(s) -        self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), -                            'SELECT * FROM table2 AS t2alias WHERE ' -                            't2alias.col1 = (SELECT * FROM table1 AS ' -                            't1alias)') +        self.assert_compile( +            select([literal_column("*")], t2alias.c.col1 == s), +            "SELECT * FROM table2 AS t2alias WHERE " +            "t2alias.col1 = (SELECT * FROM table1 AS " +            "t1alias)", +        )          s = CloningVisitor().traverse(s) -        self.assert_compile(select([literal_column('*')], t2alias.c.col1 == s), -                            'SELECT * FROM table2 AS t2alias WHERE ' -                            't2alias.col1 = (SELECT * FROM table1 AS ' -                            't1alias)') +        self.assert_compile( +            select([literal_column("*")], t2alias.c.col1 == s), +            "SELECT * FROM table2 AS t2alias WHERE " +            "t2alias.col1 = (SELECT * FROM table1 AS " +            "t1alias)", +        ) -        s = select([literal_column('*')]).where(t1.c.col1 == t2.c.col1) \ +        s = ( +            select([literal_column("*")]) +            .where(t1.c.col1 == t2.c.col1)              .as_scalar() -        self.assert_compile(select([t1.c.col1, s]), -                            'SELECT table1.col1, (SELECT * FROM table2 ' -                            'WHERE table1.col1 = table2.col1) AS ' -                            'anon_1 FROM table1') +        ) +        self.assert_compile( +            select([t1.c.col1, s]), +            "SELECT table1.col1, (SELECT * FROM table2 " +            "WHERE table1.col1 = table2.col1) AS " +            "anon_1 FROM table1", +        )          vis = sql_util.ClauseAdapter(t1alias)          s = vis.traverse(s) -        self.assert_compile(select([t1alias.c.col1, s]), -                            'SELECT t1alias.col1, (SELECT * FROM ' -                            'table2 WHERE t1alias.col1 = table2.col1) ' -                            'AS anon_1 FROM table1 AS t1alias') +        self.assert_compile( +            select([t1alias.c.col1, s]), +            "SELECT t1alias.col1, (SELECT * FROM " +            "table2 WHERE t1alias.col1 = table2.col1) " +            "AS anon_1 FROM table1 AS t1alias", +        )          s = CloningVisitor().traverse(s) -        self.assert_compile(select([t1alias.c.col1, s]), -                            'SELECT t1alias.col1, (SELECT * FROM ' -                            'table2 WHERE t1alias.col1 = table2.col1) ' -                            'AS anon_1 FROM table1 AS t1alias') -        s = select([literal_column('*')]).where(t1.c.col1 == t2.c.col1) \ -            .correlate(t1).as_scalar() -        self.assert_compile(select([t1.c.col1, s]), -                            'SELECT table1.col1, (SELECT * FROM table2 ' -                            'WHERE table1.col1 = table2.col1) AS ' -                            'anon_1 FROM table1') +        self.assert_compile( +            select([t1alias.c.col1, s]), +            "SELECT t1alias.col1, (SELECT * FROM " +            "table2 WHERE t1alias.col1 = table2.col1) " +            "AS anon_1 FROM table1 AS t1alias", +        ) +        s = ( +            select([literal_column("*")]) +            .where(t1.c.col1 == t2.c.col1) +            .correlate(t1) +            .as_scalar() +        ) +        self.assert_compile( +            select([t1.c.col1, s]), +            "SELECT table1.col1, (SELECT * FROM table2 " +            "WHERE table1.col1 = table2.col1) AS " +            "anon_1 FROM table1", +        )          vis = sql_util.ClauseAdapter(t1alias)          s = vis.traverse(s) -        self.assert_compile(select([t1alias.c.col1, s]), -                            'SELECT t1alias.col1, (SELECT * FROM ' -                            'table2 WHERE t1alias.col1 = table2.col1) ' -                            'AS anon_1 FROM table1 AS t1alias') +        self.assert_compile( +            select([t1alias.c.col1, s]), +            "SELECT t1alias.col1, (SELECT * FROM " +            "table2 WHERE t1alias.col1 = table2.col1) " +            "AS anon_1 FROM table1 AS t1alias", +        )          s = CloningVisitor().traverse(s) -        self.assert_compile(select([t1alias.c.col1, s]), -                            'SELECT t1alias.col1, (SELECT * FROM ' -                            'table2 WHERE t1alias.col1 = table2.col1) ' -                            'AS anon_1 FROM table1 AS t1alias') +        self.assert_compile( +            select([t1alias.c.col1, s]), +            "SELECT t1alias.col1, (SELECT * FROM " +            "table2 WHERE t1alias.col1 = table2.col1) " +            "AS anon_1 FROM table1 AS t1alias", +        )      @testing.fails_on_everything_except()      def test_joins_dont_adapt(self): @@ -1122,284 +1097,319 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # make much sense. ClauseAdapter doesn't make any changes if          # it's against a straight join. -        users = table('users', column('id')) -        addresses = table('addresses', column('id'), column('user_id')) +        users = table("users", column("id")) +        addresses = table("addresses", column("id"), column("user_id"))          ualias = users.alias() -        s = select([func.count(addresses.c.id)], users.c.id -                   == addresses.c.user_id).correlate(users) +        s = select( +            [func.count(addresses.c.id)], users.c.id == addresses.c.user_id +        ).correlate(users)          s = sql_util.ClauseAdapter(ualias).traverse(s)          j1 = addresses.join(ualias, addresses.c.user_id == ualias.c.id) -        self.assert_compile(sql_util.ClauseAdapter(j1).traverse(s), -                            'SELECT count(addresses.id) AS count_1 ' -                            'FROM addresses WHERE users_1.id = ' -                            'addresses.user_id') +        self.assert_compile( +            sql_util.ClauseAdapter(j1).traverse(s), +            "SELECT count(addresses.id) AS count_1 " +            "FROM addresses WHERE users_1.id = " +            "addresses.user_id", +        )      def test_table_to_alias_1(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        ff = vis.traverse(func.count(t1.c.col1).label('foo')) +        ff = vis.traverse(func.count(t1.c.col1).label("foo"))          assert list(_from_objects(ff)) == [t1alias]      def test_table_to_alias_2(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile( -            vis.traverse(select([literal_column('*')], from_obj=[t1])), -            'SELECT * FROM table1 AS t1alias') +            vis.traverse(select([literal_column("*")], from_obj=[t1])), +            "SELECT * FROM table1 AS t1alias", +        )      def test_table_to_alias_3(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile( -            select([literal_column('*')], t1.c.col1 == t2.c.col2), -            'SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2') +            select([literal_column("*")], t1.c.col1 == t2.c.col2), +            "SELECT * FROM table1, table2 WHERE table1.col1 = table2.col2", +        )      def test_table_to_alias_4(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        self.assert_compile(vis.traverse(select([literal_column('*')], -                                                t1.c.col1 == t2.c.col2)), -                            'SELECT * FROM table1 AS t1alias, table2 ' -                            'WHERE t1alias.col1 = table2.col2') +        self.assert_compile( +            vis.traverse( +                select([literal_column("*")], t1.c.col1 == t2.c.col2) +            ), +            "SELECT * FROM table1 AS t1alias, table2 " +            "WHERE t1alias.col1 = table2.col2", +        )      def test_table_to_alias_5(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile(              vis.traverse(                  select( -                    [literal_column('*')], +                    [literal_column("*")],                      t1.c.col1 == t2.c.col2, -                    from_obj=[ -                        t1, -                        t2])), -            'SELECT * FROM table1 AS t1alias, table2 ' -            'WHERE t1alias.col1 = table2.col2') +                    from_obj=[t1, t2], +                ) +            ), +            "SELECT * FROM table1 AS t1alias, table2 " +            "WHERE t1alias.col1 = table2.col2", +        )      def test_table_to_alias_6(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        self.assert_compile(select([t1alias, t2]).where( -            t1alias.c.col1 == vis.traverse( -                select([literal_column('*')], -                       t1.c.col1 == t2.c.col2, from_obj=[t1, t2]).correlate(t1) -            ) -        ), +        self.assert_compile( +            select([t1alias, t2]).where( +                t1alias.c.col1 +                == vis.traverse( +                    select( +                        [literal_column("*")], +                        t1.c.col1 == t2.c.col2, +                        from_obj=[t1, t2], +                    ).correlate(t1) +                ) +            ),              "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "              "table2.col1, table2.col2, table2.col3 "              "FROM table1 AS t1alias, table2 WHERE t1alias.col1 = " -            "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)" +            "(SELECT * FROM table2 WHERE t1alias.col1 = table2.col2)",          )      def test_table_to_alias_7(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile( -            select([t1alias, t2]). -            where(t1alias.c.col1 == vis.traverse( -                select([literal_column('*')], -                       t1.c.col1 == t2.c.col2, from_obj=[t1, t2]). -                correlate(t2))), +            select([t1alias, t2]).where( +                t1alias.c.col1 +                == vis.traverse( +                    select( +                        [literal_column("*")], +                        t1.c.col1 == t2.c.col2, +                        from_obj=[t1, t2], +                    ).correlate(t2) +                ) +            ),              "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "              "table2.col1, table2.col2, table2.col3 "              "FROM table1 AS t1alias, table2 "              "WHERE t1alias.col1 = "              "(SELECT * FROM table1 AS t1alias " -            "WHERE t1alias.col1 = table2.col2)") +            "WHERE t1alias.col1 = table2.col2)", +        )      def test_table_to_alias_8(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile(              vis.traverse(case([(t1.c.col1 == 5, t1.c.col2)], else_=t1.c.col1)), -            'CASE WHEN (t1alias.col1 = :col1_1) THEN ' -            't1alias.col2 ELSE t1alias.col1 END') +            "CASE WHEN (t1alias.col1 = :col1_1) THEN " +            "t1alias.col2 ELSE t1alias.col1 END", +        )      def test_table_to_alias_9(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias)          self.assert_compile(              vis.traverse( -                case( -                    [ -                        (5, -                         t1.c.col2)], -                    value=t1.c.col1, -                    else_=t1.c.col1)), -            'CASE t1alias.col1 WHEN :param_1 THEN ' -            't1alias.col2 ELSE t1alias.col1 END') +                case([(5, t1.c.col2)], value=t1.c.col1, else_=t1.c.col1) +            ), +            "CASE t1alias.col1 WHEN :param_1 THEN " +            "t1alias.col2 ELSE t1alias.col1 END", +        )      def test_table_to_alias_10(self): -        s = select([literal_column('*')], from_obj=[t1]).alias('foo') -        self.assert_compile(s.select(), -                            'SELECT foo.* FROM (SELECT * FROM table1) ' -                            'AS foo') +        s = select([literal_column("*")], from_obj=[t1]).alias("foo") +        self.assert_compile( +            s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" +        )      def test_table_to_alias_11(self): -        s = select([literal_column('*')], from_obj=[t1]).alias('foo') -        t1alias = t1.alias('t1alias') +        s = select([literal_column("*")], from_obj=[t1]).alias("foo") +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        self.assert_compile(vis.traverse(s.select()), -                            'SELECT foo.* FROM (SELECT * FROM table1 ' -                            'AS t1alias) AS foo') +        self.assert_compile( +            vis.traverse(s.select()), +            "SELECT foo.* FROM (SELECT * FROM table1 " "AS t1alias) AS foo", +        )      def test_table_to_alias_12(self): -        s = select([literal_column('*')], from_obj=[t1]).alias('foo') -        self.assert_compile(s.select(), -                            'SELECT foo.* FROM (SELECT * FROM table1) ' -                            'AS foo') +        s = select([literal_column("*")], from_obj=[t1]).alias("foo") +        self.assert_compile( +            s.select(), "SELECT foo.* FROM (SELECT * FROM table1) " "AS foo" +        )      def test_table_to_alias_13(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        ff = vis.traverse(func.count(t1.c.col1).label('foo')) -        self.assert_compile(select([ff]), -                            'SELECT count(t1alias.col1) AS foo FROM ' -                            'table1 AS t1alias') +        ff = vis.traverse(func.count(t1.c.col1).label("foo")) +        self.assert_compile( +            select([ff]), +            "SELECT count(t1alias.col1) AS foo FROM " "table1 AS t1alias", +        )          assert list(_from_objects(ff)) == [t1alias]      # def test_table_to_alias_2(self): -        # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c -        # .col1).l abel('foo')]), clone=True), "SELECT -        # count(t1alias.col1) AS foo FROM table1 AS t1alias") +    # TODO: self.assert_compile(vis.traverse(select([func.count(t1.c +    # .col1).l abel('foo')]), clone=True), "SELECT +    # count(t1alias.col1) AS foo FROM table1 AS t1alias")      def test_table_to_alias_14(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        t2alias = t2.alias('t2alias') +        t2alias = t2.alias("t2alias")          vis.chain(sql_util.ClauseAdapter(t2alias))          self.assert_compile(              vis.traverse( -                select([literal_column('*')], t1.c.col1 == t2.c.col2)), -            'SELECT * FROM table1 AS t1alias, table2 ' -            'AS t2alias WHERE t1alias.col1 = ' -            't2alias.col2') +                select([literal_column("*")], t1.c.col1 == t2.c.col2) +            ), +            "SELECT * FROM table1 AS t1alias, table2 " +            "AS t2alias WHERE t1alias.col1 = " +            "t2alias.col2", +        )      def test_table_to_alias_15(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        t2alias = t2.alias('t2alias') +        t2alias = t2.alias("t2alias")          vis.chain(sql_util.ClauseAdapter(t2alias))          self.assert_compile(              vis.traverse( -                select( -                    ['*'], -                    t1.c.col1 == t2.c.col2, -                    from_obj=[ -                        t1, -                        t2])), -            'SELECT * FROM table1 AS t1alias, table2 ' -            'AS t2alias WHERE t1alias.col1 = ' -            't2alias.col2') +                select(["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2]) +            ), +            "SELECT * FROM table1 AS t1alias, table2 " +            "AS t2alias WHERE t1alias.col1 = " +            "t2alias.col2", +        )      def test_table_to_alias_16(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        t2alias = t2.alias('t2alias') +        t2alias = t2.alias("t2alias")          vis.chain(sql_util.ClauseAdapter(t2alias))          self.assert_compile(              select([t1alias, t2alias]).where( -                t1alias.c.col1 == -                vis.traverse(select(['*'], -                                    t1.c.col1 == t2.c.col2, -                                    from_obj=[t1, t2]).correlate(t1)) +                t1alias.c.col1 +                == vis.traverse( +                    select( +                        ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] +                    ).correlate(t1) +                )              ),              "SELECT t1alias.col1, t1alias.col2, t1alias.col3, "              "t2alias.col1, t2alias.col2, t2alias.col3 "              "FROM table1 AS t1alias, table2 AS t2alias "              "WHERE t1alias.col1 = "              "(SELECT * FROM table2 AS t2alias " -            "WHERE t1alias.col1 = t2alias.col2)" +            "WHERE t1alias.col1 = t2alias.col2)",          )      def test_table_to_alias_17(self): -        t1alias = t1.alias('t1alias') +        t1alias = t1.alias("t1alias")          vis = sql_util.ClauseAdapter(t1alias) -        t2alias = t2.alias('t2alias') +        t2alias = t2.alias("t2alias")          vis.chain(sql_util.ClauseAdapter(t2alias))          self.assert_compile(              t2alias.select().where( -                t2alias.c.col2 == vis.traverse( +                t2alias.c.col2 +                == vis.traverse(                      select( -                        ['*'], -                        t1.c.col1 == t2.c.col2, -                        from_obj=[ -                            t1, -                            t2]).correlate(t2))), -            'SELECT t2alias.col1, t2alias.col2, t2alias.col3 ' -            'FROM table2 AS t2alias WHERE t2alias.col2 = ' -            '(SELECT * FROM table1 AS t1alias WHERE ' -            't1alias.col1 = t2alias.col2)') +                        ["*"], t1.c.col1 == t2.c.col2, from_obj=[t1, t2] +                    ).correlate(t2) +                ) +            ), +            "SELECT t2alias.col1, t2alias.col2, t2alias.col3 " +            "FROM table2 AS t2alias WHERE t2alias.col2 = " +            "(SELECT * FROM table1 AS t1alias WHERE " +            "t1alias.col1 = t2alias.col2)", +        )      def test_include_exclude(self):          m = MetaData() -        a = Table('a', m, -                  Column('id', Integer, primary_key=True), -                  Column('xxx_id', Integer, -                         ForeignKey('a.id', name='adf', use_alter=True) -                         ) -                  ) - -        e = (a.c.id == a.c.xxx_id) +        a = Table( +            "a", +            m, +            Column("id", Integer, primary_key=True), +            Column( +                "xxx_id", +                Integer, +                ForeignKey("a.id", name="adf", use_alter=True), +            ), +        ) + +        e = a.c.id == a.c.xxx_id          assert str(e) == "a.id = a.xxx_id"          b = a.alias() -        e = sql_util.ClauseAdapter(b, include_fn=lambda x: x in set([a.c.id]), -                                   equivalents={a.c.id: set([a.c.id])} -                                   ).traverse(e) +        e = sql_util.ClauseAdapter( +            b, +            include_fn=lambda x: x in set([a.c.id]), +            equivalents={a.c.id: set([a.c.id])}, +        ).traverse(e)          assert str(e) == "a_1.id = a.xxx_id"      def test_recursive_equivalents(self):          m = MetaData() -        a = Table('a', m, Column('x', Integer), Column('y', Integer)) -        b = Table('b', m, Column('x', Integer), Column('y', Integer)) -        c = Table('c', m, Column('x', Integer), Column('y', Integer)) +        a = Table("a", m, Column("x", Integer), Column("y", Integer)) +        b = Table("b", m, Column("x", Integer), Column("y", Integer)) +        c = Table("c", m, Column("x", Integer), Column("y", Integer))          # force a recursion overflow, by linking a.c.x<->c.c.x, and          # asking for a nonexistent col.  corresponding_column should prevent          # endless depth.          adapt = sql_util.ClauseAdapter( -            b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])}) +            b, equivalents={a.c.x: set([c.c.x]), c.c.x: set([a.c.x])} +        )          assert adapt._corresponding_column(a.c.x, False) is None      def test_multilevel_equivalents(self):          m = MetaData() -        a = Table('a', m, Column('x', Integer), Column('y', Integer)) -        b = Table('b', m, Column('x', Integer), Column('y', Integer)) -        c = Table('c', m, Column('x', Integer), Column('y', Integer)) +        a = Table("a", m, Column("x", Integer), Column("y", Integer)) +        b = Table("b", m, Column("x", Integer), Column("y", Integer)) +        c = Table("c", m, Column("x", Integer), Column("y", Integer))          alias = select([a]).select_from(a.join(b, a.c.x == b.c.x)).alias()          # two levels of indirection from c.x->b.x->a.x, requires recursive          # corresponding_column call          adapt = sql_util.ClauseAdapter( -            alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])}) +            alias, equivalents={b.c.x: set([a.c.x]), c.c.x: set([b.c.x])} +        )          assert adapt._corresponding_column(a.c.x, False) is alias.c.x          assert adapt._corresponding_column(c.c.x, False) is alias.c.x      def test_join_to_alias(self):          metadata = MetaData() -        a = Table('a', metadata, -                  Column('id', Integer, primary_key=True)) -        b = Table('b', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('aid', Integer, ForeignKey('a.id')), -                  ) -        c = Table('c', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('bid', Integer, ForeignKey('b.id')), -                  ) - -        d = Table('d', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('aid', Integer, ForeignKey('a.id')), -                  ) +        a = Table("a", metadata, Column("id", Integer, primary_key=True)) +        b = Table( +            "b", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("aid", Integer, ForeignKey("a.id")), +        ) +        c = Table( +            "c", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("bid", Integer, ForeignKey("b.id")), +        ) + +        d = Table( +            "d", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("aid", Integer, ForeignKey("a.id")), +        )          j1 = a.outerjoin(b)          j2 = select([j1], use_labels=True) @@ -1407,12 +1417,14 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          j3 = c.join(j2, j2.c.b_id == c.c.bid)          j4 = j3.outerjoin(d) -        self.assert_compile(j4, -                            'c JOIN (SELECT a.id AS a_id, b.id AS ' -                            'b_id, b.aid AS b_aid FROM a LEFT OUTER ' -                            'JOIN b ON a.id = b.aid) ON b_id = c.bid ' -                            'LEFT OUTER JOIN d ON a_id = d.aid') -        j5 = j3.alias('foo') +        self.assert_compile( +            j4, +            "c JOIN (SELECT a.id AS a_id, b.id AS " +            "b_id, b.aid AS b_aid FROM a LEFT OUTER " +            "JOIN b ON a.id = b.aid) ON b_id = c.bid " +            "LEFT OUTER JOIN d ON a_id = d.aid", +        ) +        j5 = j3.alias("foo")          j6 = sql_util.ClauseAdapter(j5).copy_and_process([j4])[0]          # this statement takes c join(a join b), wraps it inside an @@ -1420,14 +1432,16 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # right side "left outer join d" stays the same, except "d"          # joins against foo.a_id instead of plain "a_id" -        self.assert_compile(j6, -                            '(SELECT c.id AS c_id, c.bid AS c_bid, ' -                            'a_id AS a_id, b_id AS b_id, b_aid AS ' -                            'b_aid FROM c JOIN (SELECT a.id AS a_id, ' -                            'b.id AS b_id, b.aid AS b_aid FROM a LEFT ' -                            'OUTER JOIN b ON a.id = b.aid) ON b_id = ' -                            'c.bid) AS foo LEFT OUTER JOIN d ON ' -                            'foo.a_id = d.aid') +        self.assert_compile( +            j6, +            "(SELECT c.id AS c_id, c.bid AS c_bid, " +            "a_id AS a_id, b_id AS b_id, b_aid AS " +            "b_aid FROM c JOIN (SELECT a.id AS a_id, " +            "b.id AS b_id, b.aid AS b_aid FROM a LEFT " +            "OUTER JOIN b ON a.id = b.aid) ON b_id = " +            "c.bid) AS foo LEFT OUTER JOIN d ON " +            "foo.a_id = d.aid", +        )      def test_derived_from(self):          assert select([t1]).is_derived_from(t1) @@ -1435,7 +1449,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          assert not t1.is_derived_from(select([t1]))          assert t1.alias().is_derived_from(t1) -        s1 = select([t1, t2]).alias('foo') +        s1 = select([t1, t2]).alias("foo")          s2 = select([s1]).limit(5).offset(10).alias()          assert s2.is_derived_from(s1)          s2 = s2._clone() @@ -1445,111 +1459,117 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):          # original issue from ticket #904 -        s1 = select([t1]).alias('foo') +        s1 = select([t1]).alias("foo")          s2 = select([s1]).limit(5).offset(10).alias() -        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(s1), -                            'SELECT foo.col1, foo.col2, foo.col3 FROM ' -                            '(SELECT table1.col1 AS col1, table1.col2 ' -                            'AS col2, table1.col3 AS col3 FROM table1) ' -                            'AS foo LIMIT :param_1 OFFSET :param_2', -                            {'param_1': 5, 'param_2': 10}) +        self.assert_compile( +            sql_util.ClauseAdapter(s2).traverse(s1), +            "SELECT foo.col1, foo.col2, foo.col3 FROM " +            "(SELECT table1.col1 AS col1, table1.col2 " +            "AS col2, table1.col3 AS col3 FROM table1) " +            "AS foo LIMIT :param_1 OFFSET :param_2", +            {"param_1": 5, "param_2": 10}, +        )      def test_aliasedselect_to_aliasedselect_join(self): -        s1 = select([t1]).alias('foo') +        s1 = select([t1]).alias("foo")          s2 = select([s1]).limit(5).offset(10).alias()          j = s1.outerjoin(t2, s1.c.col1 == t2.c.col1) -        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), -                            'SELECT anon_1.col1, anon_1.col2, ' -                            'anon_1.col3, table2.col1, table2.col2, ' -                            'table2.col3 FROM (SELECT foo.col1 AS ' -                            'col1, foo.col2 AS col2, foo.col3 AS col3 ' -                            'FROM (SELECT table1.col1 AS col1, ' -                            'table1.col2 AS col2, table1.col3 AS col3 ' -                            'FROM table1) AS foo LIMIT :param_1 OFFSET ' -                            ':param_2) AS anon_1 LEFT OUTER JOIN ' -                            'table2 ON anon_1.col1 = table2.col1', -                            {'param_1': 5, 'param_2': 10}) +        self.assert_compile( +            sql_util.ClauseAdapter(s2).traverse(j).select(), +            "SELECT anon_1.col1, anon_1.col2, " +            "anon_1.col3, table2.col1, table2.col2, " +            "table2.col3 FROM (SELECT foo.col1 AS " +            "col1, foo.col2 AS col2, foo.col3 AS col3 " +            "FROM (SELECT table1.col1 AS col1, " +            "table1.col2 AS col2, table1.col3 AS col3 " +            "FROM table1) AS foo LIMIT :param_1 OFFSET " +            ":param_2) AS anon_1 LEFT OUTER JOIN " +            "table2 ON anon_1.col1 = table2.col1", +            {"param_1": 5, "param_2": 10}, +        )      def test_aliasedselect_to_aliasedselect_join_nested_table(self): -        s1 = select([t1]).alias('foo') +        s1 = select([t1]).alias("foo")          s2 = select([s1]).limit(5).offset(10).alias() -        talias = t1.alias('bar') +        talias = t1.alias("bar")          assert not s2.is_derived_from(talias)          j = s1.outerjoin(talias, s1.c.col1 == talias.c.col1) -        self.assert_compile(sql_util.ClauseAdapter(s2).traverse(j).select(), -                            'SELECT anon_1.col1, anon_1.col2, ' -                            'anon_1.col3, bar.col1, bar.col2, bar.col3 ' -                            'FROM (SELECT foo.col1 AS col1, foo.col2 ' -                            'AS col2, foo.col3 AS col3 FROM (SELECT ' -                            'table1.col1 AS col1, table1.col2 AS col2, ' -                            'table1.col3 AS col3 FROM table1) AS foo ' -                            'LIMIT :param_1 OFFSET :param_2) AS anon_1 ' -                            'LEFT OUTER JOIN table1 AS bar ON ' -                            'anon_1.col1 = bar.col1', {'param_1': 5, -                                                       'param_2': 10}) +        self.assert_compile( +            sql_util.ClauseAdapter(s2).traverse(j).select(), +            "SELECT anon_1.col1, anon_1.col2, " +            "anon_1.col3, bar.col1, bar.col2, bar.col3 " +            "FROM (SELECT foo.col1 AS col1, foo.col2 " +            "AS col2, foo.col3 AS col3 FROM (SELECT " +            "table1.col1 AS col1, table1.col2 AS col2, " +            "table1.col3 AS col3 FROM table1) AS foo " +            "LIMIT :param_1 OFFSET :param_2) AS anon_1 " +            "LEFT OUTER JOIN table1 AS bar ON " +            "anon_1.col1 = bar.col1", +            {"param_1": 5, "param_2": 10}, +        )      def test_functions(self):          self.assert_compile( -            sql_util.ClauseAdapter(t1.alias()). -            traverse(func.count(t1.c.col1)), -            'count(table1_1.col1)') +            sql_util.ClauseAdapter(t1.alias()).traverse(func.count(t1.c.col1)), +            "count(table1_1.col1)", +        )          s = select([func.count(t1.c.col1)]) -        self.assert_compile(sql_util.ClauseAdapter(t1.alias()).traverse(s), -                            'SELECT count(table1_1.col1) AS count_1 ' -                            'FROM table1 AS table1_1') +        self.assert_compile( +            sql_util.ClauseAdapter(t1.alias()).traverse(s), +            "SELECT count(table1_1.col1) AS count_1 " +            "FROM table1 AS table1_1", +        )      def test_recursive(self):          metadata = MetaData() -        a = Table('a', metadata, -                  Column('id', Integer, primary_key=True)) -        b = Table('b', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('aid', Integer, ForeignKey('a.id')), -                  ) -        c = Table('c', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('bid', Integer, ForeignKey('b.id')), -                  ) - -        d = Table('d', metadata, -                  Column('id', Integer, primary_key=True), -                  Column('aid', Integer, ForeignKey('a.id')), -                  ) +        a = Table("a", metadata, Column("id", Integer, primary_key=True)) +        b = Table( +            "b", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("aid", Integer, ForeignKey("a.id")), +        ) +        c = Table( +            "c", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("bid", Integer, ForeignKey("b.id")), +        ) + +        d = Table( +            "d", +            metadata, +            Column("id", Integer, primary_key=True), +            Column("aid", Integer, ForeignKey("a.id")), +        )          u = union(              a.join(b).select().apply_labels(), -            a.join(d).select().apply_labels() +            a.join(d).select().apply_labels(),          ).alias()          self.assert_compile( -            sql_util.ClauseAdapter(u). -            traverse(select([c.c.bid]).where(c.c.bid == u.c.b_aid)), +            sql_util.ClauseAdapter(u).traverse( +                select([c.c.bid]).where(c.c.bid == u.c.b_aid) +            ),              "SELECT c.bid "              "FROM c, (SELECT a.id AS a_id, b.id AS b_id, b.aid AS b_aid "              "FROM a JOIN b ON a.id = b.aid UNION SELECT a.id AS a_id, d.id "              "AS d_id, d.aid AS d_aid "              "FROM a JOIN d ON a.id = d.aid) AS anon_1 " -            "WHERE c.bid = anon_1.b_aid" +            "WHERE c.bid = anon_1.b_aid",          ) -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) +        t1 = table("table1", column("col1"), column("col2"), column("col3")) +        t2 = table("table2", column("col1"), column("col2"), column("col3"))      def test_label_anonymize_one(self):          t1a = t1.alias()          adapter = sql_util.ClauseAdapter(t1a, anonymize_labels=True) -        expr = select([t1.c.col2]).where(t1.c.col3 == 5).label('expr') +        expr = select([t1.c.col2]).where(t1.c.col3 == 5).label("expr")          expr_adapted = adapter.traverse(expr)          stmt = select([expr, expr_adapted]).order_by(expr, expr_adapted) @@ -1560,7 +1580,7 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):              "AS expr, "              "(SELECT table1_1.col2 FROM table1 AS table1_1 "              "WHERE table1_1.col3 = :col3_2) AS anon_1 " -            "ORDER BY expr, anon_1" +            "ORDER BY expr, anon_1",          )      def test_label_anonymize_two(self): @@ -1578,14 +1598,14 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):              "AS anon_1, "              "(SELECT table1_1.col2 FROM table1 AS table1_1 "              "WHERE table1_1.col3 = :col3_2) AS anon_2 " -            "ORDER BY anon_1, anon_2" +            "ORDER BY anon_1, anon_2",          )      def test_label_anonymize_three(self):          t1a = t1.alias()          adapter = sql_util.ColumnAdapter( -            t1a, anonymize_labels=True, -            allow_label_resolve=False) +            t1a, anonymize_labels=True, allow_label_resolve=False +        )          expr = select([t1.c.col2]).where(t1.c.col3 == 5).label(None)          l1 = expr @@ -1603,236 +1623,235 @@ class ClauseAdapterTest(fixtures.TestBase, AssertsCompiledSQL):  class SpliceJoinsTest(fixtures.TestBase, AssertsCompiledSQL): -    __dialect__ = 'default' +    __dialect__ = "default"      @classmethod      def setup_class(cls):          global table1, table2, table3, table4          def _table(name): -            return table(name, column('col1'), column('col2'), -                         column('col3')) +            return table(name, column("col1"), column("col2"), column("col3"))          table1, table2, table3, table4 = [ -            _table(name) for name in ( -                'table1', 'table2', 'table3', 'table4')] +            _table(name) for name in ("table1", "table2", "table3", "table4") +        ]      def test_splice(self):          t1, t2, t3, t4 = table1, table2, table1.alias(), table2.alias() -        j = t1.join( -            t2, -            t1.c.col1 == t2.c.col1).join( -            t3, -            t2.c.col1 == t3.c.col1).join( -            t4, -            t4.c.col1 == t1.c.col1) +        j = ( +            t1.join(t2, t1.c.col1 == t2.c.col1) +            .join(t3, t2.c.col1 == t3.c.col1) +            .join(t4, t4.c.col1 == t1.c.col1) +        )          s = select([t1]).where(t1.c.col2 < 5).alias() -        self.assert_compile(sql_util.splice_joins(s, j), -                            '(SELECT table1.col1 AS col1, table1.col2 ' -                            'AS col2, table1.col3 AS col3 FROM table1 ' -                            'WHERE table1.col2 < :col2_1) AS anon_1 ' -                            'JOIN table2 ON anon_1.col1 = table2.col1 ' -                            'JOIN table1 AS table1_1 ON table2.col1 = ' -                            'table1_1.col1 JOIN table2 AS table2_1 ON ' -                            'table2_1.col1 = anon_1.col1') +        self.assert_compile( +            sql_util.splice_joins(s, j), +            "(SELECT table1.col1 AS col1, table1.col2 " +            "AS col2, table1.col3 AS col3 FROM table1 " +            "WHERE table1.col2 < :col2_1) AS anon_1 " +            "JOIN table2 ON anon_1.col1 = table2.col1 " +            "JOIN table1 AS table1_1 ON table2.col1 = " +            "table1_1.col1 JOIN table2 AS table2_1 ON " +            "table2_1.col1 = anon_1.col1", +        )      def test_stop_on(self):          t1, t2, t3 = table1, table2, table3          j1 = t1.join(t2, t1.c.col1 == t2.c.col1)          j2 = j1.join(t3, t2.c.col1 == t3.c.col1)          s = select([t1]).select_from(j1).alias() -        self.assert_compile(sql_util.splice_joins(s, j2), -                            '(SELECT table1.col1 AS col1, table1.col2 ' -                            'AS col2, table1.col3 AS col3 FROM table1 ' -                            'JOIN table2 ON table1.col1 = table2.col1) ' -                            'AS anon_1 JOIN table2 ON anon_1.col1 = ' -                            'table2.col1 JOIN table3 ON table2.col1 = ' -                            'table3.col1') -        self.assert_compile(sql_util.splice_joins(s, j2, j1), -                            '(SELECT table1.col1 AS col1, table1.col2 ' -                            'AS col2, table1.col3 AS col3 FROM table1 ' -                            'JOIN table2 ON table1.col1 = table2.col1) ' -                            'AS anon_1 JOIN table3 ON table2.col1 = ' -                            'table3.col1') +        self.assert_compile( +            sql_util.splice_joins(s, j2), +            "(SELECT table1.col1 AS col1, table1.col2 " +            "AS col2, table1.col3 AS col3 FROM table1 " +            "JOIN table2 ON table1.col1 = table2.col1) " +            "AS anon_1 JOIN table2 ON anon_1.col1 = " +            "table2.col1 JOIN table3 ON table2.col1 = " +            "table3.col1", +        ) +        self.assert_compile( +            sql_util.splice_joins(s, j2, j1), +            "(SELECT table1.col1 AS col1, table1.col2 " +            "AS col2, table1.col3 AS col3 FROM table1 " +            "JOIN table2 ON table1.col1 = table2.col1) " +            "AS anon_1 JOIN table3 ON table2.col1 = " +            "table3.col1", +        )      def test_splice_2(self):          t2a = table2.alias()          t3a = table3.alias() -        j1 = table1.join( -            t2a, -            table1.c.col1 == t2a.c.col1).join( -            t3a, -            t2a.c.col2 == t3a.c.col2) +        j1 = table1.join(t2a, table1.c.col1 == t2a.c.col1).join( +            t3a, t2a.c.col2 == t3a.c.col2 +        )          t2b = table4.alias()          j2 = table1.join(t2b, table1.c.col3 == t2b.c.col3) -        self.assert_compile(sql_util.splice_joins(table1, j1), -                            'table1 JOIN table2 AS table2_1 ON ' -                            'table1.col1 = table2_1.col1 JOIN table3 ' -                            'AS table3_1 ON table2_1.col2 = ' -                            'table3_1.col2') -        self.assert_compile(sql_util.splice_joins(table1, j2), -                            'table1 JOIN table4 AS table4_1 ON ' -                            'table1.col3 = table4_1.col3') -        self.assert_compile( -            sql_util.splice_joins( -                sql_util.splice_joins( -                    table1, -                    j1), -                j2), -            'table1 JOIN table2 AS table2_1 ON ' -            'table1.col1 = table2_1.col1 JOIN table3 ' -            'AS table3_1 ON table2_1.col2 = ' -            'table3_1.col2 JOIN table4 AS table4_1 ON ' -            'table1.col3 = table4_1.col3') +        self.assert_compile( +            sql_util.splice_joins(table1, j1), +            "table1 JOIN table2 AS table2_1 ON " +            "table1.col1 = table2_1.col1 JOIN table3 " +            "AS table3_1 ON table2_1.col2 = " +            "table3_1.col2", +        ) +        self.assert_compile( +            sql_util.splice_joins(table1, j2), +            "table1 JOIN table4 AS table4_1 ON " "table1.col3 = table4_1.col3", +        ) +        self.assert_compile( +            sql_util.splice_joins(sql_util.splice_joins(table1, j1), j2), +            "table1 JOIN table2 AS table2_1 ON " +            "table1.col1 = table2_1.col1 JOIN table3 " +            "AS table3_1 ON table2_1.col2 = " +            "table3_1.col2 JOIN table4 AS table4_1 ON " +            "table1.col3 = table4_1.col3", +        )  class SelectTest(fixtures.TestBase, AssertsCompiledSQL):      """tests the generative capability of Select""" -    __dialect__ = 'default' +    __dialect__ = "default"      @classmethod      def setup_class(cls):          global t1, t2 -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) +        t1 = table("table1", column("col1"), column("col2"), column("col3")) +        t2 = table("table2", column("col1"), column("col2"), column("col3"))      def test_columns(self):          s = t1.select() -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') -        select_copy = s.column(column('yyy')) -        self.assert_compile(select_copy, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3, yyy FROM table1') +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        ) +        select_copy = s.column(column("yyy")) +        self.assert_compile( +            select_copy, +            "SELECT table1.col1, table1.col2, " "table1.col3, yyy FROM table1", +        )          assert s.columns is not select_copy.columns          assert s._columns is not select_copy._columns          assert s._raw_columns is not select_copy._raw_columns -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        )      def test_froms(self):          s = t1.select() -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        )          select_copy = s.select_from(t2) -        self.assert_compile(select_copy, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1, table2') +        self.assert_compile( +            select_copy, +            "SELECT table1.col1, table1.col2, " +            "table1.col3 FROM table1, table2", +        )          assert s._froms is not select_copy._froms -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        )      def test_prefixes(self):          s = t1.select() -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') -        select_copy = s.prefix_with('FOOBER') -        self.assert_compile(select_copy, -                            'SELECT FOOBER table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') -        self.assert_compile(s, -                            'SELECT table1.col1, table1.col2, ' -                            'table1.col3 FROM table1') +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        ) +        select_copy = s.prefix_with("FOOBER") +        self.assert_compile( +            select_copy, +            "SELECT FOOBER table1.col1, table1.col2, " +            "table1.col3 FROM table1", +        ) +        self.assert_compile( +            s, "SELECT table1.col1, table1.col2, " "table1.col3 FROM table1" +        )      def test_execution_options(self): -        s = select().execution_options(foo='bar') -        s2 = s.execution_options(bar='baz') -        s3 = s.execution_options(foo='not bar') +        s = select().execution_options(foo="bar") +        s2 = s.execution_options(bar="baz") +        s3 = s.execution_options(foo="not bar")          # The original select should not be modified. -        assert s._execution_options == dict(foo='bar') +        assert s._execution_options == dict(foo="bar")          # s2 should have its execution_options based on s, though. -        assert s2._execution_options == dict(foo='bar', bar='baz') -        assert s3._execution_options == dict(foo='not bar') +        assert s2._execution_options == dict(foo="bar", bar="baz") +        assert s3._execution_options == dict(foo="not bar")      def test_invalid_options(self):          assert_raises( -            exc.ArgumentError, -            select().execution_options, compiled_cache={} +            exc.ArgumentError, select().execution_options, compiled_cache={}          )          assert_raises(              exc.ArgumentError,              select().execution_options, -            isolation_level='READ_COMMITTED' +            isolation_level="READ_COMMITTED",          )      # this feature not available yet      def _NOTYET_test_execution_options_in_kwargs(self): -        s = select(execution_options=dict(foo='bar')) -        s2 = s.execution_options(bar='baz') +        s = select(execution_options=dict(foo="bar")) +        s2 = s.execution_options(bar="baz")          # The original select should not be modified. -        assert s._execution_options == dict(foo='bar') +        assert s._execution_options == dict(foo="bar")          # s2 should have its execution_options based on s, though. -        assert s2._execution_options == dict(foo='bar', bar='baz') +        assert s2._execution_options == dict(foo="bar", bar="baz")      # this feature not available yet      def _NOTYET_test_execution_options_in_text(self): -        s = text('select 42', execution_options=dict(foo='bar')) -        assert s._execution_options == dict(foo='bar') +        s = text("select 42", execution_options=dict(foo="bar")) +        assert s._execution_options == dict(foo="bar")  class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):      """Tests the generative capability of Insert, Update""" -    __dialect__ = 'default' +    __dialect__ = "default"      # fixme: consolidate converage from elsewhere here and expand      @classmethod      def setup_class(cls):          global t1, t2 -        t1 = table("table1", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) -        t2 = table("table2", -                   column("col1"), -                   column("col2"), -                   column("col3"), -                   ) +        t1 = table("table1", column("col1"), column("col2"), column("col3")) +        t2 = table("table2", column("col1"), column("col2"), column("col3"))      def test_prefixes(self):          i = t1.insert() -        self.assert_compile(i, -                            "INSERT INTO table1 (col1, col2, col3) " -                            "VALUES (:col1, :col2, :col3)") +        self.assert_compile( +            i, +            "INSERT INTO table1 (col1, col2, col3) " +            "VALUES (:col1, :col2, :col3)", +        )          gen = i.prefix_with("foober") -        self.assert_compile(gen, -                            "INSERT foober INTO table1 (col1, col2, col3) " -                            "VALUES (:col1, :col2, :col3)") +        self.assert_compile( +            gen, +            "INSERT foober INTO table1 (col1, col2, col3) " +            "VALUES (:col1, :col2, :col3)", +        ) -        self.assert_compile(i, -                            "INSERT INTO table1 (col1, col2, col3) " -                            "VALUES (:col1, :col2, :col3)") +        self.assert_compile( +            i, +            "INSERT INTO table1 (col1, col2, col3) " +            "VALUES (:col1, :col2, :col3)", +        ) -        i2 = t1.insert(prefixes=['squiznart']) -        self.assert_compile(i2, -                            "INSERT squiznart INTO table1 (col1, col2, col3) " -                            "VALUES (:col1, :col2, :col3)") +        i2 = t1.insert(prefixes=["squiznart"]) +        self.assert_compile( +            i2, +            "INSERT squiznart INTO table1 (col1, col2, col3) " +            "VALUES (:col1, :col2, :col3)", +        )          gen2 = i2.prefix_with("quux") -        self.assert_compile(gen2, -                            "INSERT squiznart quux INTO " -                            "table1 (col1, col2, col3) " -                            "VALUES (:col1, :col2, :col3)") +        self.assert_compile( +            gen2, +            "INSERT squiznart quux INTO " +            "table1 (col1, col2, col3) " +            "VALUES (:col1, :col2, :col3)", +        )      def test_add_kwarg(self):          i = t1.insert() @@ -1857,11 +1876,13 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):          i = t1.insert()          eq_(i.parameters, None)          i = i.values([(5, 6, 7), (8, 9, 10)]) -        eq_(i.parameters, [ -            {"col1": 5, "col2": 6, "col3": 7}, -            {"col1": 8, "col2": 9, "col3": 10}, -            ] -            ) +        eq_( +            i.parameters, +            [ +                {"col1": 5, "col2": 6, "col3": 7}, +                {"col1": 8, "col2": 9, "col3": 10}, +            ], +        )      def test_inline_values_single(self):          i = t1.insert(values={"col1": 5}) @@ -1895,7 +1916,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):          assert_raises_message(              exc.InvalidRequestError,              "This construct already has multiple parameter sets.", -            i.values, col2=7 +            i.values, +            col2=7,          )      def test_cant_mix_single_multi_formats_dict_to_list(self): @@ -1904,7 +1926,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):              exc.ArgumentError,              "Can't mix single-values and multiple values "              "formats in one statement", -            i.values, [{"col1": 6}] +            i.values, +            [{"col1": 6}],          )      def test_cant_mix_single_multi_formats_list_to_dict(self): @@ -1913,7 +1936,8 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):              exc.ArgumentError,              "Can't mix single-values and multiple values "              "formats in one statement", -            i.values, {"col1": 5} +            i.values, +            {"col1": 5},          )      def test_erroneous_multi_args_dicts(self): @@ -1922,7 +1946,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):              exc.ArgumentError,              "Only a single dictionary/tuple or list of "              "dictionaries/tuples is accepted positionally.", -            i.values, {"col1": 5}, {"col1": 7} +            i.values, +            {"col1": 5}, +            {"col1": 7},          )      def test_erroneous_multi_args_tuples(self): @@ -1931,7 +1957,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):              exc.ArgumentError,              "Only a single dictionary/tuple or list of "              "dictionaries/tuples is accepted positionally.", -            i.values, (5, 6, 7), (8, 9, 10) +            i.values, +            (5, 6, 7), +            (8, 9, 10),          )      def test_erroneous_multi_args_plus_kw(self): @@ -1939,7 +1967,9 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):          assert_raises_message(              exc.ArgumentError,              "Can't pass kwargs and multiple parameter sets simultaneously", -            i.values, [{"col1": 5}], col2=7 +            i.values, +            [{"col1": 5}], +            col2=7,          )      def test_update_no_support_multi_values(self): @@ -1947,12 +1977,14 @@ class ValuesBaseTest(fixtures.TestBase, AssertsCompiledSQL):          assert_raises_message(              exc.InvalidRequestError,              "This construct does not support multiple parameter sets.", -            u.values, [{"col1": 5}, {"col1": 7}] +            u.values, +            [{"col1": 5}, {"col1": 7}],          )      def test_update_no_support_multi_constructor(self):          assert_raises_message(              exc.InvalidRequestError,              "This construct does not support multiple parameter sets.", -            t1.update, values=[{"col1": 5}, {"col1": 7}] +            t1.update, +            values=[{"col1": 5}, {"col1": 7}],          ) | 
