diff options
Diffstat (limited to 'test/dialect/postgresql/test_query.py')
-rw-r--r-- | test/dialect/postgresql/test_query.py | 230 |
1 files changed, 0 insertions, 230 deletions
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py index a2c152501..47f86e791 100644 --- a/test/dialect/postgresql/test_query.py +++ b/test/dialect/postgresql/test_query.py @@ -3,13 +3,11 @@ import datetime from sqlalchemy import and_ -from sqlalchemy import cast from sqlalchemy import Column from sqlalchemy import Date from sqlalchemy import DateTime from sqlalchemy import exc from sqlalchemy import extract -from sqlalchemy import Float from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy import Integer @@ -24,10 +22,8 @@ from sqlalchemy import Table from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import Time -from sqlalchemy import true from sqlalchemy import tuple_ from sqlalchemy.dialects import postgresql -from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.testing import assert_raises from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import AssertsExecutionResults @@ -1101,229 +1097,3 @@ class ExtractTest(fixtures.TablesTest): datetime.timedelta(days=5) + t.c.dtme, overrides={"day": 15, "epoch": 1337084125.0}, ) - - -class TableValuedRoundTripTest(fixtures.TestBase): - __backend__ = True - __only_on__ = "postgresql" - - def test_generate_series_scalar(self, connection): - x = func.generate_series(1, 2).alias("x") - y = func.generate_series(1, 2).alias("y") - - stmt = select(x.column, y.column).join_from(x, y, true()) - - eq_(connection.execute(stmt).all(), [(1, 1), (1, 2), (2, 1), (2, 2)]) - - def test_aggregate_scalar_over_table_valued(self, metadata, connection): - test = Table( - "test", metadata, Column("id", Integer), Column("data", JSONB) - ) - test.create(connection) - - connection.execute( - test.insert(), - [ - {"id": 1, "data": {"key": [23.7, 108.17, 55.98]}}, - {"id": 2, "data": {"key": [2.320, 9.55]}}, - {"id": 3, "data": {"key": [10.5, 6]}}, - ], - ) - - elem = ( - func.jsonb_array_elements_text(test.c.data["key"]) - .table_valued("value") - .alias("elem") - ) - - maxdepth = select(func.max(cast(elem.c.value, Float))).label( - "maxdepth" - ) - - stmt = select(test.c.id.label("test_id"), maxdepth).order_by( - "maxdepth" - ) - - eq_( - connection.execute(stmt).all(), [(2, 9.55), (3, 10.5), (1, 108.17)] - ) - - @testing.fixture - def assets_transactions(self, metadata, connection): - assets_transactions = Table( - "assets_transactions", - metadata, - Column("id", Integer), - Column("contents", JSONB), - ) - assets_transactions.create(connection) - connection.execute( - assets_transactions.insert(), - [ - {"id": 1, "contents": {"k1": "v1"}}, - {"id": 2, "contents": {"k2": "v2"}}, - {"id": 3, "contents": {"k3": "v3"}}, - ], - ) - return assets_transactions - - def test_scalar_table_valued(self, assets_transactions, connection): - stmt = select( - assets_transactions.c.id, - func.jsonb_each( - assets_transactions.c.contents, type_=JSONB - ).scalar_table_valued("key"), - func.jsonb_each( - assets_transactions.c.contents, type_=JSONB - ).scalar_table_valued("value"), - ) - - eq_( - connection.execute(stmt).all(), - [(1, "k1", "v1"), (2, "k2", "v2"), (3, "k3", "v3")], - ) - - def test_table_valued(self, assets_transactions, connection): - - jb = func.jsonb_each(assets_transactions.c.contents).table_valued( - "key", "value" - ) - - stmt = select(assets_transactions.c.id, jb.c.key, jb.c.value).join( - jb, true() - ) - eq_( - connection.execute(stmt).all(), - [(1, "k1", "v1"), (2, "k2", "v2"), (3, "k3", "v3")], - ) - - @testing.fixture - def axy_table(self, metadata, connection): - a = Table( - "a", - metadata, - Column("id", Integer), - Column("x", Integer), - Column("y", Integer), - ) - a.create(connection) - connection.execute( - a.insert(), - [ - {"id": 1, "x": 5, "y": 4}, - {"id": 2, "x": 15, "y": 3}, - {"id": 3, "x": 7, "y": 9}, - ], - ) - - return a - - def test_function_against_table_record(self, axy_table, connection): - """ - SELECT row_to_json(anon_1) AS row_to_json_1 - FROM (SELECT a.id AS id, a.x AS x, a.y AS y - FROM a) AS anon_1 - - """ - - stmt = select(func.row_to_json(axy_table.record())) - - eq_( - connection.execute(stmt).scalars().all(), - [ - {"id": 1, "x": 5, "y": 4}, - {"id": 2, "x": 15, "y": 3}, - {"id": 3, "x": 7, "y": 9}, - ], - ) - - def test_function_against_subq_record(self, axy_table, connection): - """ - SELECT row_to_json(anon_1) AS row_to_json_1 - FROM (SELECT a.id AS id, a.x AS x, a.y AS y - FROM a) AS anon_1 - - """ - - stmt = select(func.row_to_json(axy_table.select().subquery().record())) - - eq_( - connection.execute(stmt).scalars().all(), - [ - {"id": 1, "x": 5, "y": 4}, - {"id": 2, "x": 15, "y": 3}, - {"id": 3, "x": 7, "y": 9}, - ], - ) - - def test_function_against_row_constructor(self, connection): - - stmt = select(func.row_to_json(func.row(1, "foo"))) - - eq_(connection.scalar(stmt), {"f1": 1, "f2": "foo"}) - - def test_with_ordinality_named(self, connection): - - stmt = select( - func.generate_series(4, 1, -1).named_table_valued( - "gs", with_ordinality="ordinality" - ) - ) - - eq_(connection.execute(stmt).all(), [(4, 1), (3, 2), (2, 3), (1, 4)]) - - def test_with_ordinality_star(self, connection): - - stmt = select("*").select_from( - func.generate_series(4, 1, -1).table_valued( - with_ordinality="ordinality" - ) - ) - - eq_(connection.execute(stmt).all(), [(4, 1), (3, 2), (2, 3), (1, 4)]) - - def test_plain_old_unnest(self, connection): - fn = func.unnest( - postgresql.array(["one", "two", "three", "four"]) - ).column_valued() - - stmt = select(fn) - - eq_( - connection.execute(stmt).all(), - [("one",), ("two",), ("three",), ("four",)], - ) - - def test_unnest_with_ordinality(self, connection): - - array_val = postgresql.array( - [postgresql.array([14, 41, 7]), postgresql.array([54, 9, 49])] - ) - stmt = select("*").select_from( - func.unnest(array_val) - .named_table_valued("elts", with_ordinality="num") - .alias("t") - ) - eq_( - connection.execute(stmt).all(), - [(14, 1), (41, 2), (7, 3), (54, 4), (9, 5), (49, 6)], - ) - - def test_unnest_with_ordinality_named(self, connection): - - array_val = postgresql.array( - [postgresql.array([14, 41, 7]), postgresql.array([54, 9, 49])] - ) - - fn = ( - func.unnest(array_val) - .named_table_valued("elts", with_ordinality="num") - .alias("t") - ) - - stmt = select(fn.c.elts, fn.c.num) - - eq_( - connection.execute(stmt).all(), - [(14, 1), (41, 2), (7, 3), (54, 4), (9, 5), (49, 6)], - ) |