summaryrefslogtreecommitdiff
path: root/test/sql/test_selectable.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2019-06-13 12:37:22 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2019-07-06 13:02:22 -0400
commitef7ff058eb67d73ebeac7b125ab2a7806e14629c (patch)
tree9a09162961f7bcdb6d16837adacabb99f10b4410 /test/sql/test_selectable.py
parent1ce98ca83a4b2da12e52aa0f4ab181c83063abc2 (diff)
downloadsqlalchemy-ef7ff058eb67d73ebeac7b125ab2a7806e14629c.tar.gz
SelectBase no longer a FromClause
As part of the SQLAlchemy 2.0 migration project, a conceptual change has been made to the role of the :class:`.SelectBase` class hierarchy, which is the root of all "SELECT" statement constructs, in that they no longer serve directly as FROM clauses, that is, they no longer subclass :class:`.FromClause`. For end users, the change mostly means that any placement of a :func:`.select` construct in the FROM clause of another :func:`.select` requires first that it be wrapped in a subquery first, which historically is through the use of the :meth:`.SelectBase.alias` method, and is now also available through the use of :meth:`.SelectBase.subquery`. This was usually a requirement in any case since several databases don't accept unnamed SELECT subqueries in their FROM clause in any case. See the documentation in this change for lots more detail. Fixes: #4617 Change-Id: I0f6174ee24b9a1a4529168e52e855e12abd60667
Diffstat (limited to 'test/sql/test_selectable.py')
-rw-r--r--test/sql/test_selectable.py628
1 files changed, 439 insertions, 189 deletions
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index 230b5423b..485a0e428 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -29,6 +29,7 @@ from sqlalchemy import TypeDecorator
from sqlalchemy import union
from sqlalchemy import util
from sqlalchemy.sql import Alias
+from sqlalchemy.sql import base
from sqlalchemy.sql import column
from sqlalchemy.sql import elements
from sqlalchemy.sql import table
@@ -88,7 +89,7 @@ class SelectableTest(
table1.c.col1,
table1.c.col1.label("c1"),
]
- )
+ ).subquery()
# this tests the same thing as
# test_direct_correspondence_on_labels below -
@@ -98,13 +99,39 @@ class SelectableTest(
assert s.corresponding_column(s.c.col1) is s.c.col1
assert s.corresponding_column(s.c.c1) is s.c.c1
- def test_labeled_subquery_twice(self):
+ def test_labeled_select_twice(self):
scalar_select = select([table1.c.col1]).label("foo")
s1 = select([scalar_select])
s2 = select([scalar_select, scalar_select])
eq_(
+ s1.selected_columns.foo.proxy_set,
+ set(
+ [s1.selected_columns.foo, scalar_select, scalar_select.element]
+ ),
+ )
+ eq_(
+ s2.selected_columns.foo.proxy_set,
+ set(
+ [s2.selected_columns.foo, scalar_select, scalar_select.element]
+ ),
+ )
+
+ assert (
+ s1.corresponding_column(scalar_select) is s1.selected_columns.foo
+ )
+ assert (
+ s2.corresponding_column(scalar_select) is s2.selected_columns.foo
+ )
+
+ def test_labeled_subquery_twice(self):
+ scalar_select = select([table1.c.col1]).label("foo")
+
+ s1 = select([scalar_select]).subquery()
+ s2 = select([scalar_select, scalar_select]).subquery()
+
+ eq_(
s1.c.foo.proxy_set,
set([s1.c.foo, scalar_select, scalar_select.element]),
)
@@ -116,12 +143,21 @@ class SelectableTest(
assert s1.corresponding_column(scalar_select) is s1.c.foo
assert s2.corresponding_column(scalar_select) is s2.c.foo
- def test_label_grouped_still_corresponds(self):
+ def test_select_label_grouped_still_corresponds(self):
label = select([table1.c.col1]).label("foo")
label2 = label.self_group()
s1 = select([label])
s2 = select([label2])
+ assert s1.corresponding_column(label) is s1.selected_columns.foo
+ assert s2.corresponding_column(label) is s2.selected_columns.foo
+
+ def test_subquery_label_grouped_still_corresponds(self):
+ label = select([table1.c.col1]).label("foo")
+ label2 = label.self_group()
+
+ s1 = select([label]).subquery()
+ s2 = select([label2]).subquery()
assert s1.corresponding_column(label) is s1.c.foo
assert s2.corresponding_column(label) is s2.c.foo
@@ -144,13 +180,22 @@ class SelectableTest(
def test_keyed_gen(self):
s = select([keyed])
- eq_(s.c.colx.key, "colx")
+ eq_(s.selected_columns.colx.key, "colx")
- eq_(s.c.colx.name, "x")
+ eq_(s.selected_columns.colx.name, "x")
- assert s.corresponding_column(keyed.c.colx) is s.c.colx
- assert s.corresponding_column(keyed.c.coly) is s.c.coly
- assert s.corresponding_column(keyed.c.z) is s.c.z
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.colx)
+ is s.selected_columns.colx
+ )
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.coly)
+ is s.selected_columns.coly
+ )
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.z)
+ is s.selected_columns.z
+ )
sel2 = s.alias()
assert sel2.corresponding_column(keyed.c.colx) is sel2.c.colx
@@ -160,9 +205,18 @@ class SelectableTest(
def test_keyed_label_gen(self):
s = select([keyed]).apply_labels()
- assert s.corresponding_column(keyed.c.colx) is s.c.keyed_colx
- assert s.corresponding_column(keyed.c.coly) is s.c.keyed_coly
- assert s.corresponding_column(keyed.c.z) is s.c.keyed_z
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.colx)
+ is s.selected_columns.keyed_colx
+ )
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.coly)
+ is s.selected_columns.keyed_coly
+ )
+ assert (
+ s.selected_columns.corresponding_column(keyed.c.z)
+ is s.selected_columns.keyed_z
+ )
sel2 = s.alias()
assert sel2.corresponding_column(keyed.c.colx) is sel2.c.keyed_colx
@@ -184,6 +238,9 @@ class SelectableTest(
c = Column("foo", Integer, key="bar")
t = Table("t", MetaData(), c)
s = select([t])._clone()
+ assert c in s.selected_columns.bar.proxy_set
+
+ s = select([t]).subquery()._clone()
assert c in s.c.bar.proxy_set
def test_clone_c_proxy_key_lower(self):
@@ -191,6 +248,9 @@ class SelectableTest(
c.key = "bar"
t = table("t", c)
s = select([t])._clone()
+ assert c in s.selected_columns.bar.proxy_set
+
+ s = select([t]).subquery()._clone()
assert c in s.c.bar.proxy_set
def test_no_error_on_unsupported_expr_key(self):
@@ -204,6 +264,13 @@ class SelectableTest(
expr = BinaryExpression(t.c.x, t.c.y, myop)
s = select([t, expr])
+
+ # anon_label, e.g. a truncated_label, is used here becuase
+ # the expr has no name, no key, and myop() can't create a
+ # string, so this is the last resort
+ eq_(s.selected_columns.keys(), ["x", "y", expr.anon_label])
+
+ s = select([t, expr]).subquery()
eq_(s.c.keys(), ["x", "y", expr.anon_label])
def test_cloned_intersection(self):
@@ -219,10 +286,7 @@ class SelectableTest(
s2c1 = s2._clone()
s3c1 = s3._clone()
- eq_(
- elements._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]),
- set([s1c1]),
- )
+ eq_(base._cloned_intersection([s1c1, s3c1], [s2c1, s1c2]), set([s1c1]))
def test_cloned_difference(self):
t1 = table("t1", column("x"))
@@ -238,27 +302,36 @@ class SelectableTest(
s3c1 = s3._clone()
eq_(
- elements._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]),
+ base._cloned_difference([s1c1, s2c1, s3c1], [s2c1, s1c2]),
set([s3c1]),
)
def test_distance_on_aliases(self):
a1 = table1.alias("a1")
for s in (
- select([a1, table1], use_labels=True),
- select([table1, a1], use_labels=True),
+ select([a1, table1], use_labels=True).subquery(),
+ select([table1, a1], use_labels=True).subquery(),
):
assert s.corresponding_column(table1.c.col1) is s.c.table1_col1
assert s.corresponding_column(a1.c.col1) is s.c.a1_col1
def test_join_against_self(self):
- jj = select([table1.c.col1.label("bar_col1")])
+ jj = select([table1.c.col1.label("bar_col1")]).subquery()
jjj = join(table1, jj, table1.c.col1 == jj.c.bar_col1)
# test column directly against itself
+ # joins necessarily have to prefix column names with the name
+ # of the selectable, else the same-named columns will overwrite
+ # one another. In this case, we unfortunately have this unfriendly
+ # "anonymous" name, whereas before when select() could be a FROM
+ # the "bar_col1" label would be directly in the join() object. However
+ # this was a useless join() object because PG and MySQL don't accept
+ # unnamed subqueries in joins in any case.
+ name = "%s_bar_col1" % (jj.name,)
+
assert jjj.corresponding_column(jjj.c.table1_col1) is jjj.c.table1_col1
- assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c.bar_col1
+ assert jjj.corresponding_column(jj.c.bar_col1) is jjj.c[name]
# test alias of the join
@@ -267,13 +340,15 @@ class SelectableTest(
def test_clone_append_column(self):
sel = select([literal_column("1").label("a")])
- eq_(list(sel.c.keys()), ["a"])
+ eq_(list(sel.selected_columns.keys()), ["a"])
cloned = visitors.ReplacingCloningVisitor().traverse(sel)
cloned.append_column(literal_column("2").label("b"))
cloned.append_column(func.foo())
- eq_(list(cloned.c.keys()), ["a", "b", "foo()"])
+ eq_(list(cloned.selected_columns.keys()), ["a", "b", "foo()"])
- def test_append_column_after_replace_selectable(self):
+ def test_append_column_after_visitor_replace(self):
+ # test for a supported idiom that matches the deprecated / removed
+ # replace_selectable method
basesel = select([literal_column("1").label("a")])
tojoin = select(
[literal_column("1").label("a"), literal_column("2").label("b")]
@@ -281,9 +356,14 @@ class SelectableTest(
basefrom = basesel.alias("basefrom")
joinfrom = tojoin.alias("joinfrom")
sel = select([basefrom.c.a])
- replaced = sel.replace_selectable(
- basefrom, basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a)
- )
+
+ replace_from = basefrom.join(joinfrom, basefrom.c.a == joinfrom.c.a)
+
+ def replace(elem):
+ if elem is basefrom:
+ return replace_from
+
+ replaced = visitors.replacement_traverse(sel, {}, replace)
self.assert_compile(
replaced,
"SELECT basefrom.a FROM (SELECT 1 AS a) AS basefrom "
@@ -302,7 +382,7 @@ class SelectableTest(
# test that corresponding column digs across
# clone boundaries with anonymous labeled elements
col = func.count().label("foo")
- sel = select([col])
+ sel = select([col]).subquery()
sel2 = visitors.ReplacingCloningVisitor().traverse(sel)
assert sel2.corresponding_column(col) is sel2.c.foo
@@ -323,13 +403,16 @@ class SelectableTest(
impl = Integer
stmt = select([type_coerce(column("x"), MyType).label("foo")])
- stmt2 = stmt.select()
+ subq = stmt.subquery()
+ stmt2 = subq.select()
+ subq2 = stmt2.subquery()
assert isinstance(stmt._raw_columns[0].type, MyType)
- assert isinstance(stmt.c.foo.type, MyType)
- assert isinstance(stmt2.c.foo.type, MyType)
+ assert isinstance(subq.c.foo.type, MyType)
+ assert isinstance(stmt2.selected_columns.foo.type, MyType)
+ assert isinstance(subq2.c.foo.type, MyType)
- def test_select_on_table(self):
- sel = select([table1, table2], use_labels=True)
+ def test_subquery_on_table(self):
+ sel = select([table1, table2], use_labels=True).subquery()
assert sel.corresponding_column(table1.c.col1) is sel.c.table1_col1
assert (
@@ -383,7 +466,7 @@ class SelectableTest(
"AS a",
)
- def test_union(self):
+ def test_union_correspondence(self):
# tests that we can correspond a column in a Select statement
# with a certain Table, against a column in a Union where one of
@@ -411,8 +494,34 @@ class SelectableTest(
s1 = table1.select(use_labels=True)
s2 = table2.select(use_labels=True)
- assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
- assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
+ assert (
+ u.corresponding_column(s1.selected_columns.table1_col2)
+ is u.selected_columns.col2
+ )
+
+ # right now, the "selected_columns" of a union are those of the
+ # first selectable. so without using a subquery that represents
+ # all the SELECTs in the union, we can't do corresponding column
+ # like this. perhaps compoundselect shouldn't even implement
+ # .corresponding_column directly
+ assert (
+ u.corresponding_column(s2.selected_columns.table2_col2) is None
+ ) # really? u.selected_columns.col2
+
+ usub = u.subquery()
+ assert (
+ usub.corresponding_column(s1.selected_columns.table1_col2)
+ is usub.c.col2
+ )
+ assert (
+ usub.corresponding_column(s2.selected_columns.table2_col2)
+ is usub.c.col2
+ )
+
+ s1sub = s1.subquery()
+ s2sub = s2.subquery()
+ assert usub.corresponding_column(s1sub.c.table1_col2) is usub.c.col2
+ assert usub.corresponding_column(s2sub.c.table2_col2) is usub.c.col2
def test_union_precedence(self):
# conflicting column correspondence should be resolved based on
@@ -423,11 +532,11 @@ class SelectableTest(
s3 = select([table1.c.col3, table1.c.colx])
s4 = select([table1.c.colx, table1.c.col3])
- u1 = union(s1, s2)
+ u1 = union(s1, s2).subquery()
assert u1.corresponding_column(table1.c.col1) is u1.c.col1
assert u1.corresponding_column(table1.c.col2) is u1.c.col2
- u1 = union(s1, s2, s3, s4)
+ u1 = union(s1, s2, s3, s4).subquery()
assert u1.corresponding_column(table1.c.col1) is u1.c.col1
assert u1.corresponding_column(table1.c.col2) is u1.c.col2
assert u1.corresponding_column(table1.c.colx) is u1.c.col2
@@ -437,12 +546,12 @@ class SelectableTest(
s1 = select([table1.c.col1, table1.c.col2])
s2 = select([table1.c.col2, table1.c.col1])
- for c in s1.c:
+ for c in s1.selected_columns:
c.proxy_set
- for c in s2.c:
+ for c in s2.selected_columns:
c.proxy_set
- u1 = union(s1, s2)
+ u1 = union(s1, s2).subquery()
assert u1.corresponding_column(table1.c.col2) is u1.c.col2
def test_singular_union(self):
@@ -451,9 +560,9 @@ class SelectableTest(
select([table1.c.col1, table1.c.col2, table1.c.col3]),
)
u = union(select([table1.c.col1, table1.c.col2, table1.c.col3]))
- assert u.c.col1 is not None
- assert u.c.col2 is not None
- assert u.c.col3 is not None
+ assert u.selected_columns.col1 is not None
+ assert u.selected_columns.col2 is not None
+ assert u.selected_columns.col3 is not None
def test_alias_union(self):
@@ -482,8 +591,8 @@ class SelectableTest(
)
.alias("analias")
)
- s1 = table1.select(use_labels=True)
- s2 = table2.select(use_labels=True)
+ s1 = table1.select(use_labels=True).subquery()
+ s2 = table2.select(use_labels=True).subquery()
assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
assert u.corresponding_column(s2.c.table2_coly) is u.c.coly
@@ -493,13 +602,15 @@ class SelectableTest(
s1 = select([table1.c.col1, table1.c.col2])
s2 = select([table1.c.col1, table1.c.col2]).alias()
- u1 = union(s1, s2)
- assert u1.corresponding_column(s1.c.col1) is u1.c.col1
- assert u1.corresponding_column(s2.c.col1) is u1.c.col1
-
- u2 = union(s2, s1)
- assert u2.corresponding_column(s1.c.col1) is u2.c.col1
- assert u2.corresponding_column(s2.c.col1) is u2.c.col1
+ # previously this worked
+ assert_raises_message(
+ exc.ArgumentError,
+ "SELECT construct for inclusion in a UNION or "
+ "other set construct expected",
+ union,
+ s1,
+ s2,
+ )
def test_union_of_text(self):
s1 = select([table1.c.col1, table1.c.col2])
@@ -507,77 +618,130 @@ class SelectableTest(
column("col1"), column("col2")
)
- u1 = union(s1, s2)
- assert u1.corresponding_column(s1.c.col1) is u1.c.col1
- assert u1.corresponding_column(s2.c.col1) is u1.c.col1
+ u1 = union(s1, s2).subquery()
+ assert u1.corresponding_column(s1.selected_columns.col1) is u1.c.col1
+ assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1
+
+ u2 = union(s2, s1).subquery()
+ assert u2.corresponding_column(s1.selected_columns.col1) is u2.c.col1
+ assert u2.corresponding_column(s2.selected_columns.col1) is u2.c.col1
+
+ def test_foo(self):
+ s1 = select([table1.c.col1, table1.c.col2])
+ s2 = select([table1.c.col2, table1.c.col1])
+
+ u1 = union(s1, s2).subquery()
+ assert u1.corresponding_column(table1.c.col2) is u1.c.col2
+
+ metadata = MetaData()
+ table1_new = Table(
+ "table1",
+ metadata,
+ Column("col1", Integer, primary_key=True),
+ Column("col2", String(20)),
+ Column("col3", Integer),
+ Column("colx", Integer),
+ )
+ # table1_new = table1
+
+ s1 = select([table1_new.c.col1, table1_new.c.col2])
+ s2 = select([table1_new.c.col2, table1_new.c.col1])
+ u1 = union(s1, s2).subquery()
- u2 = union(s2, s1)
- assert u2.corresponding_column(s1.c.col1) is u2.c.col1
- assert u2.corresponding_column(s2.c.col1) is u2.c.col1
+ # TODO: failing due to proxy_set not correct
+ assert u1.corresponding_column(table1_new.c.col2) is u1.c.col2
- @testing.emits_warning("Column 'col1'")
- def test_union_dupe_keys(self):
+ def test_union_alias_dupe_keys(self):
s1 = select([table1.c.col1, table1.c.col2, table2.c.col1])
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
- u1 = union(s1, s2)
+ u1 = union(s1, s2).subquery()
+
+ with testing.expect_warnings("Column 'col1'"):
+ u1.c
assert (
- u1.corresponding_column(s1.c._all_columns[0])
+ u1.corresponding_column(s1.selected_columns._all_columns[0])
is u1.c._all_columns[0]
)
- assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
- assert u1.corresponding_column(s1.c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2]
+ # due to the duplicate key, "col1" is now the column at the end
+ # of the list and the first column is not accessible by key
+ assert u1.c.col1 is u1.c._all_columns[2]
+ # table2.c.col1 is in two positions in this union, so...currently
+ # it is the replaced one at position 2.
assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2]
- assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2]
- @testing.emits_warning("Column 'col1'")
- def test_union_alias_dupe_keys(self):
- s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).alias()
- s2 = select([table2.c.col1, table2.c.col2, table2.c.col3])
- u1 = union(s1, s2)
+ # this is table2.c.col1 in both cases, so this is "right"
+ assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1
+ # same
+ assert u1.corresponding_column(s2.subquery().c.col1) is u1.c.col1
+
+ # col2 is working OK
+ assert u1.corresponding_column(s1.selected_columns.col2) is u1.c.col2
assert (
- u1.corresponding_column(s1.c._all_columns[0])
- is u1.c._all_columns[0]
+ u1.corresponding_column(s1.selected_columns.col2)
+ is u1.c._all_columns[1]
)
- assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
- assert u1.corresponding_column(s1.c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col2) is u1.c.col2
+ assert u1.corresponding_column(s2.selected_columns.col2) is u1.c.col2
+ assert (
+ u1.corresponding_column(s2.selected_columns.col2)
+ is u1.c._all_columns[1]
+ )
+ assert u1.corresponding_column(s2.subquery().c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2]
+ # col3 is also "correct" , though confusing
+ assert u1.corresponding_column(s2.selected_columns.col3) is u1.c.col1
- # this differs from the non-alias test because table2.c.col1 is
- # more directly at s2.c.col1 than it is s1.c.col1.
- assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0]
+ assert u1.corresponding_column(table1.c.col1) is u1.c._all_columns[0]
+ assert u1.corresponding_column(table1.c.col2) is u1.c._all_columns[1]
+ assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2]
+ assert u1.corresponding_column(table2.c.col2) is u1.c._all_columns[1]
assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2]
- @testing.emits_warning("Column 'col1'")
def test_union_alias_dupe_keys_grouped(self):
- s1 = (
- select([table1.c.col1, table1.c.col2, table2.c.col1])
- .limit(1)
- .alias()
- )
+ s1 = select([table1.c.col1, table1.c.col2, table2.c.col1]).limit(1)
s2 = select([table2.c.col1, table2.c.col2, table2.c.col3]).limit(1)
- u1 = union(s1, s2)
+ u1 = union(s1, s2).subquery()
+
+ with testing.expect_warnings("Column 'col1'"):
+ u1.c
+
+ # due to the duplicate key, "col1" is now the column at the end
+ # of the list and the first column is not accessible by key
+ assert u1.c.col1 is u1.c._all_columns[2]
+
+ # table2.c.col1 is in two positions in this union, so...currently
+ # it is the replaced one at position 2.
+ assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2]
+
+ # this is table2.c.col1 in both cases, so this is "right"
+ assert u1.corresponding_column(s2.selected_columns.col1) is u1.c.col1
+ # same
+ assert u1.corresponding_column(s2.subquery().c.col1) is u1.c.col1
+
+ # col2 is working OK
+ assert u1.corresponding_column(s1.selected_columns.col2) is u1.c.col2
assert (
- u1.corresponding_column(s1.c._all_columns[0])
- is u1.c._all_columns[0]
+ u1.corresponding_column(s1.selected_columns.col2)
+ is u1.c._all_columns[1]
+ )
+ assert u1.corresponding_column(s2.selected_columns.col2) is u1.c.col2
+ assert (
+ u1.corresponding_column(s2.selected_columns.col2)
+ is u1.c._all_columns[1]
)
- assert u1.corresponding_column(s2.c.col1) is u1.c._all_columns[0]
- assert u1.corresponding_column(s1.c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col2) is u1.c.col2
+ assert u1.corresponding_column(s2.subquery().c.col2) is u1.c.col2
- assert u1.corresponding_column(s2.c.col3) is u1.c._all_columns[2]
+ # col3 is also "correct" , though confusing
+ assert u1.corresponding_column(s2.selected_columns.col3) is u1.c.col1
- # this differs from the non-alias test because table2.c.col1 is
- # more directly at s2.c.col1 than it is s1.c.col1.
- assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[0]
+ assert u1.corresponding_column(table1.c.col1) is u1.c._all_columns[0]
+ assert u1.corresponding_column(table1.c.col2) is u1.c._all_columns[1]
+ assert u1.corresponding_column(table2.c.col1) is u1.c._all_columns[2]
+ assert u1.corresponding_column(table2.c.col2) is u1.c._all_columns[1]
assert u1.corresponding_column(table2.c.col3) is u1.c._all_columns[2]
def test_select_union(self):
@@ -607,9 +771,9 @@ class SelectableTest(
)
.alias("analias")
)
- s = select([u])
- s1 = table1.select(use_labels=True)
- s2 = table2.select(use_labels=True)
+ s = select([u]).subquery()
+ s1 = table1.select(use_labels=True).subquery()
+ s2 = table2.select(use_labels=True).subquery()
assert s.corresponding_column(s1.c.table1_col2) is s.c.col2
assert s.corresponding_column(s2.c.table2_col2) is s.c.col2
@@ -653,15 +817,15 @@ class SelectableTest(
criterion = a.c.table1_col1 == b.c.col2
self.assert_(criterion.compare(j.onclause))
- def test_select_alias(self):
+ def test_select_subquery_join(self):
a = table1.select().alias("a")
j = join(a, table2)
criterion = a.c.col1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
- def test_select_labels(self):
- a = table1.select(use_labels=True)
+ def test_subquery_labels_join(self):
+ a = table1.select(use_labels=True).subquery()
j = join(a, table2)
criterion = a.c.table1_col1 == table2.c.col2
@@ -683,18 +847,27 @@ class SelectableTest(
table1.c.col2.label("acol2"),
table1.c.col3.label("acol3"),
]
- )
+ ).subquery()
j = join(a, table2)
criterion = a.c.acol1 == table2.c.col2
self.assert_(criterion.compare(j.onclause))
- def test_labeled_select_correspoinding(self):
+ def test_labeled_select_corresponding(self):
l1 = select([func.max(table1.c.col1)]).label("foo")
s = select([l1])
- eq_(s.corresponding_column(l1), s.c.foo)
+ eq_(s.corresponding_column(l1), s.selected_columns.foo)
s = select([table1.c.col1, l1])
+ eq_(s.corresponding_column(l1), s.selected_columns.foo)
+
+ def test_labeled_subquery_corresponding(self):
+ l1 = select([func.max(table1.c.col1)]).label("foo")
+ s = select([l1]).subquery()
+
+ eq_(s.corresponding_column(l1), s.c.foo)
+
+ s = select([table1.c.col1, l1]).subquery()
eq_(s.corresponding_column(l1), s.c.foo)
def test_select_alias_labels(self):
@@ -724,7 +897,7 @@ class SelectableTest(
t2 = Table("t2", m, Column("id", Integer, ForeignKey("t1.id")))
t3 = Table("t3", m2, Column("id", Integer, ForeignKey("t1.id2")))
- s = select([t2, t3], use_labels=True)
+ s = select([t2, t3], use_labels=True).subquery()
assert_raises(exc.NoReferencedTableError, s.join, t1)
@@ -732,19 +905,19 @@ class SelectableTest(
# See [ticket:2167] for this one.
l1 = table1.c.col1.label("a")
l2 = select([l1]).label("b")
- s = select([l2])
+ s = select([l2]).subquery()
assert s.c.b is not None
self.assert_compile(
s.select(),
- "SELECT b FROM "
- "(SELECT (SELECT table1.col1 AS a FROM table1) AS b)",
+ "SELECT anon_1.b FROM "
+ "(SELECT (SELECT table1.col1 AS a FROM table1) AS b) AS anon_1",
)
- s2 = select([s.label("c")])
+ s2 = select([s.element.label("c")]).subquery()
self.assert_compile(
s2.select(),
- "SELECT c FROM (SELECT (SELECT ("
- "SELECT table1.col1 AS a FROM table1) AS b) AS c)",
+ "SELECT anon_1.c FROM (SELECT (SELECT ("
+ "SELECT table1.col1 AS a FROM table1) AS b) AS c) AS anon_1",
)
def test_self_referential_select_raises(self):
@@ -752,7 +925,8 @@ class SelectableTest(
s = select([t])
- s.append_whereclause(s.c.x > 5)
+ with testing.expect_deprecated("The SelectBase.c"):
+ s.append_whereclause(s.c.x > 5)
assert_raises_message(
exc.InvalidRequestError,
r"select\(\) construct refers to itself as a FROM",
@@ -762,7 +936,7 @@ class SelectableTest(
def test_unusual_column_elements_text(self):
"""test that .c excludes text()."""
- s = select([table1.c.col1, text("foo")])
+ s = select([table1.c.col1, text("foo")]).subquery()
eq_(list(s.c), [s.c.col1])
def test_unusual_column_elements_clauselist(self):
@@ -770,14 +944,16 @@ class SelectableTest(
from sqlalchemy.sql.expression import ClauseList
- s = select([table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)])
+ s = select(
+ [table1.c.col1, ClauseList(table1.c.col2, table1.c.col3)]
+ ).subquery()
eq_(list(s.c), [s.c.col1, s.c.col2, s.c.col3])
def test_unusual_column_elements_boolean_clauselist(self):
"""test that BooleanClauseList is placed as single element in .c."""
c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4)
- s = select([table1.c.col1, c2])
+ s = select([table1.c.col1, c2]).subquery()
eq_(list(s.c), [s.c.col1, s.corresponding_column(c2)])
def test_from_list_deferred_constructor(self):
@@ -936,11 +1112,11 @@ class RefreshForNewColTest(fixtures.TestBase):
a = table("a", column("x"))
b = table("b", column("y"))
s = select([a, b]).apply_labels()
- s.c
+ s.selected_columns
q = column("x")
b.append_column(q)
s._refresh_for_new_column(q)
- assert q in s.c.b_x.proxy_set
+ assert q in s.selected_columns.b_x.proxy_set
def test_alias_alias_samename_init(self):
a = table("a", column("x"))
@@ -954,8 +1130,11 @@ class RefreshForNewColTest(fixtures.TestBase):
q = column("x")
b.append_column(q)
+ assert "_columns" in s2.__dict__
+
s2._refresh_for_new_column(q)
+ assert "_columns" not in s2.__dict__
is_(s1.corresponding_column(s2.c.b_x), s1.c.b_x)
def test_aliased_select_samename_uninit(self):
@@ -1005,23 +1184,18 @@ class RefreshForNewColTest(fixtures.TestBase):
q = column("q")
a.append_column(q)
s3._refresh_for_new_column(q)
- assert a.c.q in s3.c.q.proxy_set
+ assert a.c.q in s3.selected_columns.q.proxy_set
- def test_union_init_raises(self):
+ def test_union_init(self):
a = table("a", column("x"))
s1 = select([a])
s2 = select([a])
s3 = s1.union(s2)
- s3.c
+ s3.selected_columns
q = column("q")
a.append_column(q)
- assert_raises_message(
- NotImplementedError,
- "CompoundSelect constructs don't support addition of "
- "columns to underlying selectables",
- s3._refresh_for_new_column,
- q,
- )
+ s3._refresh_for_new_column(q)
+ assert a.c.q in s3.selected_columns.q.proxy_set
def test_nested_join_uninit(self):
a = table("a", column("x"))
@@ -1195,7 +1369,7 @@ class JoinAliasingTest(fixtures.TestBase, AssertsCompiledSQL):
class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def test_join_condition(self):
+ def test_join_condition_one(self):
m = MetaData()
t1 = Table("t1", m, Column("id", Integer))
t2 = Table(
@@ -1211,13 +1385,6 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
t4 = Table(
"t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id"))
)
- t5 = Table(
- "t5",
- m,
- Column("t1id1", ForeignKey("t1.id")),
- Column("t1id2", ForeignKey("t1.id")),
- )
-
t1t2 = t1.join(t2)
t2t3 = t2.join(t3)
@@ -1235,6 +1402,32 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
sql_util.join_condition(left, right, a_subset=a_subset)
)
+ def test_join_condition_two(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ t3 = Table(
+ "t3",
+ m,
+ Column("id", Integer),
+ Column("t1id", ForeignKey("t1.id")),
+ Column("t2id", ForeignKey("t2.id")),
+ )
+ t4 = Table(
+ "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id"))
+ )
+ t5 = Table(
+ "t5",
+ m,
+ Column("t1id1", ForeignKey("t1.id")),
+ Column("t1id2", ForeignKey("t1.id")),
+ )
+
+ t1t2 = t1.join(t2)
+ t2t3 = t2.join(t3)
+
# these are ambiguous, or have no joins
for left, right, a_subset in [
(t1t2, t3, None),
@@ -1242,7 +1435,7 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
(t1, t4, None),
(t1t2, t2t3, None),
(t5, t1, None),
- (t5.select(use_labels=True), t1, None),
+ (t5.select(use_labels=True).subquery(), t1, None),
]:
assert_raises(
exc.ArgumentError,
@@ -1252,6 +1445,24 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
a_subset=a_subset,
)
+ def test_join_condition_three(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ t3 = Table(
+ "t3",
+ m,
+ Column("id", Integer),
+ Column("t1id", ForeignKey("t1.id")),
+ Column("t2id", ForeignKey("t2.id")),
+ )
+ t4 = Table(
+ "t4", m, Column("id", Integer), Column("t2id", ForeignKey("t2.id"))
+ )
+ t1t2 = t1.join(t2)
+ t2t3 = t2.join(t3)
als = t2t3.alias()
# test join's behavior, including natural
for left, right, expected in [
@@ -1266,6 +1477,22 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
]:
assert expected.compare(left.join(right).onclause)
+ def test_join_condition_four(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ t3 = Table(
+ "t3",
+ m,
+ Column("id", Integer),
+ Column("t1id", ForeignKey("t1.id")),
+ Column("t2id", ForeignKey("t2.id")),
+ )
+ t1t2 = t1.join(t2)
+ t2t3 = t2.join(t3)
+
# these are right-nested joins
j = t1t2.join(t2t3)
assert j.onclause.compare(t2.c.id == t3.c.t2id)
@@ -1275,7 +1502,23 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
"(t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3.t2id",
)
- st2t3 = t2t3.select(use_labels=True)
+ def test_join_condition_five(self):
+ m = MetaData()
+ t1 = Table("t1", m, Column("id", Integer))
+ t2 = Table(
+ "t2", m, Column("id", Integer), Column("t1id", ForeignKey("t1.id"))
+ )
+ t3 = Table(
+ "t3",
+ m,
+ Column("id", Integer),
+ Column("t1id", ForeignKey("t1.id")),
+ Column("t2id", ForeignKey("t2.id")),
+ )
+ t1t2 = t1.join(t2)
+ t2t3 = t2.join(t3)
+
+ st2t3 = t2t3.select(use_labels=True).subquery()
j = t1t2.join(st2t3)
assert j.onclause.compare(t2.c.id == st2t3.c.t3_t2id)
self.assert_compile(
@@ -1283,7 +1526,8 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
"t1 JOIN t2 ON t1.id = t2.t1id JOIN "
"(SELECT t2.id AS t2_id, t2.t1id AS t2_t1id, "
"t3.id AS t3_id, t3.t1id AS t3_t1id, t3.t2id AS t3_t2id "
- "FROM t2 JOIN t3 ON t2.id = t3.t2id) ON t2.id = t3_t2id",
+ "FROM t2 JOIN t3 ON t2.id = t3.t2id) AS anon_1 "
+ "ON t2.id = anon_1.t3_t2id",
)
def test_join_multiple_equiv_fks(self):
@@ -1582,8 +1826,10 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
Column("manager_id", Integer, primary_key=True),
Column("manager_name", String(50)),
)
- s = select([engineers, managers]).where(
- engineers.c.engineer_name == managers.c.manager_name
+ s = (
+ select([engineers, managers])
+ .where(engineers.c.engineer_name == managers.c.manager_name)
+ .subquery()
)
eq_(
util.column_set(sql_util.reduce_columns(list(s.c), s)),
@@ -1629,7 +1875,16 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
)
s1 = select([t1, t2])
s1 = s1.reduce_columns(only_synonyms=True)
- eq_(set(s1.c), set([s1.c.x, s1.c.y, s1.c.q]))
+ eq_(
+ set(s1.selected_columns),
+ set(
+ [
+ s1.selected_columns.x,
+ s1.selected_columns.y,
+ s1.selected_columns.q,
+ ]
+ ),
+ )
def test_reduce_only_synonym_lineage(self):
m = MetaData()
@@ -1642,7 +1897,7 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
)
# test that the first appearance in the columns clause
# wins - t1 is first, t1.c.x wins
- s1 = select([t1])
+ s1 = select([t1]).subquery()
s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
set(s2.reduce_columns().inner_columns),
@@ -1650,7 +1905,7 @@ class ReduceTest(fixtures.TestBase, AssertsExecutionResults):
)
# reverse order, s1.c.x wins
- s1 = select([t1])
+ s1 = select([t1]).subquery()
s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
eq_(
set(s2.reduce_columns().inner_columns),
@@ -1939,7 +2194,7 @@ class AnnotationsTest(fixtures.TestBase):
assert (t.c.x == 5).compare(x_a == 5)
assert not (t.c.y == 5).compare(x_a == 5)
- s = select([t])
+ s = select([t]).subquery()
x_p = s.c.x
assert not x_a.compare(x_p)
assert not t.c.x.compare(x_p)
@@ -2025,12 +2280,12 @@ class AnnotationsTest(fixtures.TestBase):
_constructor = Column
t1 = Table("t1", MetaData(), MyColumn())
- s1 = t1.select()
+ s1 = t1.select().subquery()
assert isinstance(t1.c.foo, MyColumn)
assert isinstance(s1.c.foo, Column)
annot_1 = t1.c.foo._annotate({})
- s2 = select([annot_1])
+ s2 = select([annot_1]).subquery()
assert isinstance(s2.c.foo, Column)
annot_2 = s1._annotate({})
assert isinstance(annot_2.c.foo, Column)
@@ -2064,7 +2319,7 @@ class AnnotationsTest(fixtures.TestBase):
def test_annotated_corresponding_column(self):
table1 = table("table1", column("col1"))
- s1 = select([table1.c.col1])
+ s1 = select([table1.c.col1]).subquery()
t1 = s1._annotate({})
t2 = s1
@@ -2075,7 +2330,7 @@ class AnnotationsTest(fixtures.TestBase):
assert t1.c is t2.c
assert t1.c.col1 is t2.c.col1
- inner = select([s1])
+ inner = select([s1]).subquery()
assert (
inner.corresponding_column(t2.c.col1, require_embedded=False)
@@ -2278,7 +2533,7 @@ class AnnotationsTest(fixtures.TestBase):
a1 = table1.alias()
s = select([a1.c.x]).select_from(a1.join(table2, a1.c.x == table2.c.y))
- assert_s = select([select([s])])
+ assert_s = select([select([s.subquery()]).subquery()])
for fn in (
sql_util._deep_deannotate,
lambda s: sql_util._deep_annotate(s, {"foo": "bar"}),
@@ -2286,7 +2541,7 @@ class AnnotationsTest(fixtures.TestBase):
lambda s: visitors.replacement_traverse(s, {}, lambda x: None),
):
- sel = fn(select([fn(select([fn(s)]))]))
+ sel = fn(select([fn(select([fn(s.subquery())]).subquery())]))
eq_(str(assert_s), str(sel))
def test_bind_unique_test(self):
@@ -2359,7 +2614,7 @@ class WithLabelsTest(fixtures.TestBase):
assert_raises_message(
exc.SAWarning,
r"replaced by Column.*, which has the same key",
- lambda: s.c,
+ lambda: s.subquery().c,
)
def _assert_result_keys(self, s, keys):
@@ -2367,7 +2622,7 @@ class WithLabelsTest(fixtures.TestBase):
eq_(set(compiled._create_result_map()), set(keys))
def _assert_subq_result_keys(self, s, keys):
- compiled = s.select().compile()
+ compiled = s.subquery().select().compile()
eq_(set(compiled._create_result_map()), set(keys))
def _names_overlap(self):
@@ -2383,7 +2638,8 @@ class WithLabelsTest(fixtures.TestBase):
def test_names_overlap_label(self):
sel = self._names_overlap().apply_labels()
- eq_(list(sel.c.keys()), ["t1_x", "t2_x"])
+ eq_(list(sel.selected_columns.keys()), ["t1_x", "t2_x"])
+ eq_(list(sel.subquery().c.keys()), ["t1_x", "t2_x"])
self._assert_result_keys(sel, ["t1_x", "t2_x"])
def _names_overlap_keys_dont(self):
@@ -2394,12 +2650,14 @@ class WithLabelsTest(fixtures.TestBase):
def test_names_overlap_keys_dont_nolabel(self):
sel = self._names_overlap_keys_dont()
- eq_(list(sel.c.keys()), ["a", "b"])
+ eq_(list(sel.selected_columns.keys()), ["a", "b"])
+ eq_(list(sel.subquery().c.keys()), ["a", "b"])
self._assert_result_keys(sel, ["x"])
def test_names_overlap_keys_dont_label(self):
sel = self._names_overlap_keys_dont().apply_labels()
- eq_(list(sel.c.keys()), ["t1_a", "t2_b"])
+ eq_(list(sel.selected_columns.keys()), ["t1_a", "t2_b"])
+ eq_(list(sel.subquery().c.keys()), ["t1_a", "t2_b"])
self._assert_result_keys(sel, ["t1_x", "t2_x"])
def _labels_overlap(self):
@@ -2410,13 +2668,15 @@ class WithLabelsTest(fixtures.TestBase):
def test_labels_overlap_nolabel(self):
sel = self._labels_overlap()
- eq_(list(sel.c.keys()), ["x_id", "id"])
+ eq_(list(sel.selected_columns.keys()), ["x_id", "id"])
+ eq_(list(sel.subquery().c.keys()), ["x_id", "id"])
self._assert_result_keys(sel, ["x_id", "id"])
def test_labels_overlap_label(self):
sel = self._labels_overlap().apply_labels()
t2 = sel.froms[1]
- eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label])
+ eq_(list(sel.selected_columns.keys()), ["t_x_id", t2.c.id.anon_label])
+ eq_(list(sel.subquery().c.keys()), ["t_x_id", t2.c.id.anon_label])
self._assert_result_keys(sel, ["t_x_id", "id_1"])
self._assert_subq_result_keys(sel, ["t_x_id", "id_1"])
@@ -2428,12 +2688,14 @@ class WithLabelsTest(fixtures.TestBase):
def test_labels_overlap_keylabels_dont_nolabel(self):
sel = self._labels_overlap_keylabels_dont()
- eq_(list(sel.c.keys()), ["a", "b"])
+ eq_(list(sel.selected_columns.keys()), ["a", "b"])
+ eq_(list(sel.subquery().c.keys()), ["a", "b"])
self._assert_result_keys(sel, ["x_id", "id"])
def test_labels_overlap_keylabels_dont_label(self):
sel = self._labels_overlap_keylabels_dont().apply_labels()
- eq_(list(sel.c.keys()), ["t_a", "t_x_b"])
+ eq_(list(sel.selected_columns.keys()), ["t_a", "t_x_b"])
+ eq_(list(sel.subquery().c.keys()), ["t_a", "t_x_b"])
self._assert_result_keys(sel, ["t_x_id", "id_1"])
def _keylabels_overlap_labels_dont(self):
@@ -2444,13 +2706,15 @@ class WithLabelsTest(fixtures.TestBase):
def test_keylabels_overlap_labels_dont_nolabel(self):
sel = self._keylabels_overlap_labels_dont()
- eq_(list(sel.c.keys()), ["x_id", "id"])
+ eq_(list(sel.selected_columns.keys()), ["x_id", "id"])
+ eq_(list(sel.subquery().c.keys()), ["x_id", "id"])
self._assert_result_keys(sel, ["a", "b"])
def test_keylabels_overlap_labels_dont_label(self):
sel = self._keylabels_overlap_labels_dont().apply_labels()
t2 = sel.froms[1]
- eq_(list(sel.c.keys()), ["t_x_id", t2.c.id.anon_label])
+ eq_(list(sel.selected_columns.keys()), ["t_x_id", t2.c.id.anon_label])
+ eq_(list(sel.subquery().c.keys()), ["t_x_id", t2.c.id.anon_label])
self._assert_result_keys(sel, ["t_a", "t_x_b"])
self._assert_subq_result_keys(sel, ["t_a", "t_x_b"])
@@ -2462,14 +2726,16 @@ class WithLabelsTest(fixtures.TestBase):
def test_keylabels_overlap_labels_overlap_nolabel(self):
sel = self._keylabels_overlap_labels_overlap()
- eq_(list(sel.c.keys()), ["x_a", "a"])
+ eq_(list(sel.selected_columns.keys()), ["x_a", "a"])
+ eq_(list(sel.subquery().c.keys()), ["x_a", "a"])
self._assert_result_keys(sel, ["x_id", "id"])
self._assert_subq_result_keys(sel, ["x_id", "id"])
def test_keylabels_overlap_labels_overlap_label(self):
sel = self._keylabels_overlap_labels_overlap().apply_labels()
t2 = sel.froms[1]
- eq_(list(sel.c.keys()), ["t_x_a", t2.c.a.anon_label])
+ eq_(list(sel.selected_columns.keys()), ["t_x_a", t2.c.a.anon_label])
+ eq_(list(sel.subquery().c.keys()), ["t_x_a", t2.c.a.anon_label])
self._assert_result_keys(sel, ["t_x_id", "id_1"])
self._assert_subq_result_keys(sel, ["t_x_id", "id_1"])
@@ -2486,7 +2752,8 @@ class WithLabelsTest(fixtures.TestBase):
def test_keys_overlap_names_dont_label(self):
sel = self._keys_overlap_names_dont().apply_labels()
- eq_(list(sel.c.keys()), ["t1_x", "t2_x"])
+ eq_(list(sel.selected_columns.keys()), ["t1_x", "t2_x"])
+ eq_(list(sel.subquery().c.keys()), ["t1_x", "t2_x"])
self._assert_result_keys(sel, ["t1_a", "t2_b"])
@@ -2653,38 +2920,24 @@ class ForUpdateTest(fixtures.TestBase, AssertsCompiledSQL):
class AliasTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
- def test_legacy_original_accessor(self):
- t = table("t", column("c"))
- a1 = t.alias()
- a2 = a1.alias()
- a3 = a2.alias()
-
- is_(a1.original, t)
- is_(a2.original, t)
- is_(a3.original, t)
-
- def test_wrapped(self):
+ def test_direct_element_hierarchy(self):
t = table("t", column("c"))
a1 = t.alias()
a2 = a1.alias()
a3 = a2.alias()
is_(a1.element, t)
- is_(a2.element, t)
- is_(a3.element, t)
-
- is_(a3.wrapped, a2)
- is_(a2.wrapped, a1)
- is_(a1.wrapped, t)
+ is_(a2.element, a1)
+ is_(a3.element, a2)
- def test_get_children_preserves_wrapped(self):
+ def test_get_children_preserves_multiple_nesting(self):
t = table("t", column("c"))
stmt = select([t])
a1 = stmt.alias()
a2 = a1.alias()
eq_(set(a2.get_children(column_collections=False)), {a1})
- def test_wrapped_correspondence(self):
+ def test_correspondence_multiple_nesting(self):
t = table("t", column("c"))
stmt = select([t])
a1 = stmt.alias()
@@ -2692,15 +2945,12 @@ class AliasTest(fixtures.TestBase, AssertsCompiledSQL):
is_(a1.corresponding_column(a2.c.c), a1.c.c)
- def test_copy_internals_preserves_wrapped(self):
+ def test_copy_internals_multiple_nesting(self):
t = table("t", column("c"))
stmt = select([t])
a1 = stmt.alias()
a2 = a1.alias()
- is_(a2.element, a2.wrapped.element)
-
a3 = a2._clone()
a3._copy_internals()
is_(a1.corresponding_column(a3.c.c), a1.c.c)
- is_(a3.element, a3.wrapped.element)