Diffstat (limited to 'test/dialect/postgresql/test_query.py')
-rw-r--r--  test/dialect/postgresql/test_query.py  230
1 file changed, 0 insertions, 230 deletions
diff --git a/test/dialect/postgresql/test_query.py b/test/dialect/postgresql/test_query.py
index a2c152501..47f86e791 100644
--- a/test/dialect/postgresql/test_query.py
+++ b/test/dialect/postgresql/test_query.py
@@ -3,13 +3,11 @@
import datetime
from sqlalchemy import and_
-from sqlalchemy import cast
from sqlalchemy import Column
from sqlalchemy import Date
from sqlalchemy import DateTime
from sqlalchemy import exc
from sqlalchemy import extract
-from sqlalchemy import Float
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import Integer
@@ -24,10 +22,8 @@ from sqlalchemy import Table
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import Time
-from sqlalchemy import true
from sqlalchemy import tuple_
from sqlalchemy.dialects import postgresql
-from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import AssertsExecutionResults
@@ -1101,229 +1097,3 @@ class ExtractTest(fixtures.TablesTest):
datetime.timedelta(days=5) + t.c.dtme,
overrides={"day": 15, "epoch": 1337084125.0},
)
-
-
-class TableValuedRoundTripTest(fixtures.TestBase):
- __backend__ = True
- __only_on__ = "postgresql"
-
- def test_generate_series_scalar(self, connection):
- x = func.generate_series(1, 2).alias("x")
- y = func.generate_series(1, 2).alias("y")
-
- stmt = select(x.column, y.column).join_from(x, y, true())
-
- eq_(connection.execute(stmt).all(), [(1, 1), (1, 2), (2, 1), (2, 2)])
-
- def test_aggregate_scalar_over_table_valued(self, metadata, connection):
- test = Table(
- "test", metadata, Column("id", Integer), Column("data", JSONB)
- )
- test.create(connection)
-
- connection.execute(
- test.insert(),
- [
- {"id": 1, "data": {"key": [23.7, 108.17, 55.98]}},
- {"id": 2, "data": {"key": [2.320, 9.55]}},
- {"id": 3, "data": {"key": [10.5, 6]}},
- ],
- )
-
- elem = (
- func.jsonb_array_elements_text(test.c.data["key"])
- .table_valued("value")
- .alias("elem")
- )
-
- maxdepth = select(func.max(cast(elem.c.value, Float))).label(
- "maxdepth"
- )
-
- stmt = select(test.c.id.label("test_id"), maxdepth).order_by(
- "maxdepth"
- )
-
- eq_(
- connection.execute(stmt).all(), [(2, 9.55), (3, 10.5), (1, 108.17)]
- )
-
- @testing.fixture
- def assets_transactions(self, metadata, connection):
- assets_transactions = Table(
- "assets_transactions",
- metadata,
- Column("id", Integer),
- Column("contents", JSONB),
- )
- assets_transactions.create(connection)
- connection.execute(
- assets_transactions.insert(),
- [
- {"id": 1, "contents": {"k1": "v1"}},
- {"id": 2, "contents": {"k2": "v2"}},
- {"id": 3, "contents": {"k3": "v3"}},
- ],
- )
- return assets_transactions
-
- def test_scalar_table_valued(self, assets_transactions, connection):
- stmt = select(
- assets_transactions.c.id,
- func.jsonb_each(
- assets_transactions.c.contents, type_=JSONB
- ).scalar_table_valued("key"),
- func.jsonb_each(
- assets_transactions.c.contents, type_=JSONB
- ).scalar_table_valued("value"),
- )
-
- eq_(
- connection.execute(stmt).all(),
- [(1, "k1", "v1"), (2, "k2", "v2"), (3, "k3", "v3")],
- )
-
- def test_table_valued(self, assets_transactions, connection):
-
- jb = func.jsonb_each(assets_transactions.c.contents).table_valued(
- "key", "value"
- )
-
- stmt = select(assets_transactions.c.id, jb.c.key, jb.c.value).join(
- jb, true()
- )
- eq_(
- connection.execute(stmt).all(),
- [(1, "k1", "v1"), (2, "k2", "v2"), (3, "k3", "v3")],
- )
-
- @testing.fixture
- def axy_table(self, metadata, connection):
- a = Table(
- "a",
- metadata,
- Column("id", Integer),
- Column("x", Integer),
- Column("y", Integer),
- )
- a.create(connection)
- connection.execute(
- a.insert(),
- [
- {"id": 1, "x": 5, "y": 4},
- {"id": 2, "x": 15, "y": 3},
- {"id": 3, "x": 7, "y": 9},
- ],
- )
-
- return a
-
- def test_function_against_table_record(self, axy_table, connection):
- """
- SELECT row_to_json(anon_1) AS row_to_json_1
- FROM (SELECT a.id AS id, a.x AS x, a.y AS y
- FROM a) AS anon_1
-
- """
-
- stmt = select(func.row_to_json(axy_table.record()))
-
- eq_(
- connection.execute(stmt).scalars().all(),
- [
- {"id": 1, "x": 5, "y": 4},
- {"id": 2, "x": 15, "y": 3},
- {"id": 3, "x": 7, "y": 9},
- ],
- )
-
- def test_function_against_subq_record(self, axy_table, connection):
- """
- SELECT row_to_json(anon_1) AS row_to_json_1
- FROM (SELECT a.id AS id, a.x AS x, a.y AS y
- FROM a) AS anon_1
-
- """
-
- stmt = select(func.row_to_json(axy_table.select().subquery().record()))
-
- eq_(
- connection.execute(stmt).scalars().all(),
- [
- {"id": 1, "x": 5, "y": 4},
- {"id": 2, "x": 15, "y": 3},
- {"id": 3, "x": 7, "y": 9},
- ],
- )
-
- def test_function_against_row_constructor(self, connection):
-
- stmt = select(func.row_to_json(func.row(1, "foo")))
-
- eq_(connection.scalar(stmt), {"f1": 1, "f2": "foo"})
-
- def test_with_ordinality_named(self, connection):
-
- stmt = select(
- func.generate_series(4, 1, -1).named_table_valued(
- "gs", with_ordinality="ordinality"
- )
- )
-
- eq_(connection.execute(stmt).all(), [(4, 1), (3, 2), (2, 3), (1, 4)])
-
- def test_with_ordinality_star(self, connection):
-
- stmt = select("*").select_from(
- func.generate_series(4, 1, -1).table_valued(
- with_ordinality="ordinality"
- )
- )
-
- eq_(connection.execute(stmt).all(), [(4, 1), (3, 2), (2, 3), (1, 4)])
-
- def test_plain_old_unnest(self, connection):
- fn = func.unnest(
- postgresql.array(["one", "two", "three", "four"])
- ).column_valued()
-
- stmt = select(fn)
-
- eq_(
- connection.execute(stmt).all(),
- [("one",), ("two",), ("three",), ("four",)],
- )
-
- def test_unnest_with_ordinality(self, connection):
-
- array_val = postgresql.array(
- [postgresql.array([14, 41, 7]), postgresql.array([54, 9, 49])]
- )
- stmt = select("*").select_from(
- func.unnest(array_val)
- .named_table_valued("elts", with_ordinality="num")
- .alias("t")
- )
- eq_(
- connection.execute(stmt).all(),
- [(14, 1), (41, 2), (7, 3), (54, 4), (9, 5), (49, 6)],
- )
-
- def test_unnest_with_ordinality_named(self, connection):
-
- array_val = postgresql.array(
- [postgresql.array([14, 41, 7]), postgresql.array([54, 9, 49])]
- )
-
- fn = (
- func.unnest(array_val)
- .named_table_valued("elts", with_ordinality="num")
- .alias("t")
- )
-
- stmt = select(fn.c.elts, fn.c.num)
-
- eq_(
- connection.execute(stmt).all(),
- [(14, 1), (41, 2), (7, 3), (54, 4), (9, 5), (49, 6)],
- )