summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compiler.py3
-rw-r--r--test/sql/test_cte.py148
-rw-r--r--test/sql/test_selectable.py6
3 files changed, 155 insertions, 2 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index f2c1e004d..40faab486 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -892,6 +892,9 @@ class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
"WITH RECURSIVE (colnames)" part. This test shows that this isn't
correct when keys are present.
+ See also test_cte ->
+ test_wrecur_ovlp_lbls_plus_dupes_separate_keys_use_labels
+
"""
m = MetaData()
foo = Table(
diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py
index e8a8a3150..f1d27aa8f 100644
--- a/test/sql/test_cte.py
+++ b/test/sql/test_cte.py
@@ -1,4 +1,9 @@
+from sqlalchemy import Column
from sqlalchemy import delete
+from sqlalchemy import Integer
+from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+from sqlalchemy import MetaData
+from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import update
@@ -495,6 +500,149 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL):
s.compile,
)
+ def test_with_recursive_no_name_currently_buggy(self):
+ s1 = select(1)
+ c1 = s1.cte(name="cte1", recursive=True)
+
+ # this is nonsensical at the moment
+ self.assert_compile(
+ select(c1),
+ 'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1 FROM cte1',
+ )
+
+ # however, so is subquery, which is worse as it isn't even trying
+ # to quote "1" as a label
+ self.assert_compile(
+ select(s1.subquery()), "SELECT anon_1.1 FROM (SELECT 1) AS anon_1"
+ )
+
+ def test_wrecur_dupe_col_names(self):
+ """test #6710"""
+
+ manager = table("manager", column("id"))
+ employee = table("employee", column("id"), column("manager_id"))
+
+ top_q = select(employee, manager).join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+
+ top_q = top_q.cte("cte", recursive=True)
+
+ bottom_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .join(top_q, top_q.c.id == employee.c.id)
+ )
+
+ rec_cte = select(top_q.union_all(bottom_q))
+ self.assert_compile(
+ rec_cte,
+ "WITH RECURSIVE cte(id, manager_id, id_1) AS "
+ "(SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager "
+ "ON employee.manager_id = manager.id UNION ALL "
+ "SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager ON "
+ "employee.manager_id = manager.id "
+ "JOIN cte ON cte.id = employee.id) "
+ "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte",
+ )
+
+ def test_wrecur_dupe_col_names_w_grouping(self):
+ """test #6710
+
+ by adding order_by() to the top query, the CTE will have
+ a compound select with the first element a SelectStatementGrouping
+ object, which we can test has the correct methods for the compiler
+ to call upon.
+
+ """
+
+ manager = table("manager", column("id"))
+ employee = table("employee", column("id"), column("manager_id"))
+
+ top_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .order_by(employee.c.id)
+ .cte("cte", recursive=True)
+ )
+
+ bottom_q = (
+ select(employee, manager)
+ .join_from(
+ employee, manager, employee.c.manager_id == manager.c.id
+ )
+ .join(top_q, top_q.c.id == employee.c.id)
+ )
+
+ rec_cte = select(top_q.union_all(bottom_q))
+
+ self.assert_compile(
+ rec_cte,
+ "WITH RECURSIVE cte(id, manager_id, id_1) AS "
+ "((SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager "
+ "ON employee.manager_id = manager.id ORDER BY employee.id) "
+ "UNION ALL "
+ "SELECT employee.id AS id, employee.manager_id AS manager_id, "
+ "manager.id AS id_1 FROM employee JOIN manager ON "
+ "employee.manager_id = manager.id "
+ "JOIN cte ON cte.id = employee.id) "
+ "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte",
+ )
+
+ def test_wrecur_ovlp_lbls_plus_dupes_separate_keys_use_labels(self):
+ """test a condition related to #6710.
+
+ also see test_compiler->
+ test_overlapping_labels_plus_dupes_separate_keys_use_labels
+
+ for a non cte form of this test.
+
+ """
+
+ m = MetaData()
+ foo = Table(
+ "foo",
+ m,
+ Column("id", Integer),
+ Column("bar_id", Integer, key="bb"),
+ )
+ foo_bar = Table("foo_bar", m, Column("id", Integer, key="bb"))
+
+ stmt = select(
+ foo.c.id,
+ foo.c.bb,
+ foo_bar.c.bb,
+ foo.c.bb,
+ foo.c.id,
+ foo.c.bb,
+ foo_bar.c.bb,
+ foo_bar.c.bb,
+ ).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+
+ cte = stmt.cte(recursive=True)
+
+ self.assert_compile(
+ select(cte),
+ "WITH RECURSIVE anon_1(foo_id, foo_bar_id, foo_bar_id_1) AS "
+ "(SELECT foo.id AS foo_id, foo.bar_id AS foo_bar_id, "
+ "foo_bar.id AS foo_bar_id_1, foo.bar_id AS foo_bar_id__1, "
+ "foo.id AS foo_id__1, foo.bar_id AS foo_bar_id__1, "
+ "foo_bar.id AS foo_bar_id__2, foo_bar.id AS foo_bar_id__2 "
+ "FROM foo, foo_bar) "
+ "SELECT anon_1.foo_id, anon_1.foo_bar_id, anon_1.foo_bar_id_1, "
+ "anon_1.foo_bar_id AS foo_bar_id_2, anon_1.foo_id AS foo_id_1, "
+ "anon_1.foo_bar_id AS foo_bar_id_3, "
+ "anon_1.foo_bar_id_1 AS foo_bar_id_1_1, "
+ "anon_1.foo_bar_id_1 AS foo_bar_id_1_2 FROM anon_1",
+ )
+
def test_union(self):
orders = table("orders", column("region"), column("amount"))
diff --git a/test/sql/test_selectable.py b/test/sql/test_selectable.py
index be894d239..cfdf4ad02 100644
--- a/test/sql/test_selectable.py
+++ b/test/sql/test_selectable.py
@@ -998,9 +998,11 @@ class SelectableTest(
self.assert_compile(
stmt,
"SELECT anon_1.col1, anon_1.col2, anon_1.col1_1 FROM "
- "((SELECT table1.col1, table1.col2, table2.col1 AS col1_1 "
+ "((SELECT table1.col1 AS col1, table1.col2 AS col2, table2.col1 "
+ "AS col1_1 "
"FROM table1, table2 LIMIT :param_1) UNION "
- "(SELECT table2.col1, table2.col2, table2.col3 FROM table2 "
+ "(SELECT table2.col1 AS col1, table2.col2 AS col2, "
+ "table2.col3 AS col3 FROM table2 "
"LIMIT :param_2)) AS anon_1",
)