diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-19 10:50:05 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-19 12:37:26 -0400 |
| commit | 807d4945d2c1e2f2e1c77ac9ee6e84af330706a3 (patch) | |
| tree | 4bb43a5fa68f8515eb965fc863e6bbbc21429d4f /test/sql | |
| parent | 42c0928a731954649e95a7ee56b6709b1ec59aed (diff) | |
| download | sqlalchemy-807d4945d2c1e2f2e1c77ac9ee6e84af330706a3.tar.gz | |
check for TypeDecorator when handling getitem
Fixed issue where :class:`.TypeDecorator` would not correctly proxy the
``__getitem__()`` operator when decorating the :class:`.ARRAY` datatype,
without explicit workarounds.
Fixes: #7249
Change-Id: I3273572b4757e41fb5952639cb867314227d368a
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_types.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/test/sql/test_types.py b/test/sql/test_types.py index a154666bb..a608d0040 100644 --- a/test/sql/test_types.py +++ b/test/sql/test_types.py @@ -782,6 +782,136 @@ class UserDefinedRoundTripTest(_UserDefinedTypeFixture, fixtures.TablesTest): eq_(result.fetchall(), [(3, 1500), (4, 900)]) +class TypeDecoratorSpecialCasesTest(AssertsCompiledSQL, fixtures.TestBase): + __backend__ = True + + @testing.requires.array_type + def test_typedec_of_array_modified(self, metadata, connection): + """test #7249""" + + class SkipsFirst(TypeDecorator): # , Indexable): + impl = ARRAY(Integer, zero_indexes=True) + + cache_ok = True + + def process_bind_param(self, value, dialect): + return value[1:] + + def copy(self, **kw): + return SkipsFirst(**kw) + + def coerce_compared_value(self, op, value): + return self.impl.coerce_compared_value(op, value) + + t = Table( + "t", + metadata, + Column("id", Integer, primary_key=True), + Column("data", SkipsFirst), + ) + t.create(connection) + + connection.execute(t.insert(), {"data": [1, 2, 3]}) + val = connection.scalar(select(t.c.data)) + eq_(val, [2, 3]) + + val = connection.scalar(select(t.c.data[0])) + eq_(val, 2) + + def test_typedec_of_array_ops(self): + class ArrayDec(TypeDecorator): + impl = ARRAY(Integer, zero_indexes=True) + + cache_ok = True + + def coerce_compared_value(self, op, value): + return self.impl.coerce_compared_value(op, value) + + expr1 = column("q", ArrayDec)[0] + expr2 = column("q", ARRAY(Integer, zero_indexes=True))[0] + + eq_(expr1.right.type._type_affinity, Integer) + eq_(expr2.right.type._type_affinity, Integer) + + self.assert_compile( + column("q", ArrayDec).any(7, operator=operators.lt), + "%(q_1)s < ANY (q)", + dialect="postgresql", + ) + + self.assert_compile( + column("q", ArrayDec)[5], "q[%(q_1)s]", dialect="postgresql" + ) + + def test_typedec_of_json_ops(self): + class JsonDec(TypeDecorator): + impl = JSON() + + cache_ok = True + + self.assert_compile( + column("q", JsonDec)["q"], "q -> %(q_1)s", dialect="postgresql" + ) + + self.assert_compile( + column("q", JsonDec)["q"].as_integer(), + "CAST(q ->> %(q_1)s AS INTEGER)", + dialect="postgresql", + ) + + @testing.requires.array_type + def test_typedec_of_array(self, metadata, connection): + """test #7249""" + + class ArrayDec(TypeDecorator): + impl = ARRAY(Integer, zero_indexes=True) + + cache_ok = True + + def coerce_compared_value(self, op, value): + return self.impl.coerce_compared_value(op, value) + + t = Table( + "t", + metadata, + Column("id", Integer, primary_key=True), + Column("data", ArrayDec), + ) + + t.create(connection) + + connection.execute(t.insert(), {"data": [1, 2, 3]}) + val = connection.scalar(select(t.c.data)) + eq_(val, [1, 2, 3]) + + val = connection.scalar(select(t.c.data[0])) + eq_(val, 1) + + @testing.requires.json_type + def test_typedec_of_json(self, metadata, connection): + """test #7249""" + + class JsonDec(TypeDecorator): + impl = JSON() + + cache_ok = True + + t = Table( + "t", + metadata, + Column("id", Integer, primary_key=True), + Column("data", JsonDec), + ) + t.create(connection) + + connection.execute(t.insert(), {"data": {"key": "value"}}) + val = connection.scalar(select(t.c.data)) + eq_(val, {"key": "value"}) + + val = connection.scalar(select(t.c.data["key"].as_string())) + eq_(val, "value") + + class BindProcessorInsertValuesTest(UserDefinedRoundTripTest): """related to #6770, test that insert().values() applies to bound parameter handlers including the None value.""" |
