diff options
Diffstat (limited to 'test/sql/test_cte.py')
| -rw-r--r-- | test/sql/test_cte.py | 148 |
1 files changed, 148 insertions, 0 deletions
diff --git a/test/sql/test_cte.py b/test/sql/test_cte.py index e8a8a3150..f1d27aa8f 100644 --- a/test/sql/test_cte.py +++ b/test/sql/test_cte.py @@ -1,4 +1,9 @@ +from sqlalchemy import Column from sqlalchemy import delete +from sqlalchemy import Integer +from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL +from sqlalchemy import MetaData +from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import update @@ -495,6 +500,149 @@ class CTETest(fixtures.TestBase, AssertsCompiledSQL): s.compile, ) + def test_with_recursive_no_name_currently_buggy(self): + s1 = select(1) + c1 = s1.cte(name="cte1", recursive=True) + + # this is nonsensical at the moment + self.assert_compile( + select(c1), + 'WITH RECURSIVE cte1("1") AS (SELECT 1) SELECT cte1.1 FROM cte1', + ) + + # however, so is subquery, which is worse as it isn't even trying + # to quote "1" as a label + self.assert_compile( + select(s1.subquery()), "SELECT anon_1.1 FROM (SELECT 1) AS anon_1" + ) + + def test_wrecur_dupe_col_names(self): + """test #6710""" + + manager = table("manager", column("id")) + employee = table("employee", column("id"), column("manager_id")) + + top_q = select(employee, manager).join_from( + employee, manager, employee.c.manager_id == manager.c.id + ) + + top_q = top_q.cte("cte", recursive=True) + + bottom_q = ( + select(employee, manager) + .join_from( + employee, manager, employee.c.manager_id == manager.c.id + ) + .join(top_q, top_q.c.id == employee.c.id) + ) + + rec_cte = select(top_q.union_all(bottom_q)) + self.assert_compile( + rec_cte, + "WITH RECURSIVE cte(id, manager_id, id_1) AS " + "(SELECT employee.id AS id, employee.manager_id AS manager_id, " + "manager.id AS id_1 FROM employee JOIN manager " + "ON employee.manager_id = manager.id UNION ALL " + "SELECT employee.id AS id, employee.manager_id AS manager_id, " + "manager.id AS id_1 FROM employee JOIN manager ON " + "employee.manager_id = manager.id " + "JOIN cte ON cte.id = employee.id) " + "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte", + ) + + def test_wrecur_dupe_col_names_w_grouping(self): + """test #6710 + + by adding order_by() to the top query, the CTE will have + a compound select with the first element a SelectStatementGrouping + object, which we can test has the correct methods for the compiler + to call upon. + + """ + + manager = table("manager", column("id")) + employee = table("employee", column("id"), column("manager_id")) + + top_q = ( + select(employee, manager) + .join_from( + employee, manager, employee.c.manager_id == manager.c.id + ) + .order_by(employee.c.id) + .cte("cte", recursive=True) + ) + + bottom_q = ( + select(employee, manager) + .join_from( + employee, manager, employee.c.manager_id == manager.c.id + ) + .join(top_q, top_q.c.id == employee.c.id) + ) + + rec_cte = select(top_q.union_all(bottom_q)) + + self.assert_compile( + rec_cte, + "WITH RECURSIVE cte(id, manager_id, id_1) AS " + "((SELECT employee.id AS id, employee.manager_id AS manager_id, " + "manager.id AS id_1 FROM employee JOIN manager " + "ON employee.manager_id = manager.id ORDER BY employee.id) " + "UNION ALL " + "SELECT employee.id AS id, employee.manager_id AS manager_id, " + "manager.id AS id_1 FROM employee JOIN manager ON " + "employee.manager_id = manager.id " + "JOIN cte ON cte.id = employee.id) " + "SELECT cte.id, cte.manager_id, cte.id_1 FROM cte", + ) + + def test_wrecur_ovlp_lbls_plus_dupes_separate_keys_use_labels(self): + """test a condition related to #6710. + + also see test_compiler-> + test_overlapping_labels_plus_dupes_separate_keys_use_labels + + for a non cte form of this test. + + """ + + m = MetaData() + foo = Table( + "foo", + m, + Column("id", Integer), + Column("bar_id", Integer, key="bb"), + ) + foo_bar = Table("foo_bar", m, Column("id", Integer, key="bb")) + + stmt = select( + foo.c.id, + foo.c.bb, + foo_bar.c.bb, + foo.c.bb, + foo.c.id, + foo.c.bb, + foo_bar.c.bb, + foo_bar.c.bb, + ).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL) + + cte = stmt.cte(recursive=True) + + self.assert_compile( + select(cte), + "WITH RECURSIVE anon_1(foo_id, foo_bar_id, foo_bar_id_1) AS " + "(SELECT foo.id AS foo_id, foo.bar_id AS foo_bar_id, " + "foo_bar.id AS foo_bar_id_1, foo.bar_id AS foo_bar_id__1, " + "foo.id AS foo_id__1, foo.bar_id AS foo_bar_id__1, " + "foo_bar.id AS foo_bar_id__2, foo_bar.id AS foo_bar_id__2 " + "FROM foo, foo_bar) " + "SELECT anon_1.foo_id, anon_1.foo_bar_id, anon_1.foo_bar_id_1, " + "anon_1.foo_bar_id AS foo_bar_id_2, anon_1.foo_id AS foo_id_1, " + "anon_1.foo_bar_id AS foo_bar_id_3, " + "anon_1.foo_bar_id_1 AS foo_bar_id_1_1, " + "anon_1.foo_bar_id_1 AS foo_bar_id_1_2 FROM anon_1", + ) + def test_union(self): orders = table("orders", column("region"), column("amount")) |
