diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-06-13 12:37:22 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2019-07-06 13:02:22 -0400 |
| commit | ef7ff058eb67d73ebeac7b125ab2a7806e14629c (patch) | |
| tree | 9a09162961f7bcdb6d16837adacabb99f10b4410 /test/sql/test_selectable.py | |
| parent | 1ce98ca83a4b2da12e52aa0f4ab181c83063abc2 (diff) | |
| download | sqlalchemy-ef7ff058eb67d73ebeac7b125ab2a7806e14629c.tar.gz | |
SelectBase no longer a FromClause
As part of the SQLAlchemy 2.0 migration project, a conceptual change has
been made to the role of the :class:`.SelectBase` class hierarchy,
which is the root of all "SELECT" statement constructs, in that they no
longer serve directly as FROM clauses, that is, they no longer subclass
:class:`.FromClause`. For end users, the change mostly means that any
placement of a :func:`.select` construct in the FROM clause of another
:func:`.select` requires first that it be wrapped in a subquery first,
which historically is through the use of the :meth:`.SelectBase.alias`
method, and is now also available through the use of
:meth:`.SelectBase.subquery`. This was usually a requirement in any
case since several databases don't accept unnamed SELECT subqueries
in their FROM clause in any case.
See the documentation in this change for lots more detail.
Fixes: #4617
Change-Id: I0f6174ee24b9a1a4529168e52e855e12abd60667
Diffstat (limited to 'test/sql/test_selectable.py')
| -rw-r--r-- | test/sql/test_selectable.py | 628 |
1 files changed, 439 insertions, 189 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py index 230b5423b..485a0e428 100644 --- a/test/sql/test_selectable.py +++ b/test/sql/test_selectable.py @@ -29,6 +29,7 @@ from sqlalchemy import TypeDecorator from sqlalchemy import union from sqlalchemy import util from sqlalchemy.sql import Alias +from sqlalchemy.sql import base from sqlalchemy.sql import column from sqlalchemy.sql import elements from sqlalchemy.sql import table @@ -88,7 +89,7 @@ class SelectableTest( table1.c.col1, table1.c.col1.label("c1"), ] - ) + ).subquery() # this tests the same thing as # test_direct_correspondence_on_labels below - @@ -98,13 +99,39 @@ class SelectableTest( assert s.corresponding_column(s.c.col1) is s.c.col1 assert s.corresponding_column(s.c.c1) is s.c.c1 - def test_labeled_subquery_twice(self): + def test_labeled_select_twice(self): scalar_select = select([table1.c.col1]).label("foo") s1 = select([scalar_select]) s2 = select([scalar_select, scalar_select]) eq_( + s1.selected_columns.foo.proxy_set, + set( + [s1.selected_columns.foo, scalar_select, scalar_select.element] + ), + ) + eq_( + s2.selected_columns.foo.proxy_set, + set( + [s2.selected_columns.foo, scalar_select, scalar_select.element] + ), + ) + + assert ( + s1.corresponding_column(scalar_select) is s1.selected_columns.foo + ) + assert ( + s2.corresponding_column(scalar_select) is s2.selected_columns.foo + ) + + def test_labeled_subquery_twice(self): + scalar_select = select([table1.c.col1]).label("foo") + + s1 = select([scalar_select]).subquery() + s2 = select([scalar_select, scalar_select]).subquery() + + eq_( s1.c.foo.proxy_set, set([s1.c.foo, scalar_select, scalar_select.element]), ) @@ -116,12 +143,21 @@ class SelectableTest( assert s1.corresponding_column(scalar_select) is s1.c.foo assert s2.corresponding_column(scalar_select) is s2.c.foo - def test_label_grouped_still_corresponds(self): + def test_select_label_grouped_still_corresponds(self): label = select([table1.c.col1]).label("foo") label2 = label.self_group() s1 = select([label]) s2 = select([label2]) + assert s1.corresponding_column(label) is s1.selected_columns.foo + assert s2.corresponding_column(label) is s2.selected_columns.foo + + def test_subquery_label_grouped_still_corresponds(self): + label = select([table1.c.col1]).label("foo") + label2 = label.self_group() + + s1 = select([label]).subquery() + s2 = select([label2]).subquery() assert s1.corresponding_column(label) is s1.c.foo assert s2.corresponding_column(label) is s2.c.foo @@ -144,13 +180,22 @@ class SelectableTest( def test_keyed_gen(self): s = select([keyed]) - eq_(s.c.colx.key, "colx") + eq_(s.selected_columns.colx.key, "colx") - eq_(s.c.colx.name, "x") + eq_(s.selected_columns.colx.name, "x") - assert s.corresponding_column(keyed.c.colx) is s.c.colx - assert s.corresponding_column(keyed.c.coly) is s.c.coly - assert s.corresponding_column(keyed.c.z) is s.c.z + assert ( + s.selected_columns.corresponding_column(keyed.c.colx) + is s.selected_columns.colx + ) + assert ( + s.selected_columns.corresponding_column(keyed.c.coly) + is s.selected_columns.coly + ) + assert ( + s.selected_columns.corresponding_column(keyed.c.z) + is s.selected_columns.z + ) sel2 = s.alias() assert sel2.corresponding_column(keyed.c.colx) is sel2.c.colx @@ -160,9 +205,18 @@ class SelectableTest( def test_keyed_label_gen(self): s = select([keyed]).apply_labels() - assert s.corresponding_column(keyed.c.colx) is s.c.keyed_colx - assert s.corresponding_column(keyed.c.coly) is s.c.keyed_coly - assert s.corresponding_column(keyed.c.z) is s.c.keyed_z + assert ( + s.selected_columns.corresponding_column(keyed.c.colx) + is s.selected_columns.keyed_colx + ) + assert ( + s.selected_columns.corresponding_column(keyed.c.coly) + is s.selected_columns.keyed_coly + ) + assert ( + s.selected_columns.corresponding_column(keyed.c.z) + is s.selected_columns.keyed_z + ) sel2 = s.alias() assert sel2.corresponding_column(keyed.c.colx) is sel2.c.keyed_colx @@ -184,6 +238,9 @@ class SelectableTest( c = Column("foo", Integer, key="bar") t = Table("t", MetaData(), c) s = select([t])._clone() + assert c in s.selected_columns.bar.proxy_set + + s = select([t]).subquery()._clone() assert c in s.c.bar.proxy_set def test_clone_c_proxy_key_lower(self): @@ -191,6 +248,9 @@ class SelectableTest( c.key = "bar" t = table("t", c) s = select([t])._clone() + assert c in s.selected_columns.bar.proxy_set + + s = select([t]).subquery()._clone() assert c in s.c.bar.proxy_set def test_no_error_on_unsupported_expr_key(self): @@ -204,6 +264,13 @@ class SelectableTest( expr = BinaryExpression(t.c.x, t.c.y, myop) s = select([t, expr]) + + # anon_label, e.g. a truncated_label, is used here becuase + # the expr has no name, no key, and myop() can't create a + # string, so this is the last resort + eq_(s.selected_columns.keys(), ["x", "y", expr.anon_label]) + + s = select([t, expr]).subquery() eq_(s.c.keys(), ["x", "y", expr.anon_label]) def test_cloned_intersection(self): @@ -219,10 +286,7 @@ class SelectableTest( s2c1 = s2._clone() s3c1 = s3._clone() - eq_( - elements._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]), - set([s1c1]), - ) + eq_(base._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]), set([s1c1])) def test_cloned_difference(self): t1 = table("t1", column("x")) @@ -238,27 +302,36 @@ class SelectableTest( s3c1 = s3._clone() eq_( - elements._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]), + base._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]), set([s3c1]), ) def test_distance_on_aliases(self): a1 = table1.alias("a1") for s in ( - select([a1, table1], use_labels=True), - select([table1, a1], use_labels=True), + select([a1, table1], use_labels=True).subquery(), + select([table1, a1], use_labels=True).subquery(), ): assert s.corresponding_column(table1.c.col1) is s.c.table1_col1 assert s.corresponding_column(a1.c.col1) is s.c.a1_col1 def test_join_against_self(self): - jj = select([table1.c.col1.label("bar_col1")]) + jj = select([table1.c.col1.label("bar_col1")]).subquery() jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1) # test column directly against itself + # joins necessarily have to prefix column names with the name + # of the selectable, else the same-named columns will overwrite + # one another. In this case, we unfortunately have this unfriendly + # "anonymous" name, whereas before when select() could be a FROM + # the "bar_col1" label would be directly in the join() object. However + # this was a useless join() object because PG and MySQL don't accept + # unnamed subqueries in joins in any case. + name = "%s_bar_col1" % (jj.name,) + assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1 - assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1 + assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c[name] # test alias of the join @@ -267,13 +340,15 @@ class SelectableTest( def test_clone_append_column(self): sel = select([literal_column("1").label("a")]) - eq_(list(sel.c.keys()), ["a"]) + eq_(list(sel.selected_columns.keys()), ["a"]) cloned = visitors.ReplacingCloningVisitor().traverse(sel) cloned.append_column(literal_column("2").label("b")) cloned.append_column(func.foo()) - eq_(list(cloned.c.keys()), ["a", "b", "foo()"]) + eq_(list(cloned.selected_columns.keys()), ["a", "b", "foo()"]) - def test_append_column_after_replace_selectable(self): + def test_append_column_after_visitor_replace(self): + # test for a supported idiom that matches the deprecated / removed + # replace_selectable method basesel = select([literal_column("1").label("a")]) tojoin = select( [literal_column("1").label("a"), literal_column("2").label("b")] @@ -281,9 +356,14 @@ class SelectableTest( basefrom = basesel.alias("basefrom") joinfrom = tojoin.alias("joinfrom") sel = select([basefrom.c.a]) - replaced = sel.replace_selectable( - basefrom, basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) - ) + + replace_from = basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a) + + def replace(elem): + if elem is basefrom: + return replace_from + + replaced = visitors.replacement_traverse(sel, {}, replace) self.assert_compile( replaced, "SELECT basefrom.a FROM (SELECT 1 AS a) AS basefrom " @@ -302,7 +382,7 @@ class SelectableTest( # test that corresponding column digs across # clone boundaries with anonymous labeled elements col = func.count().label("foo") - sel = select([col]) + sel = select([col]).subquery() sel2 = visitors.ReplacingCloningVisitor().traverse(sel) assert sel2.corresponding_column(col) is sel2.c.foo @@ -323,13 +403,16 @@ class SelectableTest( impl = Integer stmt = select([type_coerce(column("x"), MyType).label("foo")]) - stmt2 = stmt.select() + subq = stmt.subquery() + stmt2 = subq.select() + subq2 = stmt2.subquery() assert isinstance(stmt._raw_columns[0].type, MyType) - assert isinstance(stmt.c.foo.type, MyType) - assert isinstance(stmt2.c.foo.type, MyType) + assert isinstance(subq.c.foo.type, MyType) + assert isinstance(stmt2.selected_columns.foo.type, MyType) + assert isinstance(subq2.c.foo.type, MyType) - def test_select_on_table(self): - sel = select([table1, table2], use_labels=True) + def test_subquery_on_table(self): + sel = select([table1, table2], use_labels=True).subquery() assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1 assert ( @@ -383,7 +466,7 @@ class SelectableTest( "AS a", ) - def test_union(self): + def test_union_correspondence(self): # tests that we can correspond a column in a Select statement # with a certain Table, against a column in a Union where one of @@ -411,8 +494,34 @@ class SelectableTest( s1 = table1.select(use_labels=True) s2 = table2.select(use_labels=True) - assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 - assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 + assert ( + u.corresponding_column(s1.selected_columns.table1_col2) + is u.selected_columns.col2 + ) + + # right now, the "selected_columns" of a union are those of the + # first selectable. so without using a subquery that represents + # all the SELECTs in the union, we can't do corresponding column + # like this. perhaps compoundselect shouldn't even implement + # .corresponding_column directly + assert ( + u.corresponding_column(s2.selected_columns.table2_col2) is None + ) # really? u.selected_columns.col2 + + usub = u.subquery() + assert ( + usub.corresponding_column(s1.selected_columns.table1_col2) + is usub.c.col2 + ) + assert ( + usub.corresponding_column(s2.selected_columns.table2_col2) + is usub.c.col2 + ) + + s1sub = s1.subquery() + s2sub = s2.subquery() + assert usub.corresponding_column(s1sub.c.table1_col2) is usub.c.col2 + assert usub.corresponding_column(s2sub.c.table2_col2) is usub.c.col2 def test_union_precedence(self): # conflicting column correspondence should be resolved based on @@ -423,11 +532,11 @@ class SelectableTest( s3 = select([table1.c.col3, table1.c.colx]) s4 = select([table1.c.colx, table1.c.col3]) - u1 = union(s1, s2) + u1 = union(s1, s2).subquery() assert u1.corresponding_column(table1.c.col1) is u1.c.col1 assert u1.corresponding_column(table1.c.col2) is u1.c.col2 - u1 = union(s1, s2, s3, s4) + u1 = union(s1, s2, s3, s4).subquery() assert u1.corresponding_column(table1.c.col1) is u1.c.col1 assert u1.corresponding_column(table1.c.col2) is u1.c.col2 assert u1.corresponding_column(table1.c.colx) is u1.c.col2 @@ -437,12 +546,12 @@ class SelectableTest( s1 = select([table1.c.col1, table1.c.col2]) s2 = select([table1.c.col2, table1.c.col1]) - for c in s1.c: + for c in s1.selected_columns: c.proxy_set - for c in s2.c: + for c in s2.selected_columns: c.proxy_set - u1 = union(s1, s2) + u1 = union(s1, s2).subquery() assert u1.corresponding_column(table1.c.col2) is u1.c.col2 def test_singular_union(self): @@ -451,9 +560,9 @@ class SelectableTest( select([table1.c.col1, table1.c.col2, table1.c.col3]), ) u = union(select([table1.c.col1, table1.c.col2, table1.c.col3])) - assert u.c.col1 is not None - assert u.c.col2 is not None - assert u.c.col3 is not None + assert u.selected_columns.col1 is not None + assert u.selected_columns.col2 is not None + assert u.selected_columns.col3 is not None def test_alias_union(self): @@ -482,8 +591,8 @@ class SelectableTest( ) .alias("analias") ) - s1 = table1.select(use_labels=True) - s2 = table2.select(use_labels=True) + s1 = table1.select(use_labels=True).subquery() + s2 = table2.select(use_labels=True).subquery() assert u.corresponding_column(s1.c.table1_col2) is u.c.col2 assert u.corresponding_column(s2.c.table2_col2) is u.c.col2 assert u.corresponding_column(s2.c.table2_coly) is u.c.coly @@ -493,13 +602,15 @@ class SelectableTest( s1 = select([table1.c.col1, table1.c.col2]) s2 = select([table1.c.col1, table1.c.col2]).alias() - u1 = union(s1, s2) - assert u1.corresponding_column(s1.c.col1) is u1.c.col1 - assert u1.corresponding_column(s2.c.col1) is u1.c.col1 - - u2 = union(s2, s1) - assert u2.corresponding_column(s1.c.col1) is u2.c.col1 - assert u2.corresponding_column(s2.c.col1) is u2.c.col1 + # previously this worked + assert_raises_message( + exc.ArgumentError, + "SELECT construct for inclusion in a UNION or " + "other set construct expected", + union, + s1, + s2, + ) def test_union_of_text(self): s1 = select([table1.c.col1, table1.c.col2]) @@ -507,77 +618,130 @@ class SelectableTest( column("col1"), column("col2") ) - u1 = union(s1, s2) - assert u1.corresponding_column(s1.c.col1) is u1.c.col1 - assert u1.corresponding_column(s2.c.col1) is u1.c.col1 + u1 = union(s1, s2).subquery() + assert u1.corresponding_column(s1.selected_columns.col1) is u1.c.col1 + assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1 + + u2 = union(s2, s1).subquery() + assert u2.corresponding_column(s1.selected_columns.col1) is u2.c.col1 + assert u2.corresponding_column(s2.selected_columns.col1) is u2.c.col1 + + def test_foo(self): + s1 = select([table1.c.col1, table1.c.col2]) + s2 = select([table1.c.col2, table1.c.col1]) + + u1 = union(s1, s2).subquery() + assert u1.corresponding_column(table1.c.col2) is u1.c.col2 + + metadata = MetaData() + table1_new = Table( + "table1", + metadata, + Column("col1", Integer, primary_key=True), + Column("col2", String(20)), + Column("col3", Integer), + Column("colx", Integer), + ) + # table1_new = table1 + + s1 = select([table1_new.c.col1, table1_new.c.col2]) + s2 = select([table1_new.c.col2, table1_new.c.col1]) + u1 = union(s1, s2).subquery() - u2 = union(s2, s1) - assert u2.corresponding_column(s1.c.col1) is u2.c.col1 - assert u2.corresponding_column(s2.c.col1) is u2.c.col1 + # TODO: failing due to proxy_set not correct + assert u1.corresponding_column(table1_new.c.col2) is u1.c.col2 - @testing.emits_warning("Column 'col1'") - def test_union_dupe_keys(self): + def test_union_alias_dupe_keys(self): s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]) s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) - u1 = union(s1, s2) + u1 = union(s1, s2).subquery() + + with testing.expect_warnings("Column 'col1'"): + u1.c assert ( - u1.corresponding_column(s1.c._all_columns[0]) + u1.corresponding_column(s1.selected_columns._all_columns[0]) is u1.c._all_columns[0] ) - assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] - assert u1.corresponding_column(s1.c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2] + # due to the duplicate key, "col1" is now the column at the end + # of the list and the first column is not accessible by key + assert u1.c.col1 is u1.c._all_columns[2] + # table2.c.col1 is in two positions in this union, so...currently + # it is the replaced one at position 2. assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2] - assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2] - @testing.emits_warning("Column 'col1'") - def test_union_alias_dupe_keys(self): - s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).alias() - s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]) - u1 = union(s1, s2) + # this is table2.c.col1 in both cases, so this is "right" + assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1 + # same + assert u1.corresponding_column(s2.subquery().c.col1) is u1.c.col1 + + # col2 is working OK + assert u1.corresponding_column(s1.selected_columns.col2) is u1.c.col2 assert ( - u1.corresponding_column(s1.c._all_columns[0]) - is u1.c._all_columns[0] + u1.corresponding_column(s1.selected_columns.col2) + is u1.c._all_columns[1] ) - assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] - assert u1.corresponding_column(s1.c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col2) is u1.c.col2 + assert u1.corresponding_column(s2.selected_columns.col2) is u1.c.col2 + assert ( + u1.corresponding_column(s2.selected_columns.col2) + is u1.c._all_columns[1] + ) + assert u1.corresponding_column(s2.subquery().c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2] + # col3 is also "correct" , though confusing + assert u1.corresponding_column(s2.selected_columns.col3) is u1.c.col1 - # this differs from the non-alias test because table2.c.col1 is - # more directly at s2.c.col1 than it is s1.c.col1. - assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0] + assert u1.corresponding_column(table1.c.col1) is u1.c._all_columns[0] + assert u1.corresponding_column(table1.c.col2) is u1.c._all_columns[1] + assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2] + assert u1.corresponding_column(table2.c.col2) is u1.c._all_columns[1] assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2] - @testing.emits_warning("Column 'col1'") def test_union_alias_dupe_keys_grouped(self): - s1 = ( - select([table1.c.col1, table1.c.col2, table2.c.col1]) - .limit(1) - .alias() - ) + s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).limit(1) s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]).limit(1) - u1 = union(s1, s2) + u1 = union(s1, s2).subquery() + + with testing.expect_warnings("Column 'col1'"): + u1.c + + # due to the duplicate key, "col1" is now the column at the end + # of the list and the first column is not accessible by key + assert u1.c.col1 is u1.c._all_columns[2] + + # table2.c.col1 is in two positions in this union, so...currently + # it is the replaced one at position 2. + assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2] + + # this is table2.c.col1 in both cases, so this is "right" + assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1 + # same + assert u1.corresponding_column(s2.subquery().c.col1) is u1.c.col1 + + # col2 is working OK + assert u1.corresponding_column(s1.selected_columns.col2) is u1.c.col2 assert ( - u1.corresponding_column(s1.c._all_columns[0]) - is u1.c._all_columns[0] + u1.corresponding_column(s1.selected_columns.col2) + is u1.c._all_columns[1] + ) + assert u1.corresponding_column(s2.selected_columns.col2) is u1.c.col2 + assert ( + u1.corresponding_column(s2.selected_columns.col2) + is u1.c._all_columns[1] ) - assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0] - assert u1.corresponding_column(s1.c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col2) is u1.c.col2 + assert u1.corresponding_column(s2.subquery().c.col2) is u1.c.col2 - assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2] + # col3 is also "correct" , though confusing + assert u1.corresponding_column(s2.selected_columns.col3) is u1.c.col1 - # this differs from the non-alias test because table2.c.col1 is - # more directly at s2.c.col1 than it is s1.c.col1. - assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0] + assert u1.corresponding_column(table1.c.col1) is u1.c._all_columns[0] + assert u1.corresponding_column(table1.c.col2) is u1.c._all_columns[1] + assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2] + assert u1.corresponding_column(table2.c.col2) is u1.c._all_columns[1] assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2] def test_select_union(self): @@ -607,9 +771,9 @@ class SelectableTest( ) .alias("analias") ) - s = select([u]) - s1 = table1.select(use_labels=True) - s2 = table2.select(use_labels=True) + s = select([u]).subquery() + s1 = table1.select(use_labels=True).subquery() + s2 = table2.select(use_labels=True).subquery() assert s.corresponding_column(s1.c.table1_col2) is s.c.col2 assert s.corresponding_column(s2.c.table2_col2) is s.c.col2 @@ -653,15 +817,15 @@ class SelectableTest( criterion = a.c.table1_col1 == b.c.col2 self.assert_(criterion.compare(j.onclause)) - def test_select_alias(self): + def test_select_subquery_join(self): a = table1.select().alias("a") j = join(a, table2) criterion = a.c.col1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) - def test_select_labels(self): - a = table1.select(use_labels=True) + def test_subquery_labels_join(self): + a = table1.select(use_labels=True).subquery() j = join(a, table2) criterion = a.c.table1_col1 == table2.c.col2 @@ -683,18 +847,27 @@ class SelectableTest( table1.c.col2.label("acol2"), table1.c.col3.label("acol3"), ] - ) + ).subquery() j = join(a, table2) criterion = a.c.acol1 == table2.c.col2 self.assert_(criterion.compare(j.onclause)) - def test_labeled_select_correspoinding(self): + def test_labeled_select_corresponding(self): l1 = select([func.max(table1.c.col1)]).label("foo") s = select([l1]) - eq_(s.corresponding_column(l1), s.c.foo) + eq_(s.corresponding_column(l1), s.selected_columns.foo) s = select([table1.c.col1, l1]) + eq_(s.corresponding_column(l1), s.selected_columns.foo) + + def test_labeled_subquery_corresponding(self): + l1 = select([func.max(table1.c.col1)]).label("foo") + s = select([l1]).subquery() + + eq_(s.corresponding_column(l1), s.c.foo) + + s = select([table1.c.col1, l1]).subquery() eq_(s.corresponding_column(l1), s.c.foo) def test_select_alias_labels(self): @@ -724,7 +897,7 @@ class SelectableTest( t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id"))) t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2"))) - s = select([t2, t3], use_labels=True) + s = select([t2, t3], use_labels=True).subquery() assert_raises(exc.NoReferencedTableError, s.join, t1) @@ -732,19 +905,19 @@ class SelectableTest( # See [ticket:2167] for this one. l1 = table1.c.col1.label("a") l2 = select([l1]).label("b") - s = select([l2]) + s = select([l2]).subquery() assert s.c.b is not None self.assert_compile( s.select(), - "SELECT b FROM " - "(SELECT (SELECT table1.col1 AS a FROM table1) AS b)", + "SELECT anon_1.b FROM " + "(SELECT (SELECT table1.col1 AS a FROM table1) AS b) AS anon_1", ) - s2 = select([s.label("c")]) + s2 = select([s.element.label("c")]).subquery() self.assert_compile( s2.select(), - "SELECT c FROM (SELECT (SELECT (" - "SELECT table1.col1 AS a FROM table1) AS b) AS c)", + "SELECT anon_1.c FROM (SELECT (SELECT (" + "SELECT table1.col1 AS a FROM table1) AS b) AS c) AS anon_1", ) def test_self_referential_select_raises(self): @@ -752,7 +925,8 @@ class SelectableTest( s = select([t]) - s.append_whereclause(s.c.x > 5) + with testing.expect_deprecated("The SelectBase.c"): + s.append_whereclause(s.c.x > 5) assert_raises_message( exc.InvalidRequestError, r"select\(\) construct refers to itself as a FROM", @@ -762,7 +936,7 @@ class SelectableTest( def test_unusual_column_elements_text(self): """test that .c excludes text().""" - s = select([table1.c.col1, text("foo")]) + s = select([table1.c.col1, text("foo")]).subquery() eq_(list(s.c), [s.c.col1]) def test_unusual_column_elements_clauselist(self): @@ -770,14 +944,16 @@ class SelectableTest( from sqlalchemy.sql.expression import ClauseList - s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)]) + s = select( + [table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)] + ).subquery() eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3]) def test_unusual_column_elements_boolean_clauselist(self): """test that BooleanClauseList is placed as single element in .c.""" c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4) - s = select([table1.c.col1, c2]) + s = select([table1.c.col1, c2]).subquery() eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)]) def test_from_list_deferred_constructor(self): @@ -936,11 +1112,11 @@ class RefreshForNewColTest(fixtures.TestBase): a = table("a", column("x")) b = table("b", column("y")) s = select([a, b]).apply_labels() - s.c + s.selected_columns q = column("x") b.append_column(q) s._refresh_for_new_column(q) - assert q in s.c.b_x.proxy_set + assert q in s.selected_columns.b_x.proxy_set def test_alias_alias_samename_init(self): a = table("a", column("x")) @@ -954,8 +1130,11 @@ class RefreshForNewColTest(fixtures.TestBase): q = column("x") b.append_column(q) + assert "_columns" in s2.__dict__ + s2._refresh_for_new_column(q) + assert "_columns" not in s2.__dict__ is_(s1.corresponding_column(s2.c.b_x), s1.c.b_x) def test_aliased_select_samename_uninit(self): @@ -1005,23 +1184,18 @@ class RefreshForNewColTest(fixtures.TestBase): q = column("q") a.append_column(q) s3._refresh_for_new_column(q) - assert a.c.q in s3.c.q.proxy_set + assert a.c.q in s3.selected_columns.q.proxy_set - def test_union_init_raises(self): + def test_union_init(self): a = table("a", column("x")) s1 = select([a]) s2 = select([a]) s3 = s1.union(s2) - s3.c + s3.selected_columns q = column("q") a.append_column(q) - assert_raises_message( - NotImplementedError, - "CompoundSelect constructs don't support addition of " - "columns to underlying selectables", - s3._refresh_for_new_column, - q, - ) + s3._refresh_for_new_column(q) + assert a.c.q in s3.selected_columns.q.proxy_set def test_nested_join_uninit(self): a = table("a", column("x")) @@ -1195,7 +1369,7 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL): class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" - def test_join_condition(self): + def test_join_condition_one(self): m = MetaData() t1 = Table("t1", m, Column("id", Integer)) t2 = Table( @@ -1211,13 +1385,6 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): t4 = Table( "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) ) - t5 = Table( - "t5", - m, - Column("t1id1", ForeignKey("t1.id")), - Column("t1id2", ForeignKey("t1.id")), - ) - t1t2 = t1.join(t2) t2t3 = t2.join(t3) @@ -1235,6 +1402,32 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): sql_util.join_condition(left, right, a_subset=a_subset) ) + def test_join_condition_two(self): + m = MetaData() + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t4 = Table( + "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) + ) + t5 = Table( + "t5", + m, + Column("t1id1", ForeignKey("t1.id")), + Column("t1id2", ForeignKey("t1.id")), + ) + + t1t2 = t1.join(t2) + t2t3 = t2.join(t3) + # these are ambiguous, or have no joins for left, right, a_subset in [ (t1t2, t3, None), @@ -1242,7 +1435,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): (t1, t4, None), (t1t2, t2t3, None), (t5, t1, None), - (t5.select(use_labels=True), t1, None), + (t5.select(use_labels=True).subquery(), t1, None), ]: assert_raises( exc.ArgumentError, @@ -1252,6 +1445,24 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): a_subset=a_subset, ) + def test_join_condition_three(self): + m = MetaData() + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t4 = Table( + "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id")) + ) + t1t2 = t1.join(t2) + t2t3 = t2.join(t3) als = t2t3.alias() # test join's behavior, including natural for left, right, expected in [ @@ -1266,6 +1477,22 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): ]: assert expected.compare(left.join(right).onclause) + def test_join_condition_four(self): + m = MetaData() + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t1t2 = t1.join(t2) + t2t3 = t2.join(t3) + # these are right-nested joins j = t1t2.join(t2t3) assert j.onclause.compare(t2.c.id == t3.c.t2id) @@ -1275,7 +1502,23 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id", ) - st2t3 = t2t3.select(use_labels=True) + def test_join_condition_five(self): + m = MetaData() + t1 = Table("t1", m, Column("id", Integer)) + t2 = Table( + "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id")) + ) + t3 = Table( + "t3", + m, + Column("id", Integer), + Column("t1id", ForeignKey("t1.id")), + Column("t2id", ForeignKey("t2.id")), + ) + t1t2 = t1.join(t2) + t2t3 = t2.join(t3) + + st2t3 = t2t3.select(use_labels=True).subquery() j = t1t2.join(st2t3) assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id) self.assert_compile( @@ -1283,7 +1526,8 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL): "t1 JOIN t2 ON t1.id = t2.t1id JOIN " "(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, " "t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id " - "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id", + "FROM t2 JOIN t3 ON t2.id = t3.t2id) AS anon_1 " + "ON t2.id = anon_1.t3_t2id", ) def test_join_multiple_equiv_fks(self): @@ -1582,8 +1826,10 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): Column("manager_id", Integer, primary_key=True), Column("manager_name", String(50)), ) - s = select([engineers, managers]).where( - engineers.c.engineer_name == managers.c.manager_name + s = ( + select([engineers, managers]) + .where(engineers.c.engineer_name == managers.c.manager_name) + .subquery() ) eq_( util.column_set(sql_util.reduce_columns(list(s.c), s)), @@ -1629,7 +1875,16 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): ) s1 = select([t1, t2]) s1 = s1.reduce_columns(only_synonyms=True) - eq_(set(s1.c), set([s1.c.x, s1.c.y, s1.c.q])) + eq_( + set(s1.selected_columns), + set( + [ + s1.selected_columns.x, + s1.selected_columns.y, + s1.selected_columns.q, + ] + ), + ) def test_reduce_only_synonym_lineage(self): m = MetaData() @@ -1642,7 +1897,7 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): ) # test that the first appearance in the columns clause # wins - t1 is first, t1.c.x wins - s1 = select([t1]) + s1 = select([t1]).subquery() s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), @@ -1650,7 +1905,7 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults): ) # reverse order, s1.c.x wins - s1 = select([t1]) + s1 = select([t1]).subquery() s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z) eq_( set(s2.reduce_columns().inner_columns), @@ -1939,7 +2194,7 @@ class AnnotationsTest(fixtures.TestBase): assert (t.c.x == 5).compare(x_a == 5) assert not (t.c.y == 5).compare(x_a == 5) - s = select([t]) + s = select([t]).subquery() x_p = s.c.x assert not x_a.compare(x_p) assert not t.c.x.compare(x_p) @@ -2025,12 +2280,12 @@ class AnnotationsTest(fixtures.TestBase): _constructor = Column t1 = Table("t1", MetaData(), MyColumn()) - s1 = t1.select() + s1 = t1.select().subquery() assert isinstance(t1.c.foo, MyColumn) assert isinstance(s1.c.foo, Column) annot_1 = t1.c.foo._annotate({}) - s2 = select([annot_1]) + s2 = select([annot_1]).subquery() assert isinstance(s2.c.foo, Column) annot_2 = s1._annotate({}) assert isinstance(annot_2.c.foo, Column) @@ -2064,7 +2319,7 @@ class AnnotationsTest(fixtures.TestBase): def test_annotated_corresponding_column(self): table1 = table("table1", column("col1")) - s1 = select([table1.c.col1]) + s1 = select([table1.c.col1]).subquery() t1 = s1._annotate({}) t2 = s1 @@ -2075,7 +2330,7 @@ class AnnotationsTest(fixtures.TestBase): assert t1.c is t2.c assert t1.c.col1 is t2.c.col1 - inner = select([s1]) + inner = select([s1]).subquery() assert ( inner.corresponding_column(t2.c.col1, require_embedded=False) @@ -2278,7 +2533,7 @@ class AnnotationsTest(fixtures.TestBase): a1 = table1.alias() s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y)) - assert_s = select([select([s])]) + assert_s = select([select([s.subquery()]).subquery()]) for fn in ( sql_util._deep_deannotate, lambda s: sql_util._deep_annotate(s, {"foo": "bar"}), @@ -2286,7 +2541,7 @@ class AnnotationsTest(fixtures.TestBase): lambda s: visitors.replacement_traverse(s, {}, lambda x: None), ): - sel = fn(select([fn(select([fn(s)]))])) + sel = fn(select([fn(select([fn(s.subquery())]).subquery())])) eq_(str(assert_s), str(sel)) def test_bind_unique_test(self): @@ -2359,7 +2614,7 @@ class WithLabelsTest(fixtures.TestBase): assert_raises_message( exc.SAWarning, r"replaced by Column.*, which has the same key", - lambda: s.c, + lambda: s.subquery().c, ) def _assert_result_keys(self, s, keys): @@ -2367,7 +2622,7 @@ class WithLabelsTest(fixtures.TestBase): eq_(set(compiled._create_result_map()), set(keys)) def _assert_subq_result_keys(self, s, keys): - compiled = s.select().compile() + compiled = s.subquery().select().compile() eq_(set(compiled._create_result_map()), set(keys)) def _names_overlap(self): @@ -2383,7 +2638,8 @@ class WithLabelsTest(fixtures.TestBase): def test_names_overlap_label(self): sel = self._names_overlap().apply_labels() - eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + eq_(list(sel.selected_columns.keys()), ["t1_x", "t2_x"]) + eq_(list(sel.subquery().c.keys()), ["t1_x", "t2_x"]) self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _names_overlap_keys_dont(self): @@ -2394,12 +2650,14 @@ class WithLabelsTest(fixtures.TestBase): def test_names_overlap_keys_dont_nolabel(self): sel = self._names_overlap_keys_dont() - eq_(list(sel.c.keys()), ["a", "b"]) + eq_(list(sel.selected_columns.keys()), ["a", "b"]) + eq_(list(sel.subquery().c.keys()), ["a", "b"]) self._assert_result_keys(sel, ["x"]) def test_names_overlap_keys_dont_label(self): sel = self._names_overlap_keys_dont().apply_labels() - eq_(list(sel.c.keys()), ["t1_a", "t2_b"]) + eq_(list(sel.selected_columns.keys()), ["t1_a", "t2_b"]) + eq_(list(sel.subquery().c.keys()), ["t1_a", "t2_b"]) self._assert_result_keys(sel, ["t1_x", "t2_x"]) def _labels_overlap(self): @@ -2410,13 +2668,15 @@ class WithLabelsTest(fixtures.TestBase): def test_labels_overlap_nolabel(self): sel = self._labels_overlap() - eq_(list(sel.c.keys()), ["x_id", "id"]) + eq_(list(sel.selected_columns.keys()), ["x_id", "id"]) + eq_(list(sel.subquery().c.keys()), ["x_id", "id"]) self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_label(self): sel = self._labels_overlap().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + eq_(list(sel.selected_columns.keys()), ["t_x_id", t2.c.id.anon_label]) + eq_(list(sel.subquery().c.keys()), ["t_x_id", t2.c.id.anon_label]) self._assert_result_keys(sel, ["t_x_id", "id_1"]) self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) @@ -2428,12 +2688,14 @@ class WithLabelsTest(fixtures.TestBase): def test_labels_overlap_keylabels_dont_nolabel(self): sel = self._labels_overlap_keylabels_dont() - eq_(list(sel.c.keys()), ["a", "b"]) + eq_(list(sel.selected_columns.keys()), ["a", "b"]) + eq_(list(sel.subquery().c.keys()), ["a", "b"]) self._assert_result_keys(sel, ["x_id", "id"]) def test_labels_overlap_keylabels_dont_label(self): sel = self._labels_overlap_keylabels_dont().apply_labels() - eq_(list(sel.c.keys()), ["t_a", "t_x_b"]) + eq_(list(sel.selected_columns.keys()), ["t_a", "t_x_b"]) + eq_(list(sel.subquery().c.keys()), ["t_a", "t_x_b"]) self._assert_result_keys(sel, ["t_x_id", "id_1"]) def _keylabels_overlap_labels_dont(self): @@ -2444,13 +2706,15 @@ class WithLabelsTest(fixtures.TestBase): def test_keylabels_overlap_labels_dont_nolabel(self): sel = self._keylabels_overlap_labels_dont() - eq_(list(sel.c.keys()), ["x_id", "id"]) + eq_(list(sel.selected_columns.keys()), ["x_id", "id"]) + eq_(list(sel.subquery().c.keys()), ["x_id", "id"]) self._assert_result_keys(sel, ["a", "b"]) def test_keylabels_overlap_labels_dont_label(self): sel = self._keylabels_overlap_labels_dont().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label]) + eq_(list(sel.selected_columns.keys()), ["t_x_id", t2.c.id.anon_label]) + eq_(list(sel.subquery().c.keys()), ["t_x_id", t2.c.id.anon_label]) self._assert_result_keys(sel, ["t_a", "t_x_b"]) self._assert_subq_result_keys(sel, ["t_a", "t_x_b"]) @@ -2462,14 +2726,16 @@ class WithLabelsTest(fixtures.TestBase): def test_keylabels_overlap_labels_overlap_nolabel(self): sel = self._keylabels_overlap_labels_overlap() - eq_(list(sel.c.keys()), ["x_a", "a"]) + eq_(list(sel.selected_columns.keys()), ["x_a", "a"]) + eq_(list(sel.subquery().c.keys()), ["x_a", "a"]) self._assert_result_keys(sel, ["x_id", "id"]) self._assert_subq_result_keys(sel, ["x_id", "id"]) def test_keylabels_overlap_labels_overlap_label(self): sel = self._keylabels_overlap_labels_overlap().apply_labels() t2 = sel.froms[1] - eq_(list(sel.c.keys()), ["t_x_a", t2.c.a.anon_label]) + eq_(list(sel.selected_columns.keys()), ["t_x_a", t2.c.a.anon_label]) + eq_(list(sel.subquery().c.keys()), ["t_x_a", t2.c.a.anon_label]) self._assert_result_keys(sel, ["t_x_id", "id_1"]) self._assert_subq_result_keys(sel, ["t_x_id", "id_1"]) @@ -2486,7 +2752,8 @@ class WithLabelsTest(fixtures.TestBase): def test_keys_overlap_names_dont_label(self): sel = self._keys_overlap_names_dont().apply_labels() - eq_(list(sel.c.keys()), ["t1_x", "t2_x"]) + eq_(list(sel.selected_columns.keys()), ["t1_x", "t2_x"]) + eq_(list(sel.subquery().c.keys()), ["t1_x", "t2_x"]) self._assert_result_keys(sel, ["t1_a", "t2_b"]) @@ -2653,38 +2920,24 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL): class AliasTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" - def test_legacy_original_accessor(self): - t = table("t", column("c")) - a1 = t.alias() - a2 = a1.alias() - a3 = a2.alias() - - is_(a1.original, t) - is_(a2.original, t) - is_(a3.original, t) - - def test_wrapped(self): + def test_direct_element_hierarchy(self): t = table("t", column("c")) a1 = t.alias() a2 = a1.alias() a3 = a2.alias() is_(a1.element, t) - is_(a2.element, t) - is_(a3.element, t) - - is_(a3.wrapped, a2) - is_(a2.wrapped, a1) - is_(a1.wrapped, t) + is_(a2.element, a1) + is_(a3.element, a2) - def test_get_children_preserves_wrapped(self): + def test_get_children_preserves_multiple_nesting(self): t = table("t", column("c")) stmt = select([t]) a1 = stmt.alias() a2 = a1.alias() eq_(set(a2.get_children(column_collections=False)), {a1}) - def test_wrapped_correspondence(self): + def test_correspondence_multiple_nesting(self): t = table("t", column("c")) stmt = select([t]) a1 = stmt.alias() @@ -2692,15 +2945,12 @@ class AliasTest(fixtures.TestBase, AssertsCompiledSQL): is_(a1.corresponding_column(a2.c.c), a1.c.c) - def test_copy_internals_preserves_wrapped(self): + def test_copy_internals_multiple_nesting(self): t = table("t", column("c")) stmt = select([t]) a1 = stmt.alias() a2 = a1.alias() - is_(a2.element, a2.wrapped.element) - a3 = a2._clone() a3._copy_internals() is_(a1.corresponding_column(a3.c.c), a1.c.c) - is_(a3.element, a3.wrapped.element) |
