diff options
Diffstat (limited to 'test/sql/test_functions.py')
-rw-r--r-- | test/sql/test_functions.py | 432 |
1 files changed, 0 insertions, 432 deletions
diff --git a/test/sql/test_functions.py b/test/sql/test_functions.py index 59accd245..50f50f0f0 100644 --- a/test/sql/test_functions.py +++ b/test/sql/test_functions.py @@ -5,15 +5,12 @@ import decimal from sqlalchemy import ARRAY from sqlalchemy import bindparam from sqlalchemy import Boolean -from sqlalchemy import cast from sqlalchemy import Column from sqlalchemy import Date from sqlalchemy import DateTime from sqlalchemy import extract -from sqlalchemy import Float from sqlalchemy import func from sqlalchemy import Integer -from sqlalchemy import JSON from sqlalchemy import literal from sqlalchemy import literal_column from sqlalchemy import Numeric @@ -23,7 +20,6 @@ from sqlalchemy import sql from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing -from sqlalchemy import true from sqlalchemy import types as sqltypes from sqlalchemy import util from sqlalchemy.dialects import mysql @@ -1152,431 +1148,3 @@ class RegisterTest(fixtures.TestBase, AssertsCompiledSQL): assert "not_registered_func" not in functions._registry["_default"] assert isinstance(func.not_registered_func_child().type, Integer) - - -class TableValuedCompileTest(fixtures.TestBase, AssertsCompiledSQL): - """test the full set of functions as FROM developed in [ticket:3566]""" - - __dialect__ = "default_enhanced" - - def test_aggregate_scalar_over_table_valued(self): - test = table("test", column("id"), column("data", JSON)) - - elem = ( - func.json_array_elements_text(test.c.data["key"]) - .table_valued("value") - .alias("elem") - ) - - maxdepth = select(func.max(cast(elem.c.value, Float))).label( - "maxdepth" - ) - - stmt = select(test.c.id.label("test_id"), maxdepth).order_by( - "maxdepth" - ) - - self.assert_compile( - stmt, - "SELECT test.id AS test_id, " - "(SELECT max(CAST(elem.value AS FLOAT)) AS max_1 " - "FROM json_array_elements_text(test.data[:data_1]) AS elem) " - "AS maxdepth " - "FROM test ORDER BY maxdepth", - ) - - def test_scalar_table_valued(self): - assets_transactions = table( - "assets_transactions", column("id"), column("contents", JSON) - ) - - stmt = select( - assets_transactions.c.id, - func.jsonb_each( - assets_transactions.c.contents - ).scalar_table_valued("key"), - func.jsonb_each( - assets_transactions.c.contents - ).scalar_table_valued("value"), - ) - self.assert_compile( - stmt, - "SELECT assets_transactions.id, " - "(jsonb_each(assets_transactions.contents)).key, " - "(jsonb_each(assets_transactions.contents)).value " - "FROM assets_transactions", - ) - - def test_table_valued_one(self): - assets_transactions = table( - "assets_transactions", column("id"), column("contents", JSON) - ) - - jb = func.jsonb_each(assets_transactions.c.contents).table_valued( - "key", "value" - ) - - stmt = select(assets_transactions.c.id, jb.c.key, jb.c.value).join( - jb, true() - ) - - self.assert_compile( - stmt, - "SELECT assets_transactions.id, anon_1.key, anon_1.value " - "FROM assets_transactions " - "JOIN jsonb_each(assets_transactions.contents) AS anon_1 ON true", - ) - - def test_table_valued_two(self): - """ - SELECT vi.id, vv.value - FROM value_ids() AS vi JOIN values AS vv ON vv.id = vi.id - - """ - - values = table( - "values", - column( - "id", - Integer, - ), - column("value", String), - ) - vi = func.value_ids().table_valued(column("id", Integer)).alias("vi") - vv = values.alias("vv") - - stmt = select(vi.c.id, vv.c.value).select_from( # noqa - vi.join(vv, vv.c.id == vi.c.id) - ) - self.assert_compile( - stmt, - "SELECT vi.id, vv.value FROM value_ids() AS vi " - "JOIN values AS vv ON vv.id = vi.id", - ) - - def test_table_as_record(self): - a = table( - "a", - column("id"), - column("x"), - column("y"), - ) - - stmt = select(func.row_to_json(a.record())) - - self.assert_compile( - stmt, "SELECT row_to_json(a) AS row_to_json_1 FROM a" - ) - - def test_subquery_as_record(self): - """ - SELECT row_to_json(anon_1) AS row_to_json_1 - FROM (SELECT a.id AS id, a.x AS x, a.y AS y - FROM a) AS anon_1 - - """ - - a = table( - "a", - column("id"), - column("x"), - column("y"), - ) - - stmt = select(func.row_to_json(a.select().subquery().record())) - - self.assert_compile( - stmt, - "SELECT row_to_json(anon_1) AS row_to_json_1 FROM " - "(SELECT a.id AS id, a.x AS x, a.y AS y FROM a) AS anon_1", - ) - - def test_scalar_subquery(self): - - a = table( - "a", - column("id"), - column("x"), - column("y"), - ) - - stmt = select(func.row_to_json(a.select().scalar_subquery())) - - self.assert_compile( - stmt, - "SELECT row_to_json((SELECT a.id, a.x, a.y FROM a)) " - "AS row_to_json_1", - ) - - def test_named_with_ordinality(self): - """ - SELECT a.id AS a_id, a.refs AS a_refs, - unnested.unnested AS unnested_unnested, - unnested.ordinality AS unnested_ordinality, - b.id AS b_id, b.ref AS b_ref - FROM a LEFT OUTER JOIN unnest(a.refs) - `WITH ORDINALITY AS unnested(unnested, ordinality) ON true - LEFT OUTER JOIN b ON unnested.unnested = b.ref - - """ # noqa 501 - - a = table("a", column("id"), column("refs")) - b = table("b", column("id"), column("ref")) - - unnested = ( - func.unnest(a.c.refs) - .named_table_valued("unnested", with_ordinality="ordinality") - .alias("unnested") - ) - - stmt = ( - select( - a.c.id, a.c.refs, unnested.c.unnested, unnested.c.ordinality - ) - .outerjoin(unnested, true()) - .outerjoin( - b, - unnested.c.unnested == b.c.ref, - ) - ) - self.assert_compile( - stmt, - "SELECT a.id, a.refs, unnested.unnested, unnested.ordinality " - "FROM a " - "LEFT OUTER JOIN unnest(a.refs) " - "WITH ORDINALITY AS unnested(unnested, ordinality) ON true " - "LEFT OUTER JOIN b ON unnested.unnested = b.ref", - ) - - def test_star_with_ordinality(self): - """ - SELECT * FROM generate_series(4,1,-1) WITH ORDINALITY; - """ - - stmt = select("*").select_from( # noqa - func.generate_series(4, 1, -1).table_valued( - with_ordinality="ordinality" - ) - ) - self.assert_compile( - stmt, - "SELECT * FROM generate_series" - "(:generate_series_1, :generate_series_2, :generate_series_3) " - "WITH ORDINALITY AS anon_1", - ) - - def test_json_object_keys_with_ordinality(self): - """ - SELECT * FROM json_object_keys('{"a1":"1","a2":"2","a3":"3"}') - WITH ORDINALITY AS t(keys, n); - """ - stmt = select("*").select_from( - func.json_object_keys( - literal({"a1": "1", "a2": "2", "a3": "3"}, type_=JSON) - ) - .named_table_valued("keys", with_ordinality="n") - .alias("t") - ) - - self.assert_compile( - stmt, - "SELECT * FROM json_object_keys(:param_1) " - "WITH ORDINALITY AS t(keys, n)", - ) - - def test_alias_column(self): - """ - - :: - - SELECT x, y - FROM - generate_series(:generate_series_1, :generate_series_2) AS x, - generate_series(:generate_series_3, :generate_series_4) AS y - - """ - - x = func.generate_series(1, 2).alias("x") - y = func.generate_series(3, 4).alias("y") - stmt = select(x.column, y.column) - - self.assert_compile( - stmt, - "SELECT x, y FROM " - "generate_series(:generate_series_1, :generate_series_2) AS x, " - "generate_series(:generate_series_3, :generate_series_4) AS y", - ) - - def test_column_valued_one(self): - fn = func.unnest(["one", "two", "three", "four"]).column_valued() - - stmt = select(fn) - - self.assert_compile( - stmt, "SELECT anon_1 FROM unnest(:unnest_1) AS anon_1" - ) - - def test_column_valued_two(self): - """ - - :: - - SELECT x, y - FROM - generate_series(:generate_series_1, :generate_series_2) AS x, - generate_series(:generate_series_3, :generate_series_4) AS y - - """ - - x = func.generate_series(1, 2).column_valued("x") - y = func.generate_series(3, 4).column_valued("y") - stmt = select(x, y) - - self.assert_compile( - stmt, - "SELECT x, y FROM " - "generate_series(:generate_series_1, :generate_series_2) AS x, " - "generate_series(:generate_series_3, :generate_series_4) AS y", - ) - - def test_column_valued_subquery(self): - x = func.generate_series(1, 2).column_valued("x") - y = func.generate_series(3, 4).column_valued("y") - subq = select(x, y).subquery() - stmt = select(subq).where(subq.c.x > 2) - - self.assert_compile( - stmt, - "SELECT anon_1.x, anon_1.y FROM " - "(SELECT x, y FROM " - "generate_series(:generate_series_1, :generate_series_2) AS x, " - "generate_series(:generate_series_3, :generate_series_4) AS y" - ") AS anon_1 " - "WHERE anon_1.x > :x_1", - ) - - def test_ten(self): - """ - # this is the "record" type - - SELECT - table1.user_id AS table1_user_id, - table2.name AS table2_name, - jsonb_table.name AS jsonb_table_name, - count(jsonb_table.time) AS count_1 - FROM table1 - JOIN table2 ON table1.user_id = table2.id - JOIN LATERAL jsonb_to_recordset(table1.jsonb) AS jsonb_table(name TEXT, time FLOAT) ON true - WHERE - table2.route_id = %(route_id_1)s - AND table1.list_id IN (%(list_id_1)s, %(list_id_2)s, %(list_id_3)s) - AND jsonb_table.name IN (%(name_1)s, %(name_2)s, %(name_3)s) - GROUP BY table1.user_id, table2.name, jsonb_table.name - ORDER BY table2.name - - """ # noqa - - def test_function_alias(self): - """ - :: - - SELECT result_elem -> 'Field' as field - FROM "check" AS check_, json_array_elements( - ( - SELECT check_inside.response -> 'Results' - FROM "check" as check_inside - WHERE check_inside.id = check_.id - ) - ) AS result_elem - WHERE result_elem ->> 'Name' = 'FooBar' - - """ - check = table("check", column("id"), column("response", JSON)) - - check_inside = check.alias("check_inside") - check_outside = check.alias("_check") - - subq = ( - select(check_inside.c.response["Results"]) - .where(check_inside.c.id == check_outside.c.id) - .scalar_subquery() - ) - - fn = func.json_array_elements(subq, type_=JSON).alias("result_elem") - - stmt = ( - select(fn.column["Field"].label("field")) - .where(fn.column["Name"] == "FooBar") - .select_from(check_outside) - ) - - self.assert_compile( - stmt, - "SELECT result_elem[:result_elem_1] AS field " - "FROM json_array_elements(" - "(SELECT check_inside.response[:response_1] AS anon_1 " - 'FROM "check" AS check_inside ' - "WHERE check_inside.id = _check.id)" - ') AS result_elem, "check" AS _check ' - "WHERE result_elem[:result_elem_2] = :param_1", - ) - - def test_named_table_valued(self): - - fn = func.json_to_recordset( # noqa - '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' - ).named_table_valued(column("a", Integer), column("b", String)) - - stmt = select(fn.c.a, fn.c.b) - - self.assert_compile( - stmt, - "SELECT anon_1.a, anon_1.b " - "FROM json_to_recordset(:json_to_recordset_1) " - "AS anon_1(a INTEGER, b VARCHAR)", - ) - - def test_named_table_valued_subquery(self): - - fn = func.json_to_recordset( # noqa - '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' - ).named_table_valued(column("a", Integer), column("b", String)) - - stmt = select(fn.c.a, fn.c.b).subquery() - - stmt = select(stmt) - - self.assert_compile( - stmt, - "SELECT anon_1.a, anon_1.b FROM " - "(SELECT anon_2.a AS a, anon_2.b AS b " - "FROM json_to_recordset(:json_to_recordset_1) " - "AS anon_2(a INTEGER, b VARCHAR)" - ") AS anon_1", - ) - - def test_named_table_valued_alias(self): - - """select * from json_to_recordset - ('[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]') as x(a int, b text);""" - - fn = ( - func.json_to_recordset( # noqa - '[{"a":1,"b":"foo"},{"a":"2","c":"bar"}]' - ) - .named_table_valued(column("a", Integer), column("b", String)) - .alias("jbr") - ) - - stmt = select(fn.c.a, fn.c.b) - - self.assert_compile( - stmt, - "SELECT jbr.a, jbr.b " - "FROM json_to_recordset(:json_to_recordset_1) " - "AS jbr(a INTEGER, b VARCHAR)", - ) - - # continuing from - # https://paquier.xyz/postgresql-2/postgres-9-4-feature-highlight-with-ordinality/ - # https://github.com/sqlalchemy/sqlalchemy/issues/3566 |