from decimal import Decimal

from sqlalchemy import column
from sqlalchemy import exc
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import insert
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy import literal_column
from sqlalchemy import Numeric
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy.ext import hybrid
from sqlalchemy.orm import aliased
from sqlalchemy.orm import column_property
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session
from sqlalchemy.orm import synonym
from sqlalchemy.sql import coercions
from sqlalchemy.sql import operators
from sqlalchemy.sql import roles
from sqlalchemy.sql import update
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_not
from sqlalchemy.testing.fixtures import fixture_session
from sqlalchemy.testing.schema import Column


class PropertyComparatorTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests ``hybrid_property`` combined with a custom
    ``hybrid.Comparator``.

    """

    __dialect__ = "default"

    def _fixture(self, use_inplace=False, use_classmethod=False):
        """Build and return a mapped class ``A`` whose ``value`` hybrid is
        compared through ``UCComparator``.

        :param use_inplace: when truthy, the comparator is attached with
         ``value.inplace.comparator`` under a separate function name
         instead of re-declaring ``value``.
        :param use_classmethod: when truthy, ``@classmethod`` is stacked
         underneath the comparator decorator.

        """
        Base = declarative_base()

        class UCComparator(hybrid.Comparator):
            # equality is rendered with upper() applied to both sides.
            # NOTE(review): the ``other is None`` branch is a plain Python
            # identity test on the expression, not a SQL construct; no
            # test in this class passes None.
            def __eq__(self, other):
                if other is None:
                    return self.expression is None
                else:
                    return func.upper(self.expression) == func.upper(other)

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is a docstring"
                return self._value - 5

            # four ways of attaching the same comparator; which one runs
            # is selected by the two fixture arguments
            if use_classmethod:
                if use_inplace:

                    @value.inplace.comparator
                    @classmethod
                    def _value_comparator(cls):
                        return UCComparator(cls._value)

                else:

                    @value.comparator
                    @classmethod
                    def value(cls):
                        return UCComparator(cls._value)

            else:
                if use_inplace:

                    @value.inplace.comparator
                    def _value_comparator(cls):
                        return UCComparator(cls._value)

                else:

                    @value.comparator
                    def value(cls):
                        return UCComparator(cls._value)

            @value.setter
            def value(self, v):
                self._value = v + 5

        return A

    def test_set_get(self):
        A = self._fixture()
        a1 = A(value=5)
        # the setter stores v + 5, the getter returns _value - 5
        eq_(a1._value, 10)
        eq_(a1.value, 5)

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_value(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        eq_(str(A.value == 5), "upper(a.value) = upper(:upper_1)")

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_aliased_value(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        eq_(str(aliased(A).value == 5), "upper(a_1.value) = upper(:upper_1)")

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_query(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        sess = fixture_session()
        self.assert_compile(
            sess.query(A.value), "SELECT a.value AS a_value FROM a"
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_aliased_query(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A).value),
            "SELECT a_1.value AS a_1_value FROM a AS a_1",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_aliased_filter(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A)).filter_by(value="foo"),
            "SELECT a_1.id AS a_1_id, a_1.value AS a_1_value "
            "FROM a AS a_1 WHERE upper(a_1.value) = upper(:upper_1)",
        )

    def test_docstring(self):
        A = self._fixture()
        eq_(A.value.__doc__, "This is a docstring")

    def test_no_name_one(self):
        """test :ticket:`6215`"""

        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

            @hybrid.hybrid_property
            def same_name(self):
                return self.id

            def name1(self):
                return self.id

            # hybrid assigned to an attribute name that differs from the
            # wrapped function's name
            different_name = hybrid.hybrid_property(name1)

            # hybrid built from a lambda
            no_name = hybrid.hybrid_property(lambda self: self.name)

        stmt = select(A.same_name, A.different_name, A.no_name)
        compiled = stmt.compile()

        eq_(
            [ent._label_name for ent in compiled.compile_state._entities],
            ["same_name", "id", "name"],
        )

    def test_no_name_two(self):
        """test :ticket:`6215`"""
        Base = declarative_base()

        # same hybrids as test_no_name_one, but declared on a mixin
        class SomeMixin:
            @hybrid.hybrid_property
            def same_name(self):
                return self.id

            def name1(self):
                return self.id

            different_name = hybrid.hybrid_property(name1)

            no_name = hybrid.hybrid_property(lambda self: self.name)

        class A(SomeMixin, Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            name = Column(String(50))

        stmt = select(A.same_name, A.different_name, A.no_name)
        compiled = stmt.compile()

        eq_(
            [ent._label_name for ent in compiled.compile_state._entities],
            ["same_name", "id", "name"],
        )

    def test_custom_op(self, registry):
        """test #3162"""

        my_op = operators.custom_op(
            "my_op", python_impl=lambda a, b: a + "_foo_" + b
        )

        @registry.mapped
        class SomeClass:
            __tablename__ = "sc"
            id = Column(Integer, primary_key=True)
            data = Column(String)

            @hybrid.hybrid_property
            def foo_data(self):
                return my_op(self.data, "bar")

        # instance level goes through python_impl; class level renders SQL
        eq_(SomeClass(data="data").foo_data, "data_foo_bar")

        self.assert_compile(SomeClass.foo_data, "sc.data my_op :data_1")


class PropertyExpressionTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests ``hybrid_property`` with a separately declared class-level
    ``expression``.

    """

    __dialect__ = "default"

    def _fixture(self, use_inplace=False, use_classmethod=False):
        """Build and return a mapped class ``A`` whose ``value`` hybrid has
        a distinct SQL expression.

        :param use_inplace: when true, the expression is attached with
         ``value.inplace.expression`` under a separate function name.
        :param use_classmethod: when true, ``@classmethod`` is stacked
         underneath the expression decorator.

        """
        # the arguments may arrive as testing.variation objects; coerce
        # them to plain booleans before branching
        use_inplace, use_classmethod = bool(use_inplace), bool(use_classmethod)
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is an instance-level docstring"
                return int(self._value) - 5

            @value.setter
            def value(self, v):
                self._value = v + 5

            if use_classmethod:
                if use_inplace:

                    @value.inplace.expression
                    @classmethod
                    def _value_expr(cls):
                        "This is a class-level docstring"
                        return func.foo(cls._value) + cls.bar_value

                else:

                    @value.expression
                    @classmethod
                    def value(cls):
                        "This is a class-level docstring"
                        return func.foo(cls._value) + cls.bar_value

            else:
                if use_inplace:

                    @value.inplace.expression
                    def _value_expr(cls):
                        "This is a class-level docstring"
                        return func.foo(cls._value) + cls.bar_value

                else:

                    @value.expression
                    def value(cls):
                        "This is a class-level docstring"
                        return func.foo(cls._value) + cls.bar_value

            # second hybrid that the ``value`` expression refers to
            @hybrid.hybrid_property
            def bar_value(cls):
                return func.bar(cls._value)

        return A

    def _wrong_expr_fixture(self):
        """Return a mapped class whose ``value`` expression yields a plain
        Python boolean rather than a SQL construct.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return self._value is not None

            # ``is not None`` is a Python identity test, so at the class
            # level this returns True; used by
            # test_expression_isnt_clause_element
            @value.expression
            def value(cls):
                return cls._value is not None

        return A

    def _relationship_fixture(self):
        """Return mapped classes ``(A, B)`` where ``B.as_`` is a
        relationship to ``A`` and ``A.value`` is a hybrid with a separate
        expression.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            b_id = Column("bid", Integer, ForeignKey("b.id"))
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return int(self._value) - 5

            @value.expression
            def value(cls):
                return func.foo(cls._value) + cls.bar_value

            @value.setter
            def value(self, v):
                self._value = v + 5

            @hybrid.hybrid_property
            def bar_value(cls):
                return func.bar(cls._value)

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)

            as_ = relationship("A")

        return A, B

    @testing.fixture
    def _related_polymorphic_attr_fixture(self):
        """test for #7425"""

        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)

            bs = relationship("B", back_populates="a", lazy="joined")

        class B(Base):
            __tablename__ = "poly"
            __mapper_args__ = {
                "polymorphic_on": "type",
                # if with_polymorphic is removed, issue does not occur
                "with_polymorphic": "*",
            }
            name = Column(String, primary_key=True)
            type = Column(String)
            a_id = Column(ForeignKey(A.id))

            a = relationship(A, back_populates="bs")

            @hybrid.hybrid_property
            def is_foo(self):
                return self.name == "foo"

        return A, B

    def test_cloning_in_polymorphic_any(
        self, _related_polymorphic_attr_fixture
    ):
        A, B = _related_polymorphic_attr_fixture

        session = fixture_session()

        # in the polymorphic case, A.bs.any() does a traverse() / clone()
        # on the expression.  so the proxedattribute coming from the hybrid
        # has to support this.

        self.assert_compile(
            session.query(A).filter(A.bs.any(B.name == "foo")),
            "SELECT a.id AS a_id, poly_1.name AS poly_1_name, poly_1.type "
            "AS poly_1_type, poly_1.a_id AS poly_1_a_id FROM a "
            "LEFT OUTER JOIN poly AS poly_1 ON a.id = poly_1.a_id "
            "WHERE EXISTS (SELECT 1 FROM poly WHERE a.id = poly.a_id "
            "AND poly.name = :name_1)",
        )

        # SQL should be identical
        self.assert_compile(
            session.query(A).filter(A.bs.any(B.is_foo)),
            "SELECT a.id AS a_id, poly_1.name AS poly_1_name, poly_1.type "
            "AS poly_1_type, poly_1.a_id AS poly_1_a_id FROM a "
            "LEFT OUTER JOIN poly AS poly_1 ON a.id = poly_1.a_id "
            "WHERE EXISTS (SELECT 1 FROM poly WHERE a.id = poly.a_id "
            "AND poly.name = :name_1)",
        )

    @testing.fixture
    def _unnamed_expr_fixture(self):
        """Return a mapped class ``A`` whose ``name`` hybrid concatenates
        ``firstname`` and ``lastname``.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            firstname = Column(String)
            lastname = Column(String)

            @hybrid.hybrid_property
            def name(self):
                return self.firstname + " " + self.lastname

        return A

    def test_access_from_unmapped(self):
        """test #9519"""

        # plain, unmapped class carrying a Column and a hybrid
        class DnsRecord:
            name = Column("name", String)

            @hybrid.hybrid_property
            def ip_value(self):
                return self.name[1:3]

            @ip_value.expression
            def ip_value(cls):
                return func.substring(cls.name, 1, 3)

        raw_attr = DnsRecord.ip_value
        is_(raw_attr._parententity, None)

        self.assert_compile(
            raw_attr, "substring(name, :substring_1, :substring_2)"
        )

        self.assert_compile(
            select(DnsRecord.ip_value),
            "SELECT substring(name, :substring_2, :substring_3) "
            "AS substring_1",
        )

    def test_access_from_not_yet_mapped(self, decl_base):
        """test #9519"""

        class DnsRecord(decl_base):
            __tablename__ = "dnsrecord"
            id = Column(Integer, primary_key=True)
            name = Column(String, unique=False, nullable=False)

            # declared_attr that reads the hybrid on ``cls``
            @declared_attr
            def thing(cls):
                return column_property(cls.ip_value)

            # NOTE(review): rebinds ``name``; the Column assigned above is
            # replaced in the class body by this one
            name = Column("name", String)

            @hybrid.hybrid_property
            def ip_value(self):
                return self.name[1:3]

            @ip_value.expression
            def ip_value(cls):
                return func.substring(cls.name, 1, 3)

        self.assert_compile(
            select(DnsRecord.thing),
            "SELECT substring(dnsrecord.name, :substring_2, :substring_3) "
            "AS substring_1 FROM dnsrecord",
        )

    def test_labeling_for_unnamed(self, _unnamed_expr_fixture):
        A = _unnamed_expr_fixture

        stmt = select(A.id, A.name)
        self.assert_compile(
            stmt,
            "SELECT a.id, a.firstname || :firstname_1 || a.lastname AS name "
            "FROM a",
        )

        eq_(stmt.subquery().c.keys(), ["id", "name"])

        self.assert_compile(
            select(stmt.subquery()),
            "SELECT anon_1.id, anon_1.name "
            "FROM (SELECT a.id AS id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a) AS anon_1",
        )

    def test_labeling_for_unnamed_tablename_plus_col(
        self, _unnamed_expr_fixture
    ):
        A = _unnamed_expr_fixture

        stmt = select(A.id, A.name).set_label_style(
            LABEL_STYLE_TABLENAME_PLUS_COL
        )
        # looks like legacy query
        self.assert_compile(
            stmt,
            "SELECT a.id AS a_id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a",
        )

        eq_(stmt.subquery().c.keys(), ["a_id", "name"])

        self.assert_compile(
            select(stmt.subquery()),
            "SELECT anon_1.a_id, anon_1.name FROM (SELECT a.id AS a_id, "
            "a.firstname || :firstname_1 || a.lastname AS name FROM a) "
            "AS anon_1",
        )

    def test_labeling_for_unnamed_legacy(self, _unnamed_expr_fixture):
        A = _unnamed_expr_fixture

        sess = fixture_session()

        stmt = sess.query(A.id, A.name)

        self.assert_compile(
            stmt,
            "SELECT a.id AS a_id, a.firstname || "
            ":firstname_1 || a.lastname AS name FROM a",
        )

        # for the subquery, we lose the "ORM-ness" from the subquery
        # so we have to carry it over using _proxy_key
        eq_(stmt.subquery().c.keys(), ["id", "name"])

        self.assert_compile(
            sess.query(stmt.subquery()),
            "SELECT anon_1.id AS anon_1_id, anon_1.name AS anon_1_name "
            "FROM (SELECT a.id AS id, a.firstname || :firstname_1 || "
            "a.lastname AS name FROM a) AS anon_1",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_info(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        inspect(A).all_orm_descriptors.value.info["some key"] = "some value"
        eq_(
            inspect(A).all_orm_descriptors.value.info,
            {"some key": "some value"},
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_set_get(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        a1 = A(value=5)
        eq_(a1._value, 10)
        eq_(a1.value, 5)

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_expression(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        self.assert_compile(
            A.value.__clause_element__(), "foo(a.value) + bar(a.value)"
        )

    def test_expression_isnt_clause_element(self):
        A = self._wrong_expr_fixture()

        with testing.expect_raises_message(
            exc.InvalidRequestError,
            'When interpreting attribute "A.value" as a SQL expression, '
            r"expected __clause_element__\(\) to return a "
            "ClauseElement object, got: True",
        ):
            coercions.expect(roles.ExpressionElementRole, A.value)

    def test_any(self):
        A, B = self._relationship_fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(B).filter(B.as_.any(value=5)),
            "SELECT b.id AS b_id FROM b WHERE EXISTS "
            "(SELECT 1 FROM a WHERE b.id = a.bid "
            "AND foo(a.value) + bar(a.value) = :param_1)",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_aliased_expression(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        self.assert_compile(
            aliased(A).value.__clause_element__(),
            "foo(a_1.value) + bar(a_1.value)",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_query(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        sess = fixture_session()
        self.assert_compile(
            sess.query(A).filter_by(value="foo"),
            "SELECT a.id AS a_id, a.value AS a_value "
            "FROM a WHERE foo(a.value) + bar(a.value) = :param_1",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_aliased_query(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A)).filter_by(value="foo"),
            "SELECT a_1.id AS a_1_id, a_1.value AS a_1_value "
            "FROM a AS a_1 WHERE foo(a_1.value) + bar(a_1.value) = :param_1",
        )

    @testing.variation("use_inplace", [True, False])
    @testing.variation("use_classmethod", [True, False])
    def test_docstring(self, use_inplace, use_classmethod):
        A = self._fixture(
            use_inplace=use_inplace, use_classmethod=use_classmethod
        )
        eq_(A.value.__doc__, "This is a class-level docstring")

        # no docstring here since we get a literal
        a1 = A(_value=10)
        eq_(a1.value, 5)


class PropertyValueTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests instance-level set / delete behavior of ``hybrid_property``
    with and without a setter.

    """

    __dialect__ = "default"

    def _fixture(self, assignable):
        """Return a mapped class ``A`` with a ``value`` hybrid.

        :param assignable: when true, a setter is also declared for
         ``value``.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                return self._value - 5

            if assignable:

                @value.setter
                def value(self, v):
                    self._value = v + 5

        return A

    def test_nonassignable(self):
        A = self._fixture(False)
        a1 = A(_value=5)
        assert_raises_message(
            AttributeError, "can't set attribute", setattr, a1, "value", 10
        )

    def test_nondeletable(self):
        A = self._fixture(False)
        a1 = A(_value=5)
        assert_raises_message(
            AttributeError, "can't delete attribute", delattr, a1, "value"
        )

    def test_set_get(self):
        A = self._fixture(True)
        a1 = A(value=5)
        eq_(a1.value, 5)
        eq_(a1._value, 10)


class PropertyOverrideTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests subclasses that override individual parts (setter, getter,
    expression, comparator) of a hybrid declared on a parent class.

    """

    __dialect__ = "default"

    def _fixture(self):
        """Return ``(Person, OverrideSetter, OverrideGetter, OverrideExpr,
        OverrideComparator)``; each subclass replaces one part of
        ``Person.name``.

        """
        Base = declarative_base()

        class Person(Base):
            __tablename__ = "person"
            id = Column(Integer, primary_key=True)
            _name = Column(String)

            @hybrid.hybrid_property
            def name(self):
                return self._name

            @name.setter
            def name(self, value):
                self._name = value.title()

        class OverrideSetter(Person):
            __tablename__ = "override_setter"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.setter
            def name(self, value):
                self._name = value.upper()

        class OverrideGetter(Person):
            __tablename__ = "override_getter"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.getter
            def name(self):
                return "Hello " + self._name

        class OverrideExpr(Person):
            __tablename__ = "override_expr"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.overrides.expression
            def name(self):
                return func.concat("Hello", self._name)

        class FooComparator(hybrid.Comparator):
            def __clause_element__(self):
                return func.concat("Hello", self.expression._name)

        class OverrideComparator(Person):
            __tablename__ = "override_comp"
            id = Column(Integer, ForeignKey("person.id"), primary_key=True)
            other = Column(String)

            @Person.name.overrides.comparator
            def name(self):
                return FooComparator(self)

        return (
            Person,
            OverrideSetter,
            OverrideGetter,
            OverrideExpr,
            OverrideComparator,
        )

    def test_property(self):
        Person, _, _, _, _ = self._fixture()
        p1 = Person()
        p1.name = "mike"
        eq_(p1._name, "Mike")
        eq_(p1.name, "Mike")

    def test_override_setter(self):
        _, OverrideSetter, _, _, _ = self._fixture()
        p1 = OverrideSetter()
        p1.name = "mike"
        eq_(p1._name, "MIKE")
        eq_(p1.name, "MIKE")

    def test_override_getter(self):
        _, _, OverrideGetter, _, _ = self._fixture()
        p1 = OverrideGetter()
        p1.name = "mike"
        # the parent's setter (title-casing) is still in effect
        eq_(p1._name, "Mike")
        eq_(p1.name, "Hello Mike")

    def test_override_expr(self):
        Person, _, _, OverrideExpr, _ = self._fixture()

        self.assert_compile(Person.name.__clause_element__(), "person._name")

        self.assert_compile(
            OverrideExpr.name.__clause_element__(),
            "concat(:concat_1, person._name)",
        )

    def test_override_comparator(self):
        Person, _, _, _, OverrideComparator = self._fixture()

        self.assert_compile(Person.name.__clause_element__(), "person._name")

        self.assert_compile(
            OverrideComparator.name.__clause_element__(),
            "concat(:concat_1, person._name)",
        )


class PropertyMirrorTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests what the class-level hybrid attribute exposes from the
    attribute or expression it returns (``property``, ``key``, ``class_``,
    ``info``, labeling).

    """

    __dialect__ = "default"

    def _fixture(self):
        """Return a mapped class ``A`` whose ``value`` hybrid returns the
        mapped ``_value`` attribute directly.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_property
            def value(self):
                "This is an instance-level docstring"
                return self._value

        return A

    @testing.fixture
    def _function_fixture(self):
        """Return a mapped class ``A`` whose ``foo_value`` hybrid returns a
        SQL function of a column.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            value = Column(Integer)

            @hybrid.hybrid_property
            def foo_value(self):
                return func.foo(self.value)

        return A

    @testing.fixture
    def _name_mismatch_fixture(self):
        """Return mapped classes ``(A, B)`` where the ``A.some_email``
        expression is a column of ``B`` with a different name.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            addresses = relationship("B")

            @hybrid.hybrid_property
            def some_email(self):
                if self.addresses:
                    return self.addresses[0].email_address
                else:
                    return None

            @some_email.expression
            def some_email(cls):
                return B.email_address

        class B(Base):
            __tablename__ = "b"
            id = Column(Integer, primary_key=True)
            aid = Column(ForeignKey("a.id"))
            email_address = Column(String)

        return A, B

    def test_dont_assume_attr_key_is_present(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        self.assert_compile(
            select(A, A.some_email).join(A.addresses),
            "SELECT a.id, b.email_address FROM a JOIN b ON a.id = b.aid",
        )

    def test_dont_assume_attr_key_is_present_ac(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture

        ac = aliased(A)
        self.assert_compile(
            select(ac, ac.some_email).join(ac.addresses),
            "SELECT a_1.id, b.email_address "
            "FROM a AS a_1 JOIN b ON a_1.id = b.aid",
        )

    def test_c_collection_func_element(self, _function_fixture):
        A = _function_fixture

        stmt = select(A.id, A.foo_value)
        eq_(stmt.subquery().c.keys(), ["id", "foo_value"])

    def test_filter_by_mismatched_col(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        self.assert_compile(
            select(A).filter_by(some_email="foo").join(A.addresses),
            "SELECT a.id FROM a JOIN b ON a.id = b.aid "
            "WHERE b.email_address = :email_address_1",
        )

    def test_aliased_mismatched_col(self, _name_mismatch_fixture):
        A, B = _name_mismatch_fixture
        sess = fixture_session()

        # so what should this do ?   it's just a weird hybrid case
        self.assert_compile(
            sess.query(aliased(A).some_email),
            "SELECT b.email_address AS b_email_address FROM b",
        )

    def test_property(self):
        A = self._fixture()

        is_(A.value.property, A._value.property)

    def test_key(self):
        A = self._fixture()
        eq_(A.value.key, "value")
        eq_(A._value.key, "_value")

    def test_class(self):
        A = self._fixture()
        is_(A.value.class_, A._value.class_)

    def test_get_history(self):
        A = self._fixture()
        inst = A(_value=5)
        eq_(A.value.get_history(inst), A._value.get_history(inst))

    def test_info_not_mirrored(self):
        A = self._fixture()
        A._value.info["foo"] = "bar"
        A.value.info["bar"] = "hoho"

        # each attribute keeps its own info dictionary
        eq_(A._value.info, {"foo": "bar"})
        eq_(A.value.info, {"bar": "hoho"})

    def test_info_from_hybrid(self):
        A = self._fixture()
        A._value.info["foo"] = "bar"
        A.value.info["bar"] = "hoho"

        insp = inspect(A)
        is_(insp.all_orm_descriptors["value"].info, A.value.info)


class SynonymOfPropertyTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests ``synonym()`` attributes that point at hybrid properties."""

    __dialect__ = "default"

    def _fixture(self):
        """Return a mapped class ``A`` with three hybrids, each paired with
        a ``*_syn`` synonym.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            # hybrid returning a mapped column
            @hybrid.hybrid_property
            def value(self):
                return self._value

            value_syn = synonym("value")

            # hybrid returning a plain string at both levels
            @hybrid.hybrid_property
            def string_value(self):
                return "foo"

            string_value_syn = synonym("string_value")

            # hybrid with a plain string getter and a SQL expression
            @hybrid.hybrid_property
            def string_expr_value(self):
                return "foo"

            @string_expr_value.expression
            def string_expr_value(cls):
                return literal_column("'foo'")

            string_expr_value_syn = synonym("string_expr_value")

        return A

    def test_hasattr(self):
        A = self._fixture()

        is_false(hasattr(A.value_syn, "nonexistent"))

        is_false(hasattr(A.string_value_syn, "nonexistent"))

        is_false(hasattr(A.string_expr_value_syn, "nonexistent"))

    def test_instance_access(self):
        A = self._fixture()

        a1 = A(_value="hi")

        eq_(a1.value_syn, "hi")

        eq_(a1.string_value_syn, "foo")

        eq_(a1.string_expr_value_syn, "foo")

    def test_expression_property(self):
        A = self._fixture()

        self.assert_compile(
            select(A.id, A.value_syn).where(A.value_syn == "value"),
            "SELECT a.id, a.value FROM a WHERE a.value = :value_1",
        )

    def test_expression_expr(self):
        A = self._fixture()

        self.assert_compile(
            select(A.id, A.string_expr_value_syn).where(
                A.string_expr_value_syn == "value"
            ),
            "SELECT a.id, 'foo' FROM a WHERE 'foo' = :'foo'_1",
        )


class InplaceCreationTest(fixtures.TestBase, AssertsCompiledSQL):
    """test 'inplace' definitions added for 2.0 to assist with typing
    limitations.

    """

    __dialect__ = "default"

    def test_property_integration(self, decl_base):
        class Person(decl_base):
            __tablename__ = "person"
            id = Column(Integer, primary_key=True)
            _name = Column(String)

            @hybrid.hybrid_property
            def name(self):
                return self._name

            @name.inplace.setter
            def _name_setter(self, value):
                self._name = value.title()

            @name.inplace.expression
            def _name_expression(self):
                return func.concat("Hello", self._name)

        p1 = Person(_name="name")
        eq_(p1.name, "name")
        p1.name = "new name"
        eq_(p1.name, "New Name")

        self.assert_compile(Person.name, "concat(:concat_1, person._name)")

    def test_method_integration(self, decl_base):
        class A(decl_base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_method
            def value(self, x):
                return int(self._value) + x

            @value.inplace.expression
            def _value_expression(cls, value):
                return func.foo(cls._value, value) + value

        a1 = A(_value="10")
        eq_(a1.value(5), 15)

        self.assert_compile(A.value(column("q")), "foo(a.value, q) + q")

    def test_property_unit(self):
        def one():
            pass

        def two():
            pass

        def three():
            pass

        prop = hybrid.hybrid_property(one)

        # the inplace modifiers hand back the same hybrid object
        prop2 = prop.inplace.expression(two)

        prop3 = prop.inplace.setter(three)

        is_(prop, prop2)
        is_(prop, prop3)

        def four():
            pass

        # the non-inplace modifier produces a different object
        prop4 = prop.setter(four)
        is_not(prop, prop4)


class MethodExpressionTest(fixtures.TestBase, AssertsCompiledSQL):
    """Tests ``hybrid_method`` with a separately declared class-level
    ``expression``.

    """

    __dialect__ = "default"

    def _fixture(self):
        """Return a mapped class ``A`` with two hybrid methods, ``value``
        and ``other_value``.

        """
        Base = declarative_base()

        class A(Base):
            __tablename__ = "a"
            id = Column(Integer, primary_key=True)
            _value = Column("value", String)

            @hybrid.hybrid_method
            def value(self, x):
                "This is an instance-level docstring"
                return int(self._value) + x

            @value.expression
            def value(cls, value):
                "This is a class-level docstring"
                return func.foo(cls._value, value) + value

            @hybrid.hybrid_method
            def other_value(self, x):
                "This is an instance-level docstring"
                return int(self._value) + x

            # this expression intentionally carries no docstring;
            # test_docstring asserts on the resulting __doc__
            @other_value.expression
            def other_value(cls, value):
                return func.foo(cls._value, value) + value

        return A

    def test_call(self):
        A = self._fixture()
        a1 = A(_value=10)
        eq_(a1.value(7), 17)

    def test_expression(self):
        A = self._fixture()
        self.assert_compile(A.value(5), "foo(a.value, :foo_1) + :foo_2")

    def test_info(self):
        A = self._fixture()
        inspect(A).all_orm_descriptors.value.info["some key"] = "some value"
        eq_(
            inspect(A).all_orm_descriptors.value.info,
            {"some key": "some value"},
        )

    def test_aliased_expression(self):
        A = self._fixture()
        self.assert_compile(
            aliased(A).value(5), "foo(a_1.value, :foo_1) + :foo_2"
        )

    def test_query(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A).filter(A.value(5) == "foo"),
            "SELECT a.id AS a_id, a.value AS a_value "
            "FROM a WHERE foo(a.value, :foo_1) + :foo_2 = :param_1",
        )

    def test_aliased_query(self):
        A = self._fixture()
        sess = fixture_session()
        a1 = aliased(A)
        self.assert_compile(
            sess.query(a1).filter(a1.value(5) == "foo"),
            "SELECT a_1.id AS a_1_id, a_1.value AS a_1_value "
            "FROM a AS a_1 WHERE foo(a_1.value, :foo_1) + :foo_2 = :param_1",
        )

    def test_query_col(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(A.value(5)),
            "SELECT foo(a.value, :foo_1) + :foo_2 AS anon_1 FROM a",
        )

    def test_aliased_query_col(self):
        A = self._fixture()
        sess = fixture_session()
        self.assert_compile(
            sess.query(aliased(A).value(5)),
            "SELECT foo(a_1.value, :foo_1) + :foo_2 AS anon_1 FROM a AS a_1",
        )

    def test_docstring(self):
        A = self._fixture()
        eq_(A.value.__doc__, "This is a class-level docstring")
        eq_(A.other_value.__doc__, "This is an instance-level docstring")
        a1 = A(_value=10)

        # a1.value is still a method, so it has a
        # docstring
        eq_(a1.value.__doc__, "This is an instance-level docstring")

        eq_(a1.other_value.__doc__, "This is an instance-level docstring")


class BulkUpdateTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL):
    """Tests hybrids used as keys in UPDATE / INSERT ``values()`` and in
    ``Query.update()`` with ``synchronize_session``.

    """

    __dialect__ = "default"

    @classmethod
    def setup_classes(cls):
        Base = cls.DeclarativeBasic

        class Person(Base):
            __tablename__ = "person"

            id = Column(Integer, primary_key=True)
            first_name = Column(String(10))
            last_name = Column(String(10))

            @hybrid.hybrid_property
            def name(self):
                return self.first_name + " " + self.last_name

            @name.setter
            def name(self, value):
                self.first_name, self.last_name = value.split(" ", 1)

            @name.expression
            def name(cls):
                return func.concat(cls.first_name, " ", cls.last_name)

            # splits the incoming value on the first space into
            # (column, value) pairs for first_name and last_name
            @name.update_expression
            def name(cls, value):
                f, l = value.split(" ", 1)
                return [(cls.first_name, f), (cls.last_name, l)]

            # hybrid that returns another hybrid which has an
            # update_expression
            @hybrid.hybrid_property
            def uname(self):
                return self.name

            # hybrid that returns a column directly
            @hybrid.hybrid_property
            def fname(self):
                return self.first_name

            # hybrid that returns the ``fname`` hybrid
            @hybrid.hybrid_property
            def fname2(self):
                return self.fname

    @classmethod
    def insert_data(cls, connection):
        s = Session(connection)
        jill = cls.classes.Person(id=3, first_name="jill")
        s.add(jill)
        s.commit()

    def test_update_plain(self):
        Person = self.classes.Person

        statement = update(Person).values({Person.fname: "Dr."})

        self.assert_compile(
            statement,
            "UPDATE person SET first_name=:first_name",
            params={"first_name": "Dr."},
        )

    @testing.combinations("attr", "str", "kwarg", argnames="keytype")
    def test_update_expr(self, keytype):
        Person = self.classes.Person

        if keytype == "attr":
            statement = update(Person).values({Person.name: "Dr. No"})
        elif keytype == "str":
            statement = update(Person).values({"name": "Dr. No"})
        elif keytype == "kwarg":
            statement = update(Person).values(name="Dr. No")
        else:
            assert False

        self.assert_compile(
            statement,
            "UPDATE person SET first_name=:first_name, last_name=:last_name",
            checkparams={"first_name": "Dr.", "last_name": "No"},
        )

    @testing.combinations("attr", "str", "kwarg", argnames="keytype")
    def test_insert_expr(self, keytype):
        Person = self.classes.Person

        if keytype == "attr":
            statement = insert(Person).values({Person.name: "Dr. No"})
        elif keytype == "str":
            statement = insert(Person).values({"name": "Dr. No"})
        elif keytype == "kwarg":
            statement = insert(Person).values(name="Dr. No")
        else:
            assert False

        self.assert_compile(
            statement,
            "INSERT INTO person (first_name, last_name) VALUES "
            "(:first_name, :last_name)",
            checkparams={"first_name": "Dr.", "last_name": "No"},
        )

    # these tests all run two UPDATES to assert that caching is not
    # interfering.  this is #7209

    def test_evaluate_non_hybrid_attr(self):
        # this is a control case
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.first_name: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.first_name, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.first_name: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.first_name, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_indirect(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname2: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.fname2, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname2: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.fname2, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_plain(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname: "moonbeam"}, synchronize_session="evaluate"
        )
        eq_(jill.fname, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname: "sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.fname, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_fetch_hybrid_attr_indirect(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname2: "moonbeam"}, synchronize_session="fetch"
        )
        eq_(jill.fname2, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname2: "sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.fname2, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_fetch_hybrid_attr_plain(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.fname: "moonbeam"}, synchronize_session="fetch"
        )
        eq_(jill.fname, "moonbeam")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.fname: "sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.fname, "sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "sunshine",
        )

    def test_evaluate_hybrid_attr_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.name: "moonbeam sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.name, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.name: "first last"}, synchronize_session="evaluate"
        )
        eq_(jill.name, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")

    def test_fetch_hybrid_attr_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.name: "moonbeam sunshine"}, synchronize_session="fetch"
        )
        eq_(jill.name, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.name: "first last"}, synchronize_session="fetch"
        )
        eq_(jill.name, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")

    def test_evaluate_hybrid_attr_indirect_w_update_expr(self):
        Person = self.classes.Person

        s = fixture_session()
        jill = s.get(Person, 3)

        s.query(Person).update(
            {Person.uname: "moonbeam sunshine"}, synchronize_session="evaluate"
        )
        eq_(jill.uname, "moonbeam sunshine")
        eq_(
            s.scalar(select(Person.first_name).where(Person.id == 3)),
            "moonbeam",
        )

        s.query(Person).update(
            {Person.uname: "first last"}, synchronize_session="evaluate"
        )
        eq_(jill.uname, "first last")
        eq_(s.scalar(select(Person.first_name).where(Person.id == 3)), "first")


class SpecialObjectTest(fixtures.TestBase, AssertsCompiledSQL):
    """tests against hybrids that return a non-ClauseElement.

    use cases derived from the example at
    https://techspot.zzzeek.org/2011/10/21/hybrids-and-value-agnostic-types/

    """

    __dialect__ = "default"

    @classmethod
    def setup_test_class(cls):
        from sqlalchemy import literal

        symbols = ("usd", "gbp", "cad", "eur", "aud")
        # maps (currency_from, currency_to) -> Decimal rate; each tuple in
        # the list below is the row for one ``currency_to`` symbol
        currency_lookup = {
            (currency_from, currency_to): Decimal(str(rate))
            for currency_to, values in zip(
                symbols,
                [
                    (1, 1.59009, 0.988611, 1.37979, 1.02962),
                    (0.628895, 1, 0.621732, 0.867748, 0.647525),
                    (1.01152, 1.6084, 1, 1.39569, 1.04148),
                    (0.724743, 1.1524, 0.716489, 1, 0.746213),
                    (0.971228, 1.54434, 0.960166, 1.34009, 1),
                ],
            )
            for currency_from, rate in zip(symbols, values)
        }

        class Amount:
            # value object pairing an amount with a currency symbol;
            # arithmetic and comparisons first convert the other operand
            # into this object's currency
            def __init__(self, amount, currency):
                self.currency = currency
                self.amount = amount

            def __add__(self, other):
                return Amount(
                    self.amount + other.as_currency(self.currency).amount,
                    self.currency,
                )

            def __sub__(self, other):
                return Amount(
                    self.amount - other.as_currency(self.currency).amount,
                    self.currency,
                )

            def __lt__(self, other):
                return self.amount < other.as_currency(self.currency).amount

            def __gt__(self, other):
                return self.amount > other.as_currency(self.currency).amount

            def __eq__(self, other):
                return self.amount == other.as_currency(self.currency).amount

            def as_currency(self, other_currency):
                return Amount(
                    currency_lookup[(self.currency, other_currency)]
                    * self.amount,
                    other_currency,
                )

            def __clause_element__(self):
                # helper method for SQLAlchemy to interpret
                # the Amount object as a SQL element
                if isinstance(self.amount, (float, int, Decimal)):
                    return literal(self.amount)
                else:
                    return self.amount

            def __str__(self):
                return "%2.4f %s" % (self.amount, self.currency)

            def __repr__(self):
                return "Amount(%r, %r)" % (self.amount, self.currency)

        Base = declarative_base()

        class BankAccount(Base):
            __tablename__ = "bank_account"
            id = Column(Integer, primary_key=True)

            _balance = Column("balance", Numeric)

            @hybrid.hybrid_property
            def balance(self):
                """Return an Amount view of the current balance."""
                return Amount(self._balance, "usd")

            @balance.setter
            def balance(self, value):
                self._balance = value.as_currency("usd").amount

        cls.Amount = Amount
        cls.BankAccount = BankAccount

    def test_instance_one(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3b. print balance in usd
        eq_(account.balance.amount, 4000)

    def test_instance_two(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3c. print balance in gbp
        eq_(account.balance.as_currency("gbp").amount, Decimal("2515.58"))

    def test_instance_three(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))

        # 3d. perform currency-agnostic comparisons, math
        is_(account.balance > Amount(500, "cad"), True)

    def test_instance_four(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        account = BankAccount(balance=Amount(4000, "usd"))
        eq_(
            account.balance + Amount(500, "cad") - Amount(50, "eur"),
            Amount(Decimal("4425.316"), "usd"),
        )

    def test_query_one(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        session = fixture_session()

        query = session.query(BankAccount).filter(
            BankAccount.balance == Amount(10000, "cad")
        )

        self.assert_compile(
            query,
            "SELECT bank_account.id AS bank_account_id, "
            "bank_account.balance AS bank_account_balance "
            "FROM bank_account "
            "WHERE bank_account.balance = :balance_1",
            checkparams={"balance_1": Decimal("9886.110000")},
        )

    def test_query_two(self):
        BankAccount, Amount = self.BankAccount, self.Amount
        session = fixture_session()

        # alternatively we can do the calc on the DB side.
query = ( session.query(BankAccount) .filter( BankAccount.balance.as_currency("cad") > Amount(9999, "cad") ) .filter( BankAccount.balance.as_currency("cad") < Amount(10001, "cad") ) ) self.assert_compile( query, "SELECT bank_account.id AS bank_account_id, " "bank_account.balance AS bank_account_balance " "FROM bank_account " "WHERE :balance_1 * bank_account.balance > :param_1 " "AND :balance_2 * bank_account.balance < :param_2", checkparams={ "balance_1": Decimal("1.01152"), "balance_2": Decimal("1.01152"), "param_1": Decimal("9999"), "param_2": Decimal("10001"), }, ) def test_query_three(self): BankAccount = self.BankAccount session = fixture_session() query = session.query(BankAccount).filter( BankAccount.balance.as_currency("cad") > BankAccount.balance.as_currency("eur") ) self.assert_compile( query, "SELECT bank_account.id AS bank_account_id, " "bank_account.balance AS bank_account_balance " "FROM bank_account " "WHERE :balance_1 * bank_account.balance > " ":param_1 * :balance_2 * bank_account.balance", checkparams={ "balance_1": Decimal("1.01152"), "balance_2": Decimal("0.724743"), "param_1": Decimal("1.39569"), }, ) def test_query_four(self): BankAccount = self.BankAccount session = fixture_session() # 4c. query all amounts, converting to "CAD" on the DB side query = session.query(BankAccount.balance.as_currency("cad").amount) self.assert_compile( query, "SELECT :balance_1 * bank_account.balance AS anon_1 " "FROM bank_account", checkparams={"balance_1": Decimal("1.01152")}, ) def test_query_five(self): BankAccount = self.BankAccount session = fixture_session() # 4d. average balance in EUR query = session.query(func.avg(BankAccount.balance.as_currency("eur"))) self.assert_compile( query, "SELECT avg(:balance_1 * bank_account.balance) AS avg_1 " "FROM bank_account", checkparams={"balance_1": Decimal("0.724743")}, ) def test_docstring(self): BankAccount = self.BankAccount eq_( BankAccount.balance.__doc__, "Return an Amount view of the current balance.", )