summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-07-19 10:50:05 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-07-19 12:37:26 -0400
commit807d4945d2c1e2f2e1c77ac9ee6e84af330706a3 (patch)
tree4bb43a5fa68f8515eb965fc863e6bbbc21429d4f /test/sql
parent42c0928a731954649e95a7ee56b6709b1ec59aed (diff)
downloadsqlalchemy-807d4945d2c1e2f2e1c77ac9ee6e84af330706a3.tar.gz
check for TypeDecorator when handling getitem
Fixed issue where :class:`.TypeDecorator` would not correctly proxy the ``__getitem__()`` operator when decorating the :class:`.ARRAY` datatype, without explicit workarounds. Fixes: #7249 Change-Id: I3273572b4757e41fb5952639cb867314227d368a
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_types.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/test/sql/test_types.py b/test/sql/test_types.py
index a154666bb..a608d0040 100644
--- a/test/sql/test_types.py
+++ b/test/sql/test_types.py
@@ -782,6 +782,136 @@ class UserDefinedRoundTripTest(_UserDefinedTypeFixture, fixtures.TablesTest):
eq_(result.fetchall(), [(3, 1500), (4, 900)])
+class TypeDecoratorSpecialCasesTest(AssertsCompiledSQL, fixtures.TestBase):
+ __backend__ = True
+
+ @testing.requires.array_type
+ def test_typedec_of_array_modified(self, metadata, connection):
+ """test #7249"""
+
+ class SkipsFirst(TypeDecorator): # , Indexable):
+ impl = ARRAY(Integer, zero_indexes=True)
+
+ cache_ok = True
+
+ def process_bind_param(self, value, dialect):
+ return value[1:]
+
+ def copy(self, **kw):
+ return SkipsFirst(**kw)
+
+ def coerce_compared_value(self, op, value):
+ return self.impl.coerce_compared_value(op, value)
+
+ t = Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", SkipsFirst),
+ )
+ t.create(connection)
+
+ connection.execute(t.insert(), {"data": [1, 2, 3]})
+ val = connection.scalar(select(t.c.data))
+ eq_(val, [2, 3])
+
+ val = connection.scalar(select(t.c.data[0]))
+ eq_(val, 2)
+
+ def test_typedec_of_array_ops(self):
+ class ArrayDec(TypeDecorator):
+ impl = ARRAY(Integer, zero_indexes=True)
+
+ cache_ok = True
+
+ def coerce_compared_value(self, op, value):
+ return self.impl.coerce_compared_value(op, value)
+
+ expr1 = column("q", ArrayDec)[0]
+ expr2 = column("q", ARRAY(Integer, zero_indexes=True))[0]
+
+ eq_(expr1.right.type._type_affinity, Integer)
+ eq_(expr2.right.type._type_affinity, Integer)
+
+ self.assert_compile(
+ column("q", ArrayDec).any(7, operator=operators.lt),
+ "%(q_1)s < ANY (q)",
+ dialect="postgresql",
+ )
+
+ self.assert_compile(
+ column("q", ArrayDec)[5], "q[%(q_1)s]", dialect="postgresql"
+ )
+
+ def test_typedec_of_json_ops(self):
+ class JsonDec(TypeDecorator):
+ impl = JSON()
+
+ cache_ok = True
+
+ self.assert_compile(
+ column("q", JsonDec)["q"], "q -> %(q_1)s", dialect="postgresql"
+ )
+
+ self.assert_compile(
+ column("q", JsonDec)["q"].as_integer(),
+ "CAST(q ->> %(q_1)s AS INTEGER)",
+ dialect="postgresql",
+ )
+
+ @testing.requires.array_type
+ def test_typedec_of_array(self, metadata, connection):
+ """test #7249"""
+
+ class ArrayDec(TypeDecorator):
+ impl = ARRAY(Integer, zero_indexes=True)
+
+ cache_ok = True
+
+ def coerce_compared_value(self, op, value):
+ return self.impl.coerce_compared_value(op, value)
+
+ t = Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", ArrayDec),
+ )
+
+ t.create(connection)
+
+ connection.execute(t.insert(), {"data": [1, 2, 3]})
+ val = connection.scalar(select(t.c.data))
+ eq_(val, [1, 2, 3])
+
+ val = connection.scalar(select(t.c.data[0]))
+ eq_(val, 1)
+
+ @testing.requires.json_type
+ def test_typedec_of_json(self, metadata, connection):
+ """test #7249"""
+
+ class JsonDec(TypeDecorator):
+ impl = JSON()
+
+ cache_ok = True
+
+ t = Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", JsonDec),
+ )
+ t.create(connection)
+
+ connection.execute(t.insert(), {"data": {"key": "value"}})
+ val = connection.scalar(select(t.c.data))
+ eq_(val, {"key": "value"})
+
+ val = connection.scalar(select(t.c.data["key"].as_string()))
+ eq_(val, "value")
+
+
class BindProcessorInsertValuesTest(UserDefinedRoundTripTest):
"""related to #6770, test that insert().values() applies to
bound parameter handlers including the None value."""