from __future__ import annotations from collections import abc import copy import dataclasses import pickle from typing import List from unittest.mock import call from unittest.mock import Mock from sqlalchemy import cast from sqlalchemy import exc from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy import inspect from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import or_ from sqlalchemy import String from sqlalchemy import testing from sqlalchemy.engine import default from sqlalchemy.ext.associationproxy import _AssociationList from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import AssociationProxy from sqlalchemy.orm import aliased from sqlalchemy.orm import clear_mappers from sqlalchemy.orm import collections from sqlalchemy.orm import composite from sqlalchemy.orm import configure_mappers from sqlalchemy.orm import declarative_base from sqlalchemy.orm import declared_attr from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import mapper from sqlalchemy.orm import relationship from sqlalchemy.orm import Session from sqlalchemy.orm.collections import attribute_keyed_dict from sqlalchemy.orm.collections import collection from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_false from sqlalchemy.testing.assertions import expect_raises_message from sqlalchemy.testing.entities import ComparableMixin # noqa from sqlalchemy.testing.fixtures import fixture_session from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from sqlalchemy.testing.util import gc_collect class DictCollection(dict): @collection.appender def append(self, obj): self[obj.foo] = obj @collection.remover def remove(self, obj): del self[obj.foo] class SetCollection(set): pass class ListCollection(list): pass class ObjectCollection: def __init__(self): self.values = list() @collection.appender def append(self, obj): self.values.append(obj) @collection.remover def remove(self, obj): self.values.remove(obj) def __iter__(self): return iter(self.values) class AutoFlushTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), ) Table( "association", metadata, Column("parent_id", ForeignKey("parent.id"), primary_key=True), Column("child_id", ForeignKey("child.id"), primary_key=True), Column("name", String(50)), ) Table( "child", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(50)), ) def _fixture(self, collection_class, is_dict=False): class Parent: collection = association_proxy("_collection", "child") class Child: def __init__(self, name): self.name = name class Association: if is_dict: def __init__(self, key, child): self.child = child else: def __init__(self, child): self.child = child self.mapper_registry.map_imperatively( Parent, self.tables.parent, properties={ "_collection": relationship( Association, collection_class=collection_class, backref="parent", ) }, ) self.mapper_registry.map_imperatively( Association, self.tables.association, properties={"child": relationship(Child, backref="association")}, ) self.mapper_registry.map_imperatively(Child, self.tables.child) return Parent, Child, Association def _test_premature_flush(self, collection_class, fn, is_dict=False): Parent, Child, Association = self._fixture( collection_class, is_dict=is_dict ) session = Session( testing.db, autoflush=True, expire_on_commit=True, future=True ) p1 = Parent() c1 = Child("c1") c2 = Child("c2") session.add(p1) session.add(c1) session.add(c2) fn(p1.collection, c1) session.commit() fn(p1.collection, c2) session.commit() is_(c1.association[0].parent, p1) is_(c2.association[0].parent, p1) session.close() def test_list_append(self): self._test_premature_flush( list, lambda collection, obj: collection.append(obj) ) def test_list_extend(self): self._test_premature_flush( list, lambda collection, obj: collection.extend([obj]) ) def test_set_add(self): self._test_premature_flush( set, lambda collection, obj: collection.add(obj) ) def test_set_extend(self): self._test_premature_flush( set, lambda collection, obj: collection.update([obj]) ) def test_dict_set(self): def set_(collection, obj): collection[obj.name] = obj self._test_premature_flush( collections.attribute_keyed_dict( "name", ignore_unpopulated_attribute=True ), set_, is_dict=True, ) class _CollectionOperations(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "Parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) Table( "Children", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("Parent.id")), Column("foo", String(128)), Column("name", String(128)), ) @classmethod def setup_mappers(cls): collection_class = cls.collection_class class Parent(cls.Basic): children = association_proxy("_children", "name") def __init__(self, name): self.name = name class Child(cls.Basic): if collection_class and issubclass(collection_class, dict): def __init__(self, foo, name): self.foo = foo self.name = name else: def __init__(self, name): self.name = name parents_table, children_table = cls.tables("Parent", "Children") cls.mapper_registry.map_imperatively( Parent, parents_table, properties={ "_children": relationship( Child, lazy="joined", backref="parent", collection_class=collection_class, ) }, ) cls.mapper_registry.map_imperatively(Child, children_table) def test_abc(self): Parent = self.classes.Parent p1 = Parent("x") collection_class = self.collection_class or list for abc_ in (abc.Set, abc.MutableMapping, abc.MutableSequence): if issubclass(collection_class, abc_): break else: abc_ = None if abc_: p1 = Parent("x") assert isinstance(p1.children, abc_) def roundtrip(self, obj): if obj not in self.session: self.session.add(obj) self.session.flush() id_, type_ = obj.id, type(obj) self.session.expunge_all() return self.session.get(type_, id_) def _test_sequence_ops(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() p1 = Parent("P1") def assert_index(expected, value, *args): """Assert index of child value is equal to expected. If expected is None, assert that index raises ValueError. """ try: index = p1.children.index(value, *args) except ValueError: self.assert_(expected is None) else: self.assert_(expected is not None) self.assert_(index == expected) self.assert_(not p1._children) self.assert_(not p1.children) ch = Child("regular") p1._children.append(ch) self.assert_(ch in p1._children) self.assert_(len(p1._children) == 1) self.assert_(p1.children) self.assert_(len(p1.children) == 1) self.assert_(ch not in p1.children) self.assert_("regular" in p1.children) assert_index(0, "regular") assert_index(None, "regular", 1) p1.children.append("proxied") self.assert_("proxied" in p1.children) self.assert_("proxied" not in p1._children) self.assert_(len(p1.children) == 2) self.assert_(len(p1._children) == 2) self.assert_(p1._children[0].name == "regular") self.assert_(p1._children[1].name == "proxied") assert_index(0, "regular") assert_index(1, "proxied") assert_index(1, "proxied", 1) assert_index(None, "proxied", 0, 1) del p1._children[1] self.assert_(len(p1._children) == 1) self.assert_(len(p1.children) == 1) self.assert_(p1._children[0] == ch) assert_index(None, "proxied") del p1.children[0] self.assert_(len(p1._children) == 0) self.assert_(len(p1.children) == 0) assert_index(None, "regular") p1.children = ["a", "b", "c"] self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) assert_index(0, "a") assert_index(1, "b") assert_index(2, "c") del ch p1 = self.roundtrip(p1) self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) assert_index(0, "a") assert_index(1, "b") assert_index(2, "c") popped = p1.children.pop() self.assert_(len(p1.children) == 2) self.assert_(popped not in p1.children) assert_index(None, popped) p1 = self.roundtrip(p1) self.assert_(len(p1.children) == 2) self.assert_(popped not in p1.children) assert_index(None, popped) p1.children[1] = "changed-in-place" self.assert_(p1.children[1] == "changed-in-place") assert_index(1, "changed-in-place") assert_index(None, "b") inplace_id = p1._children[1].id p1 = self.roundtrip(p1) self.assert_(p1.children[1] == "changed-in-place") assert p1._children[1].id == inplace_id p1.children.append("changed-in-place") self.assert_(p1.children.count("changed-in-place") == 2) assert_index(1, "changed-in-place") p1.children.remove("changed-in-place") self.assert_(p1.children.count("changed-in-place") == 1) assert_index(1, "changed-in-place") p1 = self.roundtrip(p1) self.assert_(p1.children.count("changed-in-place") == 1) assert_index(1, "changed-in-place") p1._children = [] self.assert_(len(p1.children) == 0) assert_index(None, "changed-in-place") after = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"] p1.children = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"] self.assert_(len(p1.children) == 10) self.assert_([c.name for c in p1._children] == after) for i, val in enumerate(after): assert_index(i, val) p1.children[2:6] = ["x"] * 4 after = ["a", "b", "x", "x", "x", "x", "g", "h", "i", "j"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) assert_index(2, "x") assert_index(3, "x", 3) assert_index(None, "x", 6) p1.children[2:6] = ["y"] after = ["a", "b", "y", "g", "h", "i", "j"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) assert_index(2, "y") assert_index(None, "y", 3) p1.children[2:3] = ["z"] * 4 after = ["a", "b", "z", "z", "z", "z", "g", "h", "i", "j"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children[2::2] = ["O"] * 4 after = ["a", "b", "O", "z", "O", "z", "O", "h", "O", "j"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) assert_raises(TypeError, set, [p1.children]) p1.children *= 0 after = [] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children += ["a", "b"] after = ["a", "b"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children[:] = ["d", "e"] after = ["d", "e"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children[:] = ["a", "b"] p1.children += ["c"] after = ["a", "b", "c"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children *= 1 after = ["a", "b", "c"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children *= 2 after = ["a", "b", "c", "a", "b", "c"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) p1.children = ["a"] after = ["a"] self.assert_(p1.children == after) self.assert_([c.name for c in p1._children] == after) self.assert_((p1.children * 2) == ["a", "a"]) self.assert_((2 * p1.children) == ["a", "a"]) self.assert_((p1.children * 0) == []) self.assert_((0 * p1.children) == []) self.assert_((p1.children + ["b"]) == ["a", "b"]) self.assert_((["b"] + p1.children) == ["b", "a"]) try: p1.children + 123 assert False except TypeError: assert True class DefaultTest(_CollectionOperations): collection_class = None def test_sequence_ops(self): self._test_sequence_ops() class ListTest(_CollectionOperations): collection_class = list def test_sequence_ops(self): self._test_sequence_ops() class CustomDictTest(_CollectionOperations): collection_class = DictCollection def test_mapping_ops(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() p1 = Parent("P1") self.assert_(not p1._children) self.assert_(not p1.children) ch = Child("a", "regular") p1._children.append(ch) self.assert_(ch in list(p1._children.values())) self.assert_(len(p1._children) == 1) self.assert_(p1.children) self.assert_(len(p1.children) == 1) self.assert_(ch not in p1.children) self.assert_("a" in p1.children) self.assert_(p1.children["a"] == "regular") self.assert_(p1._children["a"] == ch) p1.children["b"] = "proxied" eq_(list(p1.children.keys()), ["a", "b"]) eq_(list(p1.children.items()), [("a", "regular"), ("b", "proxied")]) eq_(list(p1.children.values()), ["regular", "proxied"]) self.assert_("proxied" in list(p1.children.values())) self.assert_("b" in p1.children) self.assert_("proxied" not in p1._children) self.assert_(len(p1.children) == 2) self.assert_(len(p1._children) == 2) self.assert_(p1._children["a"].name == "regular") self.assert_(p1._children["b"].name == "proxied") del p1._children["b"] self.assert_(len(p1._children) == 1) self.assert_(len(p1.children) == 1) self.assert_(p1._children["a"] == ch) del p1.children["a"] self.assert_(len(p1._children) == 0) self.assert_(len(p1.children) == 0) p1.children = {"d": "v d", "e": "v e", "f": "v f"} self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) self.assert_(set(p1.children) == {"d", "e", "f"}) del ch p1 = self.roundtrip(p1) self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) p1.children["e"] = "changed-in-place" self.assert_(p1.children["e"] == "changed-in-place") inplace_id = p1._children["e"].id p1 = self.roundtrip(p1) self.assert_(p1.children["e"] == "changed-in-place") self.assert_(p1._children["e"].id == inplace_id) p1._children = {} self.assert_(len(p1.children) == 0) try: p1._children = [] self.assert_(False) except TypeError: self.assert_(True) try: p1._children = None self.assert_(False) except TypeError: self.assert_(True) assert_raises(TypeError, set, [p1.children]) def test_bulk_replace(self): Parent = self.classes.Parent p1 = Parent("foo") p1.children = {"a": "v a", "b": "v b", "c": "v c"} assocs = set(p1._children.values()) keep_assocs = {a for a in assocs if a.foo in ("a", "c")} eq_(len(keep_assocs), 2) remove_assocs = {a for a in assocs if a.foo == "b"} p1.children = {"a": "v a", "d": "v d", "c": "v c"} eq_( {a for a in p1._children.values() if a.foo in ("a", "c")}, keep_assocs, ) assert not remove_assocs.intersection(p1._children.values()) eq_(p1.children, {"a": "v a", "d": "v d", "c": "v c"}) class SetTest(_CollectionOperations): collection_class = set def test_set_operations(self): Parent, Child = self.classes.Parent, self.classes.Child self.session = fixture_session() p1 = Parent("P1") self.assert_(not p1._children) self.assert_(not p1.children) ch1 = Child("regular") p1._children.add(ch1) self.assert_(ch1 in p1._children) self.assert_(len(p1._children) == 1) self.assert_(p1.children) self.assert_(len(p1.children) == 1) self.assert_(ch1 not in p1.children) self.assert_("regular" in p1.children) p1.children.add("proxied") self.assert_("proxied" in p1.children) self.assert_("proxied" not in p1._children) self.assert_(len(p1.children) == 2) self.assert_(len(p1._children) == 2) self.assert_({o.name for o in p1._children} == {"regular", "proxied"}) ch2 = None for o in p1._children: if o.name == "proxied": ch2 = o break p1._children.remove(ch2) self.assert_(len(p1._children) == 1) self.assert_(len(p1.children) == 1) self.assert_(p1._children == {ch1}) p1.children.remove("regular") self.assert_(len(p1._children) == 0) self.assert_(len(p1.children) == 0) p1.children = ["a", "b", "c"] self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) del ch1 p1 = self.roundtrip(p1) self.assert_(len(p1._children) == 3) self.assert_(len(p1.children) == 3) self.assert_("a" in p1.children) self.assert_("b" in p1.children) self.assert_("d" not in p1.children) self.assert_(p1.children == {"a", "b", "c"}) assert_raises(KeyError, p1.children.remove, "d") self.assert_(len(p1.children) == 3) p1.children.discard("d") self.assert_(len(p1.children) == 3) p1 = self.roundtrip(p1) self.assert_(len(p1.children) == 3) popped = p1.children.pop() self.assert_(len(p1.children) == 2) self.assert_(popped not in p1.children) p1 = self.roundtrip(p1) self.assert_(len(p1.children) == 2) self.assert_(popped not in p1.children) p1.children = ["a", "b", "c"] p1 = self.roundtrip(p1) self.assert_(p1.children == {"a", "b", "c"}) p1.children.discard("b") p1 = self.roundtrip(p1) self.assert_(p1.children == {"a", "c"}) p1.children.remove("a") p1 = self.roundtrip(p1) self.assert_(p1.children == {"c"}) p1._children = set() self.assert_(len(p1.children) == 0) try: p1._children = [] self.assert_(False) except TypeError: self.assert_(True) try: p1._children = None self.assert_(False) except TypeError: self.assert_(True) assert_raises(TypeError, set, [p1.children]) def test_set_comparisons(self): Parent = self.classes.Parent p1 = Parent("P1") p1.children = ["a", "b", "c"] control = {"a", "b", "c"} for other in ( {"a", "b", "c"}, {"a", "b", "c", "d"}, {"a"}, {"a", "b"}, {"c", "d"}, {"e", "f", "g"}, set(), ): eq_(p1.children.union(other), control.union(other)) eq_(p1.children.difference(other), control.difference(other)) eq_((p1.children - other), (control - other)) eq_(p1.children.intersection(other), control.intersection(other)) eq_( p1.children.symmetric_difference(other), control.symmetric_difference(other), ) eq_(p1.children.issubset(other), control.issubset(other)) eq_(p1.children.issuperset(other), control.issuperset(other)) self.assert_((p1.children == other) == (control == other)) self.assert_((p1.children != other) == (control != other)) self.assert_((p1.children < other) == (control < other)) self.assert_((p1.children <= other) == (control <= other)) self.assert_((p1.children > other) == (control > other)) self.assert_((p1.children >= other) == (control >= other)) def test_set_comparison_empty_to_empty(self): # test issue #3265 which was fixed in Python version 2.7.8 Parent = self.classes.Parent p1 = Parent("P1") p1.children = [] p2 = Parent("P2") p2.children = [] set_0 = set() set_a = p1.children set_b = p2.children is_(set_a == set_a, True) is_(set_a == set_b, True) is_(set_a == set_0, True) is_(set_0 == set_a, True) is_(set_a != set_a, False) is_(set_a != set_b, False) is_(set_a != set_0, False) is_(set_0 != set_a, False) def test_set_mutation(self): Parent = self.classes.Parent self.session = fixture_session() # mutations for op in ( "update", "intersection_update", "difference_update", "symmetric_difference_update", ): for base in (["a", "b", "c"], []): for other in ( {"a", "b", "c"}, {"a", "b", "c", "d"}, {"a"}, {"a", "b"}, {"c", "d"}, {"e", "f", "g"}, set(), ): p = Parent("p") p.children = base[:] control = set(base[:]) getattr(p.children, op)(other) getattr(control, op)(other) try: self.assert_(p.children == control) except Exception: print("Test %s.%s(%s):" % (set(base), op, other)) print("want", repr(control)) print("got", repr(p.children)) raise p = self.roundtrip(p) try: self.assert_(p.children == control) except Exception: print("Test %s.%s(%s):" % (base, op, other)) print("want", repr(control)) print("got", repr(p.children)) raise # in-place mutations for op in ("|=", "-=", "&=", "^="): for base in (["a", "b", "c"], []): for other in ( {"a", "b", "c"}, {"a", "b", "c", "d"}, {"a"}, {"a", "b"}, {"c", "d"}, {"e", "f", "g"}, frozenset(["e", "f", "g"]), set(), ): p = Parent("p") p.children = base[:] control = set(base[:]) exec("p.children %s other" % op) exec("control %s other" % op) try: self.assert_(p.children == control) except Exception: print("Test %s %s %s:" % (set(base), op, other)) print("want", repr(control)) print("got", repr(p.children)) raise p = self.roundtrip(p) try: self.assert_(p.children == control) except Exception: print("Test %s %s %s:" % (base, op, other)) print("want", repr(control)) print("got", repr(p.children)) raise def test_bulk_replace(self): Parent = self.classes.Parent p1 = Parent("foo") p1.children = {"a", "b", "c"} assocs = set(p1._children) keep_assocs = {a for a in assocs if a.name in ("a", "c")} eq_(len(keep_assocs), 2) remove_assocs = {a for a in assocs if a.name == "b"} p1.children = {"a", "c", "d"} eq_({a for a in p1._children if a.name in ("a", "c")}, keep_assocs) assert not remove_assocs.intersection(p1._children) eq_(p1.children, {"a", "c", "d"}) class CustomSetTest(SetTest): collection_class = SetCollection class CustomObjectTest(_CollectionOperations): collection_class = ObjectCollection def test_basic(self): Parent = self.classes.Parent self.session = fixture_session() p = Parent("p1") self.assert_(len(list(p.children)) == 0) p.children.append("child") self.assert_(len(list(p.children)) == 1) p = self.roundtrip(p) self.assert_(len(list(p.children)) == 1) # We didn't provide an alternate _AssociationList implementation # for our ObjectCollection, so indexing will fail. assert_raises(TypeError, p.children.__getitem__, 1) class ProxyFactoryTest(ListTest): @classmethod def define_tables(cls, metadata): Table( "Parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) Table( "Children", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("Parent.id")), Column("foo", String(128)), Column("name", String(128)), ) @classmethod def setup_mappers(cls): parents_table, children_table = cls.tables("Parent", "Children") class CustomProxy(_AssociationList): def __init__(self, lazy_collection, creator, value_attr, parent): getter, setter = parent._default_getset(lazy_collection) _AssociationList.__init__( self, lazy_collection, creator, getter, setter, parent ) class Parent(cls.Basic): children = association_proxy( "_children", "name", proxy_factory=CustomProxy, proxy_bulk_set=CustomProxy.extend, ) def __init__(self, name): self.name = name class Child(cls.Basic): def __init__(self, name): self.name = name cls.mapper_registry.map_imperatively( Parent, parents_table, properties={ "_children": relationship( Child, lazy="joined", collection_class=list ) }, ) cls.mapper_registry.map_imperatively(Child, children_table) def test_sequence_ops(self): self._test_sequence_ops() class ScalarTest(fixtures.MappedTest): @testing.provide_metadata def test_scalar_proxy(self): metadata = self.metadata parents_table = Table( "Parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) children_table = Table( "Children", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("Parent.id")), Column("foo", String(128)), Column("bar", String(128)), Column("baz", String(128)), ) class Parent: foo = association_proxy("child", "foo") bar = association_proxy( "child", "bar", creator=lambda v: Child(bar=v) ) baz = association_proxy( "child", "baz", creator=lambda v: Child(baz=v) ) def __init__(self, name): self.name = name class Child: def __init__(self, **kw): for attr in kw: setattr(self, attr, kw[attr]) self.mapper_registry.map_imperatively( Parent, parents_table, properties={ "child": relationship( Child, lazy="joined", backref="parent", uselist=False ) }, ) self.mapper_registry.map_imperatively(Child, children_table) metadata.create_all(testing.db) session = fixture_session() def roundtrip(obj): if obj not in session: session.add(obj) session.flush() id_, type_ = obj.id, type(obj) session.expunge_all() return session.get(type_, id_) p = Parent("p") eq_(p.child, None) eq_(p.foo, None) p.child = Child(foo="a", bar="b", baz="c") self.assert_(p.foo == "a") self.assert_(p.bar == "b") self.assert_(p.baz == "c") p.bar = "x" self.assert_(p.foo == "a") self.assert_(p.bar == "x") self.assert_(p.baz == "c") p = roundtrip(p) self.assert_(p.foo == "a") self.assert_(p.bar == "x") self.assert_(p.baz == "c") p.child = None eq_(p.foo, None) # Bogus creator for this scalar type assert_raises(TypeError, setattr, p, "foo", "zzz") p.bar = "yyy" self.assert_(p.foo is None) self.assert_(p.bar == "yyy") self.assert_(p.baz is None) del p.child p = roundtrip(p) self.assert_(p.child is None) p.baz = "xxx" self.assert_(p.foo is None) self.assert_(p.bar is None) self.assert_(p.baz == "xxx") p = roundtrip(p) self.assert_(p.foo is None) self.assert_(p.bar is None) self.assert_(p.baz == "xxx") # Ensure an immediate __set__ works. p2 = Parent("p2") p2.bar = "quux" @testing.provide_metadata def test_empty_scalars(self): metadata = self.metadata a = Table( "a", metadata, Column("id", Integer, primary_key=True), Column("name", String(50)), ) a2b = Table( "a2b", metadata, Column("id", Integer, primary_key=True), Column("id_a", Integer, ForeignKey("a.id")), Column("id_b", Integer, ForeignKey("b.id")), Column("name", String(50)), ) b = Table( "b", metadata, Column("id", Integer, primary_key=True), Column("name", String(50)), ) class A: a2b_name = association_proxy("a2b_single", "name") b_single = association_proxy("a2b_single", "b") class A2B: pass class B: pass self.mapper_registry.map_imperatively( A, a, properties=dict(a2b_single=relationship(A2B, uselist=False)) ) self.mapper_registry.map_imperatively( A2B, a2b, properties=dict(b=relationship(B)) ) self.mapper_registry.map_imperatively(B, b) a1 = A() assert a1.a2b_name is None assert a1.b_single is None def test_custom_getset(self): metadata = MetaData() p = Table( "p", metadata, Column("id", Integer, primary_key=True), Column("cid", Integer, ForeignKey("c.id")), ) c = Table( "c", metadata, Column("id", Integer, primary_key=True), Column("foo", String(128)), ) get = Mock() set_ = Mock() class Parent: foo = association_proxy( "child", "foo", getset_factory=lambda cc, parent: (get, set_) ) class Child: def __init__(self, foo): self.foo = foo self.mapper_registry.map_imperatively( Parent, p, properties={"child": relationship(Child)} ) self.mapper_registry.map_imperatively(Child, c) p1 = Parent() eq_(p1.foo, get(None)) p1.child = child = Child(foo="x") eq_(p1.foo, get(child)) p1.foo = "y" eq_(set_.mock_calls, [call(child, "y")]) class LazyLoadTest(fixtures.MappedTest): @classmethod def define_tables(cls, metadata): Table( "Parent", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(128)), ) Table( "Children", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("Parent.id")), Column("foo", String(128)), Column("name", String(128)), ) @classmethod def setup_mappers(cls): class Parent(cls.Basic): children = association_proxy("_children", "name") def __init__(self, name): self.name = name class Child(cls.Basic): def __init__(self, name): self.name = name cls.mapper_registry.map_imperatively(Child, cls.tables.Children) def roundtrip(self, obj): self.session.add(obj) self.session.flush() id_, type_ = obj.id, type(obj) self.session.expunge_all() return self.session.get(type_, id_) def test_lazy_list(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() self.mapper_registry.map_imperatively( Parent, self.tables.Parent, properties={ "_children": relationship( Child, lazy="select", collection_class=list ) }, ) p = Parent("p") p.children = ["a", "b", "c"] p = self.roundtrip(p) # Is there a better way to ensure that the association_proxy # didn't convert a lazy load to an eager load? This does work though. self.assert_("_children" not in p.__dict__) self.assert_(len(p._children) == 3) self.assert_("_children" in p.__dict__) def test_eager_list(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() self.mapper_registry.map_imperatively( Parent, self.tables.Parent, properties={ "_children": relationship( Child, lazy="joined", collection_class=list ) }, ) p = Parent("p") p.children = ["a", "b", "c"] p = self.roundtrip(p) self.assert_("_children" in p.__dict__) self.assert_(len(p._children) == 3) def test_slicing_list(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() self.mapper_registry.map_imperatively( Parent, self.tables.Parent, properties={ "_children": relationship( Child, lazy="select", collection_class=list ) }, ) p = Parent("p") p.children = ["a", "b", "c"] p = self.roundtrip(p) self.assert_(len(p._children) == 3) eq_("b", p.children[1]) eq_(["b", "c"], p.children[-2:]) def test_lazy_scalar(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() self.mapper_registry.map_imperatively( Parent, self.tables.Parent, properties={ "_children": relationship(Child, lazy="select", uselist=False) }, ) p = Parent("p") p.children = "value" p = self.roundtrip(p) self.assert_("_children" not in p.__dict__) self.assert_(p._children is not None) def test_eager_scalar(self): Parent, Child = self.classes("Parent", "Child") self.session = fixture_session() self.mapper_registry.map_imperatively( Parent, self.tables.Parent, properties={ "_children": relationship(Child, lazy="joined", uselist=False) }, ) p = Parent("p") p.children = "value" p = self.roundtrip(p) self.assert_("_children" in p.__dict__) self.assert_(p._children is not None) class Parent: def __init__(self, name): self.name = name class Child: def __init__(self, name): self.name = name class KVChild: def __init__(self, name, value): self.name = name self.value = value class ReconstitutionTest(fixtures.MappedTest): run_setup_mappers = "each" run_setup_classes = "each" @classmethod def define_tables(cls, metadata): Table( "parents", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(30)), ) Table( "children", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("parent_id", Integer, ForeignKey("parents.id")), Column("name", String(30)), ) @classmethod def insert_data(cls, connection): parents = cls.tables.parents connection.execute(parents.insert(), dict(name="p1")) @classmethod def setup_classes(cls): Parent.kids = association_proxy("children", "name") def test_weak_identity_map(self): self.mapper_registry.map_imperatively( Parent, self.tables.parents, properties=dict(children=relationship(Child)), ) self.mapper_registry.map_imperatively(Child, self.tables.children) session = fixture_session() def add_child(parent_name, child_name): parent = session.query(Parent).filter_by(name=parent_name).one() parent.kids.append(child_name) add_child("p1", "c1") gc_collect() add_child("p1", "c2") session.flush() p = session.query(Parent).filter_by(name="p1").one() assert set(p.kids) == {"c1", "c2"}, p.kids def test_copy(self): self.mapper_registry.map_imperatively( Parent, self.tables.parents, properties=dict(children=relationship(Child)), ) self.mapper_registry.map_imperatively(Child, self.tables.children) p = Parent("p1") p.kids.extend(["c1", "c2"]) p_copy = copy.copy(p) del p gc_collect() assert set(p_copy.kids) == {"c1", "c2"}, p_copy.kids def test_pickle_list(self): self.mapper_registry.map_imperatively( Parent, self.tables.parents, properties=dict(children=relationship(Child)), ) self.mapper_registry.map_imperatively(Child, self.tables.children) p = Parent("p1") p.kids.extend(["c1", "c2"]) r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == ["c1", "c2"] # can't do this without parent having a cycle # r2 = pickle.loads(pickle.dumps(p.kids)) # assert r2 == ['c1', 'c2'] def test_pickle_set(self): self.mapper_registry.map_imperatively( Parent, self.tables.parents, properties=dict( children=relationship(Child, collection_class=set) ), ) self.mapper_registry.map_imperatively(Child, self.tables.children) p = Parent("p1") p.kids.update(["c1", "c2"]) r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == {"c1", "c2"} # can't do this without parent having a cycle # r2 = pickle.loads(pickle.dumps(p.kids)) # assert r2 == set(['c1', 'c2']) def test_pickle_dict(self): self.mapper_registry.map_imperatively( Parent, self.tables.parents, properties=dict( children=relationship( KVChild, collection_class=collections.keyfunc_mapping( PickleKeyFunc("name") ), ) ), ) self.mapper_registry.map_imperatively(KVChild, self.tables.children) p = Parent("p1") p.kids.update({"c1": "v1", "c2": "v2"}) assert p.kids == {"c1": "c1", "c2": "c2"} r1 = pickle.loads(pickle.dumps(p)) assert r1.kids == {"c1": "c1", "c2": "c2"} # can't do this without parent having a cycle # r2 = pickle.loads(pickle.dumps(p.kids)) # assert r2 == {'c1': 'c1', 'c2': 'c2'} class PickleKeyFunc: def __init__(self, name): self.name = name def __call__(self, obj): return getattr(obj, self.name) class ComparatorTest(fixtures.MappedTest, AssertsCompiledSQL): __dialect__ = "default" run_inserts = "once" run_deletes = None run_setup_mappers = "once" run_setup_classes = "once" @classmethod def define_tables(cls, metadata): Table( "userkeywords", metadata, Column( "keyword_id", Integer, ForeignKey("keywords.id"), primary_key=True, ), Column("user_id", Integer, ForeignKey("users.id")), Column("value", String(50)), ) Table( "users", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("name", String(64)), Column("singular_id", Integer, ForeignKey("singular.id")), ) Table( "keywords", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("keyword", String(64)), Column("singular_id", Integer, ForeignKey("singular.id")), ) Table( "singular", metadata, Column( "id", Integer, primary_key=True, test_needs_autoincrement=True ), Column("value", String(50)), ) @classmethod def setup_classes(cls): class User(cls.Comparable): def __init__(self, name): self.name = name # o2m -> m2o # uselist -> nonuselist keywords = association_proxy( "user_keywords", "keyword", creator=lambda k: UserKeyword(keyword=k), ) # m2o -> o2m # nonuselist -> uselist singular_keywords = association_proxy("singular", "keywords") # m2o -> scalar # nonuselist singular_value = association_proxy("singular", "value") # o2m -> scalar singular_collection = association_proxy("user_keywords", "value") # uselist assoc_proxy -> assoc_proxy -> obj common_users = association_proxy("user_keywords", "common_users") # non uselist assoc_proxy -> assoc_proxy -> obj common_singular = association_proxy("singular", "keyword") # non uselist assoc_proxy -> assoc_proxy -> scalar singular_keyword = association_proxy("singular", "keyword") # uselist assoc_proxy -> assoc_proxy -> scalar common_keyword_name = association_proxy( "user_keywords", "keyword_name" ) class Keyword(cls.Comparable): def __init__(self, keyword): self.keyword = keyword # o2o -> m2o # nonuselist -> nonuselist user = association_proxy("user_keyword", "user") # uselist assoc_proxy -> collection -> assoc_proxy -> scalar object # (o2m relationship, # associationproxy(m2o relationship, m2o relationship)) singulars = association_proxy("user_keywords", "singular") class UserKeyword(cls.Comparable): def __init__(self, user=None, keyword=None): self.user = user self.keyword = keyword common_users = association_proxy("keyword", "user") keyword_name = association_proxy("keyword", "keyword") singular = association_proxy("user", "singular") class Singular(cls.Comparable): def __init__(self, value=None): self.value = value keyword = association_proxy("keywords", "keyword") @classmethod def setup_mappers(cls): ( users, Keyword, UserKeyword, singular, userkeywords, User, keywords, Singular, ) = ( cls.tables.users, cls.classes.Keyword, cls.classes.UserKeyword, cls.tables.singular, cls.tables.userkeywords, cls.classes.User, cls.tables.keywords, cls.classes.Singular, ) cls.mapper_registry.map_imperatively( User, users, properties={"singular": relationship(Singular)} ) cls.mapper_registry.map_imperatively( Keyword, keywords, properties={ "user_keyword": relationship( UserKeyword, uselist=False, back_populates="keyword" ), "user_keywords": relationship(UserKeyword, viewonly=True), }, ) cls.mapper_registry.map_imperatively( UserKeyword, userkeywords, properties={ "user": relationship(User, backref="user_keywords"), "keyword": relationship( Keyword, back_populates="user_keyword" ), }, ) cls.mapper_registry.map_imperatively( Singular, singular, properties={"keywords": relationship(Keyword)} ) @classmethod def insert_data(cls, connection): UserKeyword, User, Keyword, Singular = ( cls.classes.UserKeyword, cls.classes.User, cls.classes.Keyword, cls.classes.Singular, ) session = Session(connection) words = ("quick", "brown", "fox", "jumped", "over", "the", "lazy") for ii in range(16): user = User("user%d" % ii) if ii % 2 == 0: user.singular = Singular( value=("singular%d" % ii) if ii % 4 == 0 else None ) session.add(user) for jj in words[(ii % len(words)) : ((ii + 3) % len(words))]: k = Keyword(jj) user.keywords.append(k) if ii % 2 == 0: user.singular.keywords.append(k) user.user_keywords[-1].value = "singular%d" % ii orphan = Keyword("orphan") orphan.user_keyword = UserKeyword(keyword=orphan, user=None) session.add(orphan) keyword_with_nothing = Keyword("kwnothing") session.add(keyword_with_nothing) session.commit() cls.u = user cls.kw = user.keywords[0] # TODO: this is not the correct pattern, use session per test cls.session = Session(testing.db) def _equivalent(self, q_proxy, q_direct): proxy_sql = q_proxy.statement.compile(dialect=default.DefaultDialect()) direct_sql = q_direct.statement.compile( dialect=default.DefaultDialect() ) eq_(str(proxy_sql), str(direct_sql)) eq_(q_proxy.all(), q_direct.all()) def test_no_straight_expr(self): User = self.classes.User assert_raises_message( NotImplementedError, "The association proxy can't be used as a plain column expression", func.foo, User.singular_value, ) assert_raises_message( NotImplementedError, "The association proxy can't be used as a plain column expression", self.session.query, User.singular_value, ) def test_filter_any_criterion_ul_scalar(self): UserKeyword, User = self.classes.UserKeyword, self.classes.User q1 = self.session.query(User).filter( User.singular_collection.any(UserKeyword.value == "singular8") ) self.assert_compile( q1, "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users " "WHERE EXISTS (SELECT 1 " "FROM userkeywords " "WHERE users.id = userkeywords.user_id AND " "userkeywords.value = :value_1)", checkparams={"value_1": "singular8"}, ) q2 = self.session.query(User).filter( User.user_keywords.any(UserKeyword.value == "singular8") ) self._equivalent(q1, q2) def test_filter_any_kwarg_ul_nul(self): UserKeyword, User = self.classes.UserKeyword, self.classes.User self._equivalent( self.session.query(User).filter( User.keywords.any(keyword="jumped") ), self.session.query(User).filter( User.user_keywords.any( UserKeyword.keyword.has(keyword="jumped") ) ), ) def test_filter_has_kwarg_nul_nul(self): UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword self._equivalent( self.session.query(Keyword).filter(Keyword.user.has(name="user2")), self.session.query(Keyword).filter( Keyword.user_keyword.has(UserKeyword.user.has(name="user2")) ), ) def test_filter_has_kwarg_nul_ul(self): User, Singular = self.classes.User, self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_keywords.any(keyword="jumped") ), self.session.query(User).filter( User.singular.has(Singular.keywords.any(keyword="jumped")) ), ) def test_filter_any_criterion_ul_nul(self): UserKeyword, User, Keyword = ( self.classes.UserKeyword, self.classes.User, self.classes.Keyword, ) self._equivalent( self.session.query(User).filter( User.keywords.any(Keyword.keyword == "jumped") ), self.session.query(User).filter( User.user_keywords.any( UserKeyword.keyword.has(Keyword.keyword == "jumped") ) ), ) def test_filter_has_criterion_nul_nul(self): UserKeyword, User, Keyword = ( self.classes.UserKeyword, self.classes.User, self.classes.Keyword, ) self._equivalent( self.session.query(Keyword).filter( Keyword.user.has(User.name == "user2") ), self.session.query(Keyword).filter( Keyword.user_keyword.has( UserKeyword.user.has(User.name == "user2") ) ), ) def test_filter_any_criterion_nul_ul(self): User, Keyword, Singular = ( self.classes.User, self.classes.Keyword, self.classes.Singular, ) self._equivalent( self.session.query(User).filter( User.singular_keywords.any(Keyword.keyword == "jumped") ), self.session.query(User).filter( User.singular.has( Singular.keywords.any(Keyword.keyword == "jumped") ) ), ) def test_filter_contains_ul_nul(self): User = self.classes.User self._equivalent( self.session.query(User).filter(User.keywords.contains(self.kw)), self.session.query(User).filter( User.user_keywords.any(keyword=self.kw) ), ) def test_filter_contains_nul_ul(self): User, Singular = self.classes.User, self.classes.Singular with expect_warnings( "Got None for value of column keywords.singular_id;" ): self._equivalent( self.session.query(User).filter( User.singular_keywords.contains(self.kw) ), self.session.query(User).filter( User.singular.has(Singular.keywords.contains(self.kw)) ), ) def test_filter_eq_nul_nul(self): Keyword = self.classes.Keyword self._equivalent( self.session.query(Keyword).filter(Keyword.user == self.u), self.session.query(Keyword).filter( Keyword.user_keyword.has(user=self.u) ), ) def test_filter_ne_nul_nul(self): Keyword = self.classes.Keyword UserKeyword = self.classes.UserKeyword self._equivalent( self.session.query(Keyword).filter(Keyword.user != self.u), self.session.query(Keyword).filter( Keyword.user_keyword.has(UserKeyword.user != self.u) ), ) def test_filter_eq_null_nul_nul(self): UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword self._equivalent( self.session.query(Keyword).filter(Keyword.user == None), # noqa self.session.query(Keyword).filter( or_( Keyword.user_keyword.has(UserKeyword.user == None), Keyword.user_keyword == None, ) ), ) def test_filter_ne_null_nul_nul(self): UserKeyword, Keyword = self.classes.UserKeyword, self.classes.Keyword self._equivalent( self.session.query(Keyword).filter(Keyword.user != None), # noqa self.session.query(Keyword).filter( Keyword.user_keyword.has(UserKeyword.user != None) ), ) def test_filter_object_eq_None_nul(self): UserKeyword = self.classes.UserKeyword User = self.classes.User self._equivalent( self.session.query(UserKeyword).filter( UserKeyword.singular == None ), # noqa self.session.query(UserKeyword).filter( or_( UserKeyword.user.has(User.singular == None), UserKeyword.user_id == None, ) ), ) def test_filter_column_eq_None_nul(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_value == None ), # noqa self.session.query(User).filter( or_( User.singular.has(Singular.value == None), User.singular == None, ) ), ) def test_filter_object_ne_value_nul(self): UserKeyword = self.classes.UserKeyword User = self.classes.User Singular = self.classes.Singular s4 = self.session.query(Singular).filter_by(value="singular4").one() self._equivalent( self.session.query(UserKeyword).filter(UserKeyword.singular != s4), self.session.query(UserKeyword).filter( UserKeyword.user.has(User.singular != s4) ), ) def test_filter_column_ne_value_nul(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_value != "singular4" ), self.session.query(User).filter( User.singular.has(Singular.value != "singular4") ), ) def test_filter_eq_value_nul(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_value == "singular4" ), self.session.query(User).filter( User.singular.has(Singular.value == "singular4") ), ) def test_filter_ne_None_nul(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_value != None ), # noqa self.session.query(User).filter( User.singular.has(Singular.value != None) ), ) def test_has_nul(self): # a special case where we provide an empty has() on a # non-object-targeted association proxy. User = self.classes.User self.classes.Singular self._equivalent( self.session.query(User).filter(User.singular_value.has()), self.session.query(User).filter(User.singular.has()), ) def test_nothas_nul(self): # a special case where we provide an empty has() on a # non-object-targeted association proxy. User = self.classes.User self.classes.Singular self._equivalent( self.session.query(User).filter(~User.singular_value.has()), self.session.query(User).filter(~User.singular.has()), ) def test_filter_any_chained(self): User = self.classes.User UserKeyword, User = self.classes.UserKeyword, self.classes.User Keyword = self.classes.Keyword q1 = self.session.query(User).filter( User.common_users.any(User.name == "user7") ) self.assert_compile( q1, "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users " "WHERE EXISTS (SELECT 1 " "FROM userkeywords " "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 " "FROM keywords " "WHERE keywords.id = userkeywords.keyword_id AND " "(EXISTS (SELECT 1 " "FROM userkeywords " "WHERE keywords.id = userkeywords.keyword_id AND " "(EXISTS (SELECT 1 " "FROM users " "WHERE users.id = userkeywords.user_id AND users.name = :name_1)" "))))))", checkparams={"name_1": "user7"}, ) q2 = self.session.query(User).filter( User.user_keywords.any( UserKeyword.keyword.has( Keyword.user_keyword.has( UserKeyword.user.has(User.name == "user7") ) ) ) ) self._equivalent(q1, q2) def test_filter_has_chained_has_to_any(self): User = self.classes.User Singular = self.classes.Singular Keyword = self.classes.Keyword q1 = self.session.query(User).filter( User.common_singular.has(Keyword.keyword == "brown") ) self.assert_compile( q1, "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users " "WHERE EXISTS (SELECT 1 " "FROM singular " "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 " "FROM keywords " "WHERE singular.id = keywords.singular_id AND " "keywords.keyword = :keyword_1)))", checkparams={"keyword_1": "brown"}, ) q2 = self.session.query(User).filter( User.singular.has( Singular.keywords.any(Keyword.keyword == "brown") ) ) self._equivalent(q1, q2) def test_filter_has_scalar_raises(self): User = self.classes.User assert_raises_message( exc.ArgumentError, r"Can't apply keyword arguments to column-targeted", User.singular_keyword.has, keyword="brown", ) def test_filter_eq_chained_has_to_any(self): User = self.classes.User Keyword = self.classes.Keyword Singular = self.classes.Singular q1 = self.session.query(User).filter(User.singular_keyword == "brown") self.assert_compile( q1, "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users " "WHERE EXISTS (SELECT 1 " "FROM singular " "WHERE singular.id = users.singular_id AND (EXISTS (SELECT 1 " "FROM keywords " "WHERE singular.id = keywords.singular_id " "AND keywords.keyword = :keyword_1)))", checkparams={"keyword_1": "brown"}, ) q2 = self.session.query(User).filter( User.singular.has( Singular.keywords.any(Keyword.keyword == "brown") ) ) self._equivalent(q1, q2) def test_filter_contains_chained_any_to_has(self): User = self.classes.User Keyword = self.classes.Keyword UserKeyword = self.classes.UserKeyword q1 = self.session.query(User).filter( User.common_keyword_name.contains("brown") ) self.assert_compile( q1, "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users " "WHERE EXISTS (SELECT 1 " "FROM userkeywords " "WHERE users.id = userkeywords.user_id AND (EXISTS (SELECT 1 " "FROM keywords " "WHERE keywords.id = userkeywords.keyword_id AND " "keywords.keyword = :keyword_1)))", checkparams={"keyword_1": "brown"}, ) q2 = self.session.query(User).filter( User.user_keywords.any( UserKeyword.keyword.has(Keyword.keyword == "brown") ) ) self._equivalent(q1, q2) def test_filter_contains_chained_any_to_has_to_eq(self): User = self.classes.User Keyword = self.classes.Keyword UserKeyword = self.classes.UserKeyword Singular = self.classes.Singular singular = self.session.query(Singular).order_by(Singular.id).first() q1 = self.session.query(Keyword).filter( Keyword.singulars.contains(singular) ) self.assert_compile( q1, "SELECT keywords.id AS keywords_id, " "keywords.keyword AS keywords_keyword, " "keywords.singular_id AS keywords_singular_id " "FROM keywords " "WHERE EXISTS (SELECT 1 " "FROM userkeywords " "WHERE keywords.id = userkeywords.keyword_id AND " "(EXISTS (SELECT 1 " "FROM users " "WHERE users.id = userkeywords.user_id AND " ":param_1 = users.singular_id)))", checkparams={"param_1": singular.id}, ) q2 = self.session.query(Keyword).filter( Keyword.user_keywords.any( UserKeyword.user.has(User.singular == singular) ) ) self._equivalent(q1, q2) def test_has_criterion_nul(self): # but we don't allow that with any criterion... User = self.classes.User self.classes.Singular assert_raises_message( exc.ArgumentError, r"Non-empty has\(\) not allowed", User.singular_value.has, User.singular_value == "singular4", ) def test_has_kwargs_nul(self): # ... or kwargs User = self.classes.User self.classes.Singular assert_raises_message( exc.ArgumentError, r"Can't apply keyword arguments to column-targeted", User.singular_value.has, singular_value="singular4", ) def test_filter_scalar_object_contains_fails_nul_nul(self): Keyword = self.classes.Keyword assert_raises( exc.InvalidRequestError, lambda: Keyword.user.contains(self.u) ) def test_filter_scalar_object_any_fails_nul_nul(self): Keyword = self.classes.Keyword assert_raises( exc.InvalidRequestError, lambda: Keyword.user.any(name="user2") ) def test_filter_scalar_column_like(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter(User.singular_value.like("foo")), self.session.query(User).filter( User.singular.has(Singular.value.like("foo")) ), ) def test_filter_scalar_column_contains(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter( User.singular_value.contains("foo") ), self.session.query(User).filter( User.singular.has(Singular.value.contains("foo")) ), ) def test_filter_scalar_column_eq(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter(User.singular_value == "foo"), self.session.query(User).filter( User.singular.has(Singular.value == "foo") ), ) def test_filter_scalar_column_ne(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter(User.singular_value != "foo"), self.session.query(User).filter( User.singular.has(Singular.value != "foo") ), ) def test_filter_scalar_column_eq_nul(self): User = self.classes.User Singular = self.classes.Singular self._equivalent( self.session.query(User).filter(User.singular_value == None), self.session.query(User).filter( or_( User.singular.has(Singular.value == None), User.singular == None, ) ), ) def test_filter_collection_has_fails_ul_nul(self): User = self.classes.User assert_raises( exc.InvalidRequestError, lambda: User.keywords.has(keyword="quick") ) def test_filter_collection_eq_fails_ul_nul(self): User = self.classes.User assert_raises( exc.InvalidRequestError, lambda: User.keywords == self.kw ) def test_filter_collection_ne_fails_ul_nul(self): User = self.classes.User assert_raises( exc.InvalidRequestError, lambda: User.keywords != self.kw ) def test_join_separate_attr(self): User = self.classes.User self.assert_compile( self.session.query(User) .join(User.keywords.local_attr) .join(User.keywords.remote_attr), "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users JOIN userkeywords ON users.id = " "userkeywords.user_id JOIN keywords ON keywords.id = " "userkeywords.keyword_id", ) def test_join_single_attr(self): User = self.classes.User self.assert_compile( self.session.query(User) .join(User.keywords.attr[0]) .join(User.keywords.attr[1]), "SELECT users.id AS users_id, users.name AS users_name, " "users.singular_id AS users_singular_id " "FROM users JOIN userkeywords ON users.id = " "userkeywords.user_id JOIN keywords ON keywords.id = " "userkeywords.keyword_id", ) class DictOfTupleUpdateTest(fixtures.MappedTest): run_create_tables = None @classmethod def define_tables(cls, metadata): Table("a", metadata, Column("id", Integer, primary_key=True)) Table( "b", metadata, Column("id", Integer, primary_key=True), Column("aid", Integer, ForeignKey("a.id")), Column("elem", String), ) @classmethod def setup_mappers(cls): a, b = cls.tables("a", "b") class B(cls.Basic): def __init__(self, key, elem): self.key = key self.elem = elem class A(cls.Basic): elements = association_proxy("orig", "elem", creator=B) cls.mapper_registry.map_imperatively( A, a, properties={ "orig": relationship( B, collection_class=attribute_keyed_dict("key") ) }, ) cls.mapper_registry.map_imperatively(B, b) def test_update_one_elem_dict(self): a1 = self.classes.A() a1.elements.update({("B", 3): "elem2"}) eq_(a1.elements, {("B", 3): "elem2"}) def test_update_multi_elem_dict(self): a1 = self.classes.A() a1.elements.update({("B", 3): "elem2", ("C", 4): "elem3"}) eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"}) def test_update_one_elem_list(self): a1 = self.classes.A() a1.elements.update([(("B", 3), "elem2")]) eq_(a1.elements, {("B", 3): "elem2"}) def test_update_multi_elem_list(self): a1 = self.classes.A() a1.elements.update([(("B", 3), "elem2"), (("C", 4), "elem3")]) eq_(a1.elements, {("B", 3): "elem2", ("C", 4): "elem3"}) def test_update_one_elem_varg(self): a1 = self.classes.A() assert_raises_message( ValueError, "dictionary update sequence element #1 has length 5; " "2 is required", a1.elements.update, (("B", 3), "elem2"), ) def test_update_multi_elem_varg(self): a1 = self.classes.A() assert_raises_message( TypeError, "update expected at most 1 arguments?, got 2", a1.elements.update, (("B", 3), "elem2"), (("C", 4), "elem3"), ) class CompositeAccessTest(fixtures.DeclarativeMappedTest): run_create_tables = None @classmethod def setup_classes(cls): class Point(cls.Basic): def __init__(self, x, y): self.x = x self.y = y def __composite_values__(self): return [self.x, self.y] __hash__ = None def __eq__(self, other): return ( isinstance(other, Point) and other.x == self.x and other.y == self.y ) def __ne__(self, other): return not isinstance(other, Point) or not self.__eq__(other) class Graph(cls.DeclarativeBasic): __tablename__ = "graph" id = Column( Integer, primary_key=True, test_needs_autoincrement=True ) name = Column(String(30)) point_data = relationship("PointData") points = association_proxy( "point_data", "point", creator=lambda point: PointData(point=point), ) class PointData(fixtures.ComparableEntity, cls.DeclarativeBasic): __tablename__ = "point" id = Column( Integer, primary_key=True, test_needs_autoincrement=True ) graph_id = Column(ForeignKey("graph.id")) x1 = Column(Integer) y1 = Column(Integer) point = composite(Point, x1, y1) return Point, Graph, PointData def test_append(self): Point, Graph, PointData = self.classes("Point", "Graph", "PointData") g1 = Graph() g1.points.append(Point(3, 5)) eq_(g1.point_data, [PointData(point=Point(3, 5))]) def test_access(self): Point, Graph, PointData = self.classes("Point", "Graph", "PointData") g1 = Graph() g1.point_data.append(PointData(point=Point(3, 5))) g1.point_data.append(PointData(point=Point(10, 7))) eq_(g1.points, [Point(3, 5), Point(10, 7)]) class AttributeAccessTest(fixtures.TestBase): def teardown_test(self): clear_mappers() def test_resolve_aliased_class(self): Base = declarative_base() class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) value = Column(String) class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) a_id = Column(Integer, ForeignKey(A.id)) a = relationship(A) a_value = association_proxy("a", "value") spec = aliased(B).a_value is_(spec.owning_class, B) spec = B.a_value is_(spec.owning_class, B) def test_resolved_w_subclass(self): # test for issue #4185, as well as several below Base = declarative_base() class Mixin: @declared_attr def children(cls): return association_proxy("_children", "value") # 1. build parent, Mixin.children gets invoked, we add # Parent.children class Parent(Mixin, Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) # 2. declarative builds up SubParent, scans through all attributes # over all classes. Hits Mixin, hits "children", accesses "children" # in terms of the class, e.g. SubParent.children. SubParent isn't # mapped yet. association proxy then sets up "owning_class" # as NoneType. class SubParent(Parent): __tablename__ = "subparent" id = Column(Integer, ForeignKey(Parent.id), primary_key=True) configure_mappers() # 3. which would break here. p1 = Parent() eq_(p1.children, []) def test_resolved_to_correct_class_one(self): Base = declarative_base() class Parent(Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") children = association_proxy("_children", "value") class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) class SubParent(Parent): __tablename__ = "subparent" id = Column(Integer, ForeignKey(Parent.id), primary_key=True) is_(SubParent.children.owning_class, SubParent) is_(Parent.children.owning_class, Parent) def test_resolved_to_correct_class_two(self): Base = declarative_base() class Parent(Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) class SubParent(Parent): __tablename__ = "subparent" id = Column(Integer, ForeignKey(Parent.id), primary_key=True) children = association_proxy("_children", "value") is_(SubParent.children.owning_class, SubParent) def test_resolved_to_correct_class_three(self): Base = declarative_base() class Parent(Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) class SubParent(Parent): __tablename__ = "subparent" id = Column(Integer, ForeignKey(Parent.id), primary_key=True) children = association_proxy("_children", "value") class SubSubParent(SubParent): __tablename__ = "subsubparent" id = Column(Integer, ForeignKey(SubParent.id), primary_key=True) is_(SubParent.children.owning_class, SubParent) is_(SubSubParent.children.owning_class, SubSubParent) def test_resolved_to_correct_class_four(self): Base = declarative_base() class Parent(Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") children = association_proxy( "_children", "value", creator=lambda value: Child(value=value) ) class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) class SubParent(Parent): __tablename__ = "subparent" id = Column(Integer, ForeignKey(Parent.id), primary_key=True) sp = SubParent() sp.children = "c" is_(SubParent.children.owning_class, SubParent) is_(Parent.children.owning_class, Parent) def test_resolved_to_correct_class_five(self): Base = declarative_base() class Mixin: children = association_proxy("_children", "value") class Parent(Mixin, Base): __tablename__ = "parent" id = Column(Integer, primary_key=True) _children = relationship("Child") class Child(Base): __tablename__ = "child" parent_id = Column( Integer, ForeignKey(Parent.id), primary_key=True ) value = Column(String) # this triggers the owning routine, doesn't fail Mixin.children p1 = Parent() c1 = Child(value="c1") p1._children.append(c1) is_(Parent.children.owning_class, Parent) eq_(p1.children, ["c1"]) def _test_never_assign_nonetype(self): foo = association_proxy("x", "y") foo._calc_owner(None, None) is_(foo.owning_class, None) class Bat: foo = association_proxy("x", "y") Bat.foo is_(Bat.foo.owning_class, None) b1 = Bat() assert_raises_message( exc.InvalidRequestError, "This association proxy has no mapped owning class; " "can't locate a mapped property", getattr, b1, "foo", ) is_(Bat.foo.owning_class, None) # after all that, we can map it mapper( Bat, Table("bat", MetaData(), Column("x", Integer, primary_key=True)), ) # answer is correct is_(Bat.foo.owning_class, Bat) class ScalarRemoveTest: useobject = None cascade_scalar_deletes = None uselist = None @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class A(Base): __tablename__ = "test_a" id = Column(Integer, primary_key=True) ab = relationship("AB", backref="a", uselist=cls.uselist) b = association_proxy( "ab", "b", creator=lambda b: AB(b=b), cascade_scalar_deletes=cls.cascade_scalar_deletes, ) if cls.useobject: class B(Base): __tablename__ = "test_b" id = Column(Integer, primary_key=True) ab = relationship("AB", backref="b") class AB(Base): __tablename__ = "test_ab" a_id = Column(Integer, ForeignKey(A.id), primary_key=True) b_id = Column(Integer, ForeignKey(B.id), primary_key=True) else: class AB(Base): __tablename__ = "test_ab" b = Column(Integer) a_id = Column(Integer, ForeignKey(A.id), primary_key=True) def test_set_nonnone_to_none(self): if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") a1 = A() b1 = B() if self.useobject else 5 if self.uselist: a1.b.append(b1) else: a1.b = b1 if self.uselist: assert isinstance(a1.ab[0], AB) else: assert isinstance(a1.ab, AB) if self.uselist: a1.b.remove(b1) else: a1.b = None if self.uselist: eq_(a1.ab, []) else: if self.cascade_scalar_deletes: assert a1.ab is None else: assert isinstance(a1.ab, AB) assert a1.ab.b is None def test_set_none_to_none(self): if self.uselist: return if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") a1 = A() a1.b = None assert a1.ab is None def test_del_already_nonpresent(self): if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") a1 = A() if self.uselist: del a1.b eq_(a1.ab, []) else: def go(): del a1.b assert_raises_message( AttributeError, "A.ab object does not have a value", go ) def test_del(self): if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") b1 = B() if self.useobject else 5 a1 = A() if self.uselist: a1.b.append(b1) else: a1.b = b1 if self.uselist: assert isinstance(a1.ab[0], AB) else: assert isinstance(a1.ab, AB) del a1.b if self.uselist: eq_(a1.ab, []) else: assert a1.ab is None def test_del_no_proxy(self): if not self.uselist: return if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") b1 = B() if self.useobject else 5 a1 = A() a1.b.append(b1) del a1.ab # this is what it does for now, so maintain that w/ assoc proxy eq_(a1.ab, []) def test_del_already_nonpresent_no_proxy(self): if not self.uselist: return if self.useobject: A, AB, B = self.classes("A", "AB", "B") else: A, AB = self.classes("A", "AB") a1 = A() del a1.ab # this is what it does for now, so maintain that w/ assoc proxy eq_(a1.ab, []) class ScalarRemoveListObjectCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = True cascade_scalar_deletes = True uselist = True class ScalarRemoveScalarObjectCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = True cascade_scalar_deletes = True uselist = False class ScalarRemoveListScalarCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = False cascade_scalar_deletes = True uselist = True class ScalarRemoveScalarScalarCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = False cascade_scalar_deletes = True uselist = False class ScalarRemoveListObjectNoCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = True cascade_scalar_deletes = False uselist = True class ScalarRemoveScalarObjectNoCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = True cascade_scalar_deletes = False uselist = False class ScalarRemoveListScalarNoCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = False cascade_scalar_deletes = False uselist = True class ScalarRemoveScalarScalarNoCascade( ScalarRemoveTest, fixtures.DeclarativeMappedTest ): run_create_tables = None useobject = False cascade_scalar_deletes = False uselist = False class InfoTest(fixtures.TestBase): def test_constructor(self): assoc = association_proxy("a", "b", info={"some_assoc": "some_value"}) eq_(assoc.info, {"some_assoc": "some_value"}) def test_empty(self): assoc = association_proxy("a", "b") eq_(assoc.info, {}) def test_via_cls(self): class Foob: assoc = association_proxy("a", "b") eq_(Foob.assoc.info, {}) Foob.assoc.info["foo"] = "bar" eq_(Foob.assoc.info, {"foo": "bar"}) class OnlyRelationshipTest(fixtures.DeclarativeMappedTest): run_define_tables = None run_create_tables = None run_inserts = None run_deletes = None @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class Foo(Base): __tablename__ = "foo" id = Column(Integer, primary_key=True) foo = Column(String) # assume some composite datatype bar = association_proxy("foo", "attr") def test_setattr(self): Foo = self.classes.Foo f1 = Foo() assert_raises_message( NotImplementedError, "association proxy to a non-relationship " "intermediary is not supported", setattr, f1, "bar", "asdf", ) def test_getattr(self): Foo = self.classes.Foo f1 = Foo() assert_raises_message( NotImplementedError, "association proxy to a non-relationship " "intermediary is not supported", getattr, f1, "bar", ) def test_get_class_attr(self): Foo = self.classes.Foo assert_raises_message( NotImplementedError, "association proxy to a non-relationship " "intermediary is not supported", getattr, Foo, "bar", ) class MultiOwnerTest( fixtures.DeclarativeMappedTest, testing.AssertsCompiledSQL ): __dialect__ = "default" run_define_tables = "each" run_create_tables = None run_inserts = None run_deletes = None run_setup_classes = "each" run_setup_mappers = "each" @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) type = Column(String(5), nullable=False) d_values = association_proxy("ds", "value") __mapper_args__ = {"polymorphic_on": type} class B(A): __tablename__ = "b" id = Column(ForeignKey("a.id"), primary_key=True) c1_id = Column(ForeignKey("c1.id")) ds = relationship("D", primaryjoin="D.b_id == B.id") __mapper_args__ = {"polymorphic_identity": "b"} class C(A): __tablename__ = "c" id = Column(ForeignKey("a.id"), primary_key=True) ds = relationship( "D", primaryjoin="D.c_id == C.id", back_populates="c" ) __mapper_args__ = {"polymorphic_identity": "c"} class C1(C): __tablename__ = "c1" id = Column(ForeignKey("c.id"), primary_key=True) csub_only_data = relationship("B") # uselist=True relationship ds = relationship( "D", primaryjoin="D.c1_id == C1.id", back_populates="c" ) __mapper_args__ = {"polymorphic_identity": "c1"} class C2(C): __tablename__ = "c2" id = Column(ForeignKey("c.id"), primary_key=True) csub_only_data = Column(String(50)) # scalar Column ds = relationship( "D", primaryjoin="D.c2_id == C2.id", back_populates="c" ) __mapper_args__ = {"polymorphic_identity": "c2"} class D(Base): __tablename__ = "d" id = Column(Integer, primary_key=True) value = Column(String(50)) b_id = Column(ForeignKey("b.id")) c_id = Column(ForeignKey("c.id")) c1_id = Column(ForeignKey("c1.id")) c2_id = Column(ForeignKey("c2.id")) c = relationship("C", primaryjoin="D.c_id == C.id") c_data = association_proxy("c", "csub_only_data") def _assert_raises_ambiguous(self, fn, *arg, **kw): assert_raises_message( AttributeError, "Association proxy D.c refers to an attribute 'csub_only_data'", fn, *arg, **kw, ) def _assert_raises_attribute(self, message, fn, *arg, **kw): assert_raises_message(AttributeError, message, fn, *arg, **kw) def test_column_collection_expressions(self): B, C, C2 = self.classes("B", "C", "C2") self.assert_compile( B.d_values.contains("b1"), "EXISTS (SELECT 1 FROM d, b WHERE d.b_id = b.id " "AND (d.value LIKE '%' || :value_1 || '%'))", ) self.assert_compile( C2.d_values.contains("c2"), "EXISTS (SELECT 1 FROM d, c2 WHERE d.c2_id = c2.id " "AND (d.value LIKE '%' || :value_1 || '%'))", ) self.assert_compile( C.d_values.contains("c1"), "EXISTS (SELECT 1 FROM d, c WHERE d.c_id = c.id " "AND (d.value LIKE '%' || :value_1 || '%'))", ) def test_subclass_only_owner_none(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D() eq_(d1.c_data, None) def test_subclass_only_owner_assign(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C2()) d1.c_data = "some c2" eq_(d1.c_data, "some c2") def test_subclass_only_owner_get(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C2(csub_only_data="some c2")) eq_(d1.c_data, "some c2") def test_subclass_only_owner_none_raise(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D() eq_(d1.c_data, None) def test_subclass_only_owner_delete(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C2(csub_only_data="some c2")) eq_(d1.c.csub_only_data, "some c2") del d1.c_data assert not hasattr(d1.c, "csub_only_data") def test_subclass_only_owner_assign_passes(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C()) d1.c_data = "some c1" # not mapped, but we set it eq_(d1.c.csub_only_data, "some c1") def test_subclass_only_owner_get_raises(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C()) self._assert_raises_attribute( "'C' object has no attribute 'csub_only_data'", getattr, d1, "c_data", ) def test_subclass_only_owner_delete_raises(self): D, C, C2 = self.classes("D", "C", "C2") d1 = D(c=C2(csub_only_data="some c2")) eq_(d1.c_data, "some c2") # now switch d1.c = C() self._assert_raises_attribute("csub_only_data", delattr, d1, "c_data") def test_subclasses_conflicting_types(self): B, D, C, C1, C2 = self.classes("B", "D", "C", "C1", "C2") bs = [B(), B()] d1 = D(c=C1(csub_only_data=bs)) d2 = D(c=C2(csub_only_data="some c2")) association_proxy_object = inspect(D).all_orm_descriptors["c_data"] inst1 = association_proxy_object.for_class(D, d1) inst2 = association_proxy_object.for_class(D, d2) eq_(inst1._target_is_object, True) eq_(inst2._target_is_object, False) # both instances are cached inst0 = association_proxy_object.for_class(D) eq_(inst0._lookup_cache, {C1: inst1, C2: inst2}) # cache works is_(association_proxy_object.for_class(D, d1), inst1) is_(association_proxy_object.for_class(D, d2), inst2) def test_col_expressions_not_available(self): (D,) = self.classes("D") self._assert_raises_ambiguous(lambda: D.c_data == 5) def test_rel_expressions_not_available(self): ( B, D, ) = self.classes("B", "D") self._assert_raises_ambiguous(lambda: D.c_data.any(B.id == 5)) class ProxyOfSynonymTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest): __dialect__ = "default" run_create_tables = None @classmethod def setup_classes(cls): from sqlalchemy.orm import synonym Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) data = Column(String) bs = relationship("B", backref="a") data_syn = synonym("data") b_data = association_proxy("bs", "data_syn") class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) a_id = Column(ForeignKey("a.id")) data = Column(String) data_syn = synonym("data") a_data = association_proxy("a", "data_syn") def test_o2m_instance_getter(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="bdata1"), B(data="bdata2")]) eq_(a1.b_data, ["bdata1", "bdata2"]) def test_m2o_instance_getter(self): A, B = self.classes("A", "B") b1 = B(a=A(data="adata")) eq_(b1.a_data, "adata") def test_o2m_expr(self): A, B = self.classes("A", "B") self.assert_compile( A.b_data == "foo", "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id " "AND b.data = :data_1)", ) class SynonymOfProxyTest(AssertsCompiledSQL, fixtures.DeclarativeMappedTest): __dialect__ = "default" run_create_tables = None @classmethod def setup_classes(cls): from sqlalchemy.orm import synonym Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) data = Column(String) bs = relationship("B", backref="a") b_data = association_proxy("bs", "data") b_data_syn = synonym("b_data") class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) a_id = Column(ForeignKey("a.id")) data = Column(String) def test_hasattr(self): A, B = self.classes("A", "B") is_false(hasattr(A.b_data_syn, "nonexistent")) def test_o2m_instance_getter(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="bdata1"), B(data="bdata2")]) eq_(a1.b_data_syn, ["bdata1", "bdata2"]) def test_o2m_expr(self): A, B = self.classes("A", "B") self.assert_compile( A.b_data_syn == "foo", "EXISTS (SELECT 1 FROM b, a WHERE a.id = b.a_id " "AND b.data = :data_1)", ) class ProxyHybridTest(fixtures.DeclarativeMappedTest, AssertsCompiledSQL): __dialect__ = "default" run_create_tables = None @classmethod def setup_classes(cls): from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.orm.interfaces import PropComparator Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) bs = relationship("B") b_data = association_proxy("bs", "value") well_behaved_b_data = association_proxy("bs", "well_behaved_value") fails_on_class_access = association_proxy( "bs", "fails_on_class_access" ) class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) aid = Column(ForeignKey("a.id")) data = Column(String(50)) @hybrid_property def well_behaved_value(self): return self.data @well_behaved_value.setter def well_behaved_value(self, value): self.data = value @hybrid_property def value(self): return self.data @value.setter def value(self, value): self.data = value @value.comparator class value(PropComparator): # comparator has no proxy __getattr__, so we can't # get to impl to see what we ar proxying towards. # as of #4690 we assume column-oriented proxying def __init__(self, cls): self.cls = cls @hybrid_property def well_behaved_w_expr(self): return self.data @well_behaved_w_expr.setter def well_behaved_w_expr(self, value): self.data = value @well_behaved_w_expr.expression def well_behaved_w_expr(cls): return cast(cls.data, Integer) @hybrid_property def fails_on_class_access(self): return len(self.data) class C(Base): __tablename__ = "c" id = Column(Integer, primary_key=True) b_id = Column(ForeignKey("b.id")) _b = relationship("B") attr = association_proxy("_b", "well_behaved_w_expr") def test_msg_fails_on_cls_access(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="b1")]) with expect_raises_message( exc.InvalidRequestError, "Association proxy received an unexpected error when trying to " 'retreive attribute "B.fails_on_class_access" from ' r'class "B": .* no len\(\)', ): a1.fails_on_class_access def test_get_ambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="b1")]) eq_(a1.b_data[0], "b1") def test_get_nonambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="b1")]) eq_(a1.well_behaved_b_data[0], "b1") def test_set_ambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B()]) a1.b_data[0] = "b1" eq_(a1.b_data[0], "b1") def test_set_nonambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B()]) a1.b_data[0] = "b1" eq_(a1.well_behaved_b_data[0], "b1") def test_expr_nonambiguous(self): A, B = self.classes("A", "B") eq_( str(A.well_behaved_b_data == 5), "EXISTS (SELECT 1 \nFROM b, a \nWHERE " "a.id = b.aid AND b.data = :data_1)", ) def test_get_classlevel_ambiguous(self): A, B = self.classes("A", "B") eq_( str(A.b_data), "ColumnAssociationProxyInstance" "(AssociationProxy('bs', 'value'))", ) def test_comparator_ambiguous(self): A, B = self.classes("A", "B") s = fixture_session() self.assert_compile( s.query(A).filter(A.b_data.any()), "SELECT a.id AS a_id FROM a WHERE EXISTS " "(SELECT 1 FROM b WHERE a.id = b.aid)", ) def test_explicit_expr(self): (C,) = self.classes("C") s = fixture_session() self.assert_compile( s.query(C).filter_by(attr=5), "SELECT c.id AS c_id, c.b_id AS c_b_id FROM c WHERE EXISTS " "(SELECT 1 FROM b WHERE b.id = c.b_id AND " "CAST(b.data AS INTEGER) = :param_1)", ) class ProxyPlainPropertyTest(fixtures.DeclarativeMappedTest): run_create_tables = None @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) bs = relationship("B") b_data = association_proxy("bs", "value") class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) aid = Column(ForeignKey("a.id")) data = Column(String(50)) @property def value(self): return self.data @value.setter def value(self, value): self.data = value def test_get_ambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B(data="b1")]) eq_(a1.b_data[0], "b1") def test_set_ambiguous(self): A, B = self.classes("A", "B") a1 = A(bs=[B()]) a1.b_data[0] = "b1" eq_(a1.b_data[0], "b1") def test_get_classlevel_ambiguous(self): A, B = self.classes("A", "B") eq_( str(A.b_data), "AmbiguousAssociationProxyInstance" "(AssociationProxy('bs', 'value'))", ) def test_expr_ambiguous(self): A, B = self.classes("A", "B") assert_raises_message( AttributeError, "Association proxy A.bs refers to an attribute " "'value' that is not directly mapped", lambda: A.b_data == 5, ) class ScopeBehaviorTest(fixtures.DeclarativeMappedTest): # test some GC scenarios, including issue #4268 @classmethod def setup_classes(cls): Base = cls.DeclarativeBasic class A(Base): __tablename__ = "a" id = Column(Integer, primary_key=True) data = Column(String(50)) bs = relationship("B") b_dyn = relationship("B", lazy="dynamic", viewonly=True) b_data = association_proxy("bs", "data") b_dynamic_data = association_proxy("bs", "data") class B(Base): __tablename__ = "b" id = Column(Integer, primary_key=True) aid = Column(ForeignKey("a.id")) data = Column(String(50)) @classmethod def insert_data(cls, connection): A, B = cls.classes("A", "B") s = Session(connection) s.add_all( [ A(id=1, bs=[B(data="b1"), B(data="b2")]), A(id=2, bs=[B(data="b3"), B(data="b4")]), ] ) s.commit() s.close() def test_plain_collection_gc(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.bs # noqa del a1 gc_collect() assert (A, (1,), None) not in s.identity_map @testing.fails("dynamic relationship strong references parent") def test_dynamic_collection_gc(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_dyn # noqa del a1 gc_collect() # also fails, AppenderQuery holds onto parent assert (A, (1,), None) not in s.identity_map @testing.fails("association proxy strong references parent") def test_associated_collection_gc(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_data # noqa del a1 gc_collect() assert (A, (1,), None) not in s.identity_map @testing.fails("association proxy strong references parent") def test_associated_dynamic_gc(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_dynamic_data # noqa del a1 gc_collect() assert (A, (1,), None) not in s.identity_map def test_plain_collection_iterate(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.bs del a1 gc_collect() assert len(a1bs) == 2 def test_dynamic_collection_iterate(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_dyn # noqa del a1 gc_collect() assert len(list(a1bs)) == 2 def test_associated_collection_iterate(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_data del a1 gc_collect() assert len(a1bs) == 2 def test_associated_dynamic_iterate(self): A, B = self.classes("A", "B") s = Session(testing.db) a1 = s.query(A).filter_by(id=1).one() a1bs = a1.b_dynamic_data del a1 gc_collect() assert len(a1bs) == 2 class DeclOrmForms(fixtures.TestBase): """test issues related to #8880, #8878, #8876""" def test_straight_decl_usage(self, decl_base): """test use of assoc prox as the default descriptor for a dataclasses.field. """ class User(decl_base): __allow_unmapped__ = True __tablename__ = "user" id: Mapped[int] = mapped_column(primary_key=True) user_keyword_associations: Mapped[ List[UserKeywordAssociation] ] = relationship( back_populates="user", cascade="all, delete-orphan", ) keywords: AssociationProxy[list[str]] = association_proxy( "user_keyword_associations", "keyword" ) UserKeywordAssociation, Keyword = self._keyword_mapping( User, decl_base ) self._assert_keyword_assoc_mapping( User, UserKeywordAssociation, Keyword, init=True ) @testing.variation("embed_in_field", [True, False]) @testing.combinations( {}, {"repr": False}, {"repr": True}, ({"kw_only": True}, testing.requires.python310), {"init": False}, {"default_factory": True}, argnames="field_kw", ) def test_dc_decl_usage(self, dc_decl_base, embed_in_field, field_kw): """test use of assoc prox as the default descriptor for a dataclasses.field. This exercises #8880 """ if field_kw.pop("default_factory", False) and not embed_in_field: has_default_factory = True field_kw["default_factory"] = lambda: [ Keyword("l1"), Keyword("l2"), Keyword("l3"), ] else: has_default_factory = False class User(dc_decl_base): __allow_unmapped__ = True __tablename__ = "user" id: Mapped[int] = mapped_column( primary_key=True, repr=True, init=False ) user_keyword_associations: Mapped[ List[UserKeywordAssociation] ] = relationship( back_populates="user", cascade="all, delete-orphan", init=False, ) if embed_in_field: # this is an incorrect form to use with # MappedAsDataclass. However, we want to make sure it # works as kind of a test to ensure we are being as well # behaved as possible with an explicit dataclasses.field(), # by testing that it uses its normal descriptor-as-default # behavior keywords: AssociationProxy[list[str]] = dataclasses.field( default=association_proxy( "user_keyword_associations", "keyword" ), **field_kw, ) else: keywords: AssociationProxy[list[str]] = association_proxy( "user_keyword_associations", "keyword", **field_kw ) UserKeywordAssociation, Keyword = self._dc_keyword_mapping( User, dc_decl_base ) # simplify __qualname__ so we can test repr() more easily User.__qualname__ = "mod.User" UserKeywordAssociation.__qualname__ = "mod.UserKeywordAssociation" Keyword.__qualname__ = "mod.Keyword" init = field_kw.get("init", True) u1 = self._assert_keyword_assoc_mapping( User, UserKeywordAssociation, Keyword, init=init, has_default_factory=has_default_factory, ) if field_kw.get("repr", True): eq_( repr(u1), "mod.User(id=None, user_keyword_associations=[" "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k1'), user=...), " "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k2'), user=...), " "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k3'), user=...)], " "keywords=[mod.Keyword(id=None, keyword='k1'), " "mod.Keyword(id=None, keyword='k2'), " "mod.Keyword(id=None, keyword='k3')])", ) else: eq_( repr(u1), "mod.User(id=None, user_keyword_associations=[" "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k1'), user=...), " "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k2'), user=...), " "mod.UserKeywordAssociation(user_id=None, keyword_id=None, " "keyword=mod.Keyword(id=None, keyword='k3'), user=...)])", ) def _assert_keyword_assoc_mapping( self, User, UserKeywordAssociation, Keyword, *, init, has_default_factory=False, ): if not init: with expect_raises_message( TypeError, r"got an unexpected keyword argument 'keywords'" ): User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")]) if has_default_factory: u1 = User() eq_(u1.keywords, [Keyword("l1"), Keyword("l2"), Keyword("l3")]) eq_( [ka.keyword.keyword for ka in u1.user_keyword_associations], ["l1", "l2", "l3"], ) if init: u1 = User(keywords=[Keyword("k1"), Keyword("k2"), Keyword("k3")]) else: u1 = User() u1.keywords = [Keyword("k1"), Keyword("k2"), Keyword("k3")] eq_(u1.keywords, [Keyword("k1"), Keyword("k2"), Keyword("k3")]) eq_( [ka.keyword.keyword for ka in u1.user_keyword_associations], ["k1", "k2", "k3"], ) return u1 def _keyword_mapping(self, User, decl_base): class UserKeywordAssociation(decl_base): __tablename__ = "user_keyword" user_id: Mapped[int] = mapped_column( ForeignKey("user.id"), primary_key=True ) keyword_id: Mapped[int] = mapped_column( ForeignKey("keyword.id"), primary_key=True ) user: Mapped[User] = relationship( back_populates="user_keyword_associations", ) keyword: Mapped[Keyword] = relationship() def __init__(self, keyword=None, user=None): self.user = user self.keyword = keyword class Keyword(ComparableMixin, decl_base): __tablename__ = "keyword" id: Mapped[int] = mapped_column(primary_key=True) keyword: Mapped[str] = mapped_column() def __init__(self, keyword): self.keyword = keyword return UserKeywordAssociation, Keyword def _dc_keyword_mapping(self, User, dc_decl_base): class UserKeywordAssociation(dc_decl_base): __tablename__ = "user_keyword" user_id: Mapped[int] = mapped_column( ForeignKey("user.id"), primary_key=True, init=False ) keyword_id: Mapped[int] = mapped_column( ForeignKey("keyword.id"), primary_key=True, init=False ) keyword: Mapped[Keyword] = relationship(default=None) user: Mapped[User] = relationship( back_populates="user_keyword_associations", default=None ) class Keyword(dc_decl_base): __tablename__ = "keyword" id: Mapped[int] = mapped_column(primary_key=True, init=False) keyword: Mapped[str] = mapped_column(init=True) return UserKeywordAssociation, Keyword