import pickle from unittest.mock import call from unittest.mock import Mock from sqlalchemy import event from sqlalchemy import exc as sa_exc from sqlalchemy import testing from sqlalchemy.orm import attributes from sqlalchemy.orm import exc as orm_exc from sqlalchemy.orm import instrumentation from sqlalchemy.orm import NO_KEY from sqlalchemy.orm.collections import attribute_keyed_dict from sqlalchemy.orm.collections import collection from sqlalchemy.orm.state import InstanceState from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_false from sqlalchemy.testing import is_not from sqlalchemy.testing import is_true from sqlalchemy.testing import not_in from sqlalchemy.testing.assertions import assert_warns from sqlalchemy.testing.util import all_partial_orderings from sqlalchemy.testing.util import gc_collect # global for pickling tests MyTest = None MyTest2 = None def _set_callable(state, dict_, key, callable_): fn = InstanceState._instance_level_callable_processor( state.manager, callable_, key ) fn(state, dict_, None) def _register_attribute(class_, key, **kw): kw.setdefault("comparator", object()) kw.setdefault("parententity", object()) attributes.register_attribute(class_, key, **kw) class AttributeImplAPITest(fixtures.MappedTest): def _scalar_obj_fixture(self): class A: pass class B: pass instrumentation.register_class(A) instrumentation.register_class(B) _register_attribute(A, "b", uselist=False, useobject=True) return A, B def _collection_obj_fixture(self): class A: pass class B: pass instrumentation.register_class(A) instrumentation.register_class(B) _register_attribute(A, "b", uselist=True, useobject=True) return A, B def test_scalar_obj_remove_invalid(self): A, B = self._scalar_obj_fixture() a1 = A() b1 = B() b2 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b is b1 assert_raises_message( ValueError, "Object not " "associated with on attribute 'b'", A.b.impl.remove, attributes.instance_state(a1), attributes.instance_dict(a1), b2, None, ) def test_scalar_obj_pop_invalid(self): A, B = self._scalar_obj_fixture() a1 = A() b1 = B() b2 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b is b1 A.b.impl.pop( attributes.instance_state(a1), attributes.instance_dict(a1), b2, None, ) assert a1.b is b1 def test_scalar_obj_pop_valid(self): A, B = self._scalar_obj_fixture() a1 = A() b1 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b is b1 A.b.impl.pop( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b is None def test_collection_obj_remove_invalid(self): A, B = self._collection_obj_fixture() a1 = A() b1 = B() b2 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b == [b1] assert_raises_message( ValueError, r"list.remove\(.*?\): .* not in list", A.b.impl.remove, attributes.instance_state(a1), attributes.instance_dict(a1), b2, None, ) def test_collection_obj_pop_invalid(self): A, B = self._collection_obj_fixture() a1 = A() b1 = B() b2 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b == [b1] A.b.impl.pop( attributes.instance_state(a1), attributes.instance_dict(a1), b2, None, ) assert a1.b == [b1] def test_collection_obj_pop_valid(self): A, B = self._collection_obj_fixture() a1 = A() b1 = B() A.b.impl.append( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b == [b1] A.b.impl.pop( attributes.instance_state(a1), attributes.instance_dict(a1), b1, None, ) assert a1.b == [] class AttributesTest(fixtures.ORMTest): def setup_test(self): global MyTest, MyTest2 class MyTest: pass class MyTest2: pass def teardown_test(self): global MyTest, MyTest2 MyTest, MyTest2 = None, None def test_basic(self): class User: pass instrumentation.register_class(User) _register_attribute(User, "user_id", uselist=False, useobject=False) _register_attribute(User, "user_name", uselist=False, useobject=False) _register_attribute( User, "email_address", uselist=False, useobject=False ) u = User() u.user_id = 7 u.user_name = "john" u.email_address = "lala@123.com" self.assert_( u.user_id == 7 and u.user_name == "john" and u.email_address == "lala@123.com" ) attributes.instance_state(u)._commit_all(attributes.instance_dict(u)) self.assert_( u.user_id == 7 and u.user_name == "john" and u.email_address == "lala@123.com" ) u.user_name = "heythere" u.email_address = "foo@bar.com" self.assert_( u.user_id == 7 and u.user_name == "heythere" and u.email_address == "foo@bar.com" ) def test_pickleness(self): instrumentation.register_class(MyTest) instrumentation.register_class(MyTest2) _register_attribute(MyTest, "user_id", uselist=False, useobject=False) _register_attribute( MyTest, "user_name", uselist=False, useobject=False ) _register_attribute( MyTest, "email_address", uselist=False, useobject=False ) _register_attribute(MyTest2, "a", uselist=False, useobject=False) _register_attribute(MyTest2, "b", uselist=False, useobject=False) # shouldn't be pickling callables at the class level def somecallable(state, passive): return None _register_attribute( MyTest, "mt2", uselist=True, trackparent=True, callable_=somecallable, useobject=True, ) o = MyTest() o.mt2.append(MyTest2()) o.user_id = 7 o.mt2[0].a = "abcde" pk_o = pickle.dumps(o) o2 = pickle.loads(pk_o) pk_o2 = pickle.dumps(o2) # the above is kind of distrurbing, so let's do it again a little # differently. the string-id in serialization thing is just an # artifact of pickling that comes up in the first round-trip. # a -> b differs in pickle memoization of 'mt2', but b -> c will # serialize identically. o3 = pickle.loads(pk_o2) pk_o3 = pickle.dumps(o3) o4 = pickle.loads(pk_o3) # and lastly make sure we still have our data after all that. # identical serialzation is great, *if* it's complete :) self.assert_(o4.user_id == 7) self.assert_(o4.user_name is None) self.assert_(o4.email_address is None) self.assert_(len(o4.mt2) == 1) self.assert_(o4.mt2[0].a == "abcde") self.assert_(o4.mt2[0].b is None) @testing.requires.predictable_gc def test_state_gc(self): """test that InstanceState always has a dict, even after host object gc'ed.""" class Foo: pass instrumentation.register_class(Foo) f = Foo() state = attributes.instance_state(f) f.bar = "foo" eq_(state.dict, {"bar": "foo", state.manager.STATE_ATTR: state}) del f gc_collect() assert state.obj() is None assert state.dict == {} @testing.requires.predictable_gc def test_object_dereferenced_error(self): class Foo: pass class Bar: def __init__(self): gc_collect() instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "bars", uselist=True, useobject=True) assert_raises_message( orm_exc.ObjectDereferencedError, "Can't emit change event for attribute " "'Foo.bars' - parent object of type " "has been garbage collected.", lambda: Foo().bars.append(Bar()), ) def test_unmapped_instance_raises(self): class User: pass instrumentation.register_class(User) _register_attribute(User, "user_name", uselist=False, useobject=False) class Blog: name = User.user_name def go(): b = Blog() return b.name assert_raises( orm_exc.UnmappedInstanceError, go, ) def test_del_scalar_nonobject(self): class Foo: pass instrumentation.register_class(Foo) _register_attribute(Foo, "b", uselist=False, useobject=False) f1 = Foo() is_(f1.b, None) f1.b = 5 del f1.b is_(f1.b, None) f1 = Foo() def go(): del f1.b assert_raises_message( AttributeError, "Foo.b object does not have a value", go ) def test_del_scalar_object(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "b", uselist=False, useobject=True) f1 = Foo() is_(f1.b, None) f1.b = Bar() del f1.b is_(f1.b, None) def go(): del f1.b assert_raises_message( AttributeError, "Foo.b object does not have a value", go ) def test_del_collection_object(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "b", uselist=True, useobject=True) f1 = Foo() eq_(f1.b, []) f1.b = [Bar()] del f1.b eq_(f1.b, []) del f1.b eq_(f1.b, []) def test_deferred(self): class Foo: pass data = {"a": "this is a", "b": 12} def loader(state, keys, passive): for k in keys: state.dict[k] = data[k] return attributes.ATTR_WAS_SET instrumentation.register_class(Foo) manager = attributes.manager_of_class(Foo) manager.expired_attribute_loader = loader _register_attribute(Foo, "a", uselist=False, useobject=False) _register_attribute(Foo, "b", uselist=False, useobject=False) f = Foo() attributes.instance_state(f)._expire( attributes.instance_dict(f), set() ) eq_(f.a, "this is a") eq_(f.b, 12) f.a = "this is some new a" attributes.instance_state(f)._expire( attributes.instance_dict(f), set() ) eq_(f.a, "this is a") eq_(f.b, 12) attributes.instance_state(f)._expire( attributes.instance_dict(f), set() ) f.a = "this is another new a" eq_(f.a, "this is another new a") eq_(f.b, 12) attributes.instance_state(f)._expire( attributes.instance_dict(f), set() ) eq_(f.a, "this is a") eq_(f.b, 12) del f.a eq_(f.a, None) eq_(f.b, 12) attributes.instance_state(f)._commit_all( attributes.instance_dict(f), set() ) eq_(f.a, None) eq_(f.b, 12) def test_deferred_pickleable(self): data = {"a": "this is a", "b": 12} def loader(state, keys, passive): for k in keys: state.dict[k] = data[k] return attributes.ATTR_WAS_SET instrumentation.register_class(MyTest) manager = attributes.manager_of_class(MyTest) manager.expired_attribute_loader = loader _register_attribute(MyTest, "a", uselist=False, useobject=False) _register_attribute(MyTest, "b", uselist=False, useobject=False) m = MyTest() attributes.instance_state(m)._expire( attributes.instance_dict(m), set() ) assert "a" not in m.__dict__ m2 = pickle.loads(pickle.dumps(m)) assert "a" not in m2.__dict__ eq_(m2.a, "this is a") eq_(m2.b, 12) def test_list(self): class User: pass class Address: pass instrumentation.register_class(User) instrumentation.register_class(Address) _register_attribute(User, "user_id", uselist=False, useobject=False) _register_attribute(User, "user_name", uselist=False, useobject=False) _register_attribute(User, "addresses", uselist=True, useobject=True) _register_attribute( Address, "address_id", uselist=False, useobject=False ) _register_attribute( Address, "email_address", uselist=False, useobject=False ) u = User() u.user_id = 7 u.user_name = "john" u.addresses = [] a = Address() a.address_id = 10 a.email_address = "lala@123.com" u.addresses.append(a) self.assert_( u.user_id == 7 and u.user_name == "john" and u.addresses[0].email_address == "lala@123.com" ) ( u, attributes.instance_state(a)._commit_all( attributes.instance_dict(a) ), ) self.assert_( u.user_id == 7 and u.user_name == "john" and u.addresses[0].email_address == "lala@123.com" ) u.user_name = "heythere" a = Address() a.address_id = 11 a.email_address = "foo@bar.com" u.addresses.append(a) eq_(u.user_id, 7) eq_(u.user_name, "heythere") eq_(u.addresses[0].email_address, "lala@123.com") eq_(u.addresses[1].email_address, "foo@bar.com") def test_lazytrackparent(self): """test that the "hasparent" flag works properly when lazy loaders and backrefs are used """ class Post: pass class Blog: pass instrumentation.register_class(Post) instrumentation.register_class(Blog) # set up instrumented attributes with backrefs _register_attribute( Post, "blog", uselist=False, backref="posts", trackparent=True, useobject=True, ) _register_attribute( Blog, "posts", uselist=True, backref="blog", trackparent=True, useobject=True, ) # create objects as if they'd been freshly loaded from the database # (without history) b = Blog() p1 = Post() _set_callable( attributes.instance_state(b), attributes.instance_dict(b), "posts", lambda state, passive: [p1], ) _set_callable( attributes.instance_state(p1), attributes.instance_dict(p1), "blog", lambda state, passive: b, ) p1, attributes.instance_state(b)._commit_all( attributes.instance_dict(b) ) # no orphans (called before the lazy loaders fire off) assert attributes.has_parent(Blog, p1, "posts", optimistic=True) assert attributes.has_parent(Post, b, "blog", optimistic=True) # assert connections assert p1.blog is b assert p1 in b.posts # manual connections b2 = Blog() p2 = Post() b2.posts.append(p2) assert attributes.has_parent(Blog, p2, "posts") assert attributes.has_parent(Post, b2, "blog") def test_illegal_trackparent(self): class Post: pass class Blog: pass instrumentation.register_class(Post) instrumentation.register_class(Blog) _register_attribute(Post, "blog", useobject=True) assert_raises_message( AssertionError, "This AttributeImpl is not configured to track parents.", attributes.has_parent, Post, Blog(), "blog", ) assert_raises_message( AssertionError, "This AttributeImpl is not configured to track parents.", Post.blog.impl.sethasparent, "x", "x", True, ) def test_inheritance(self): """tests that attributes are polymorphic""" class Foo: pass class Bar(Foo): pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) def func1(state, passive): return "this is the foo attr" def func2(state, passive): return "this is the bar attr" def func3(state, passive): return "this is the shared attr" _register_attribute( Foo, "element", uselist=False, callable_=func1, useobject=True ) _register_attribute( Foo, "element2", uselist=False, callable_=func3, useobject=True ) _register_attribute( Bar, "element", uselist=False, callable_=func2, useobject=True ) x = Foo() y = Bar() assert x.element == "this is the foo attr" assert y.element == "this is the bar attr" assert x.element2 == "this is the shared attr" assert y.element2 == "this is the shared attr" def test_no_double_state(self): states = set() class Foo: def __init__(self): states.add(attributes.instance_state(self)) class Bar(Foo): def __init__(self): states.add(attributes.instance_state(self)) Foo.__init__(self) instrumentation.register_class(Foo) instrumentation.register_class(Bar) b = Bar() eq_(len(states), 1) eq_(list(states)[0].obj(), b) def test_inheritance2(self): """test that the attribute manager can properly traverse the managed attributes of an object, if the object is of a descendant class with managed attributes in the parent class""" class Foo: pass class Bar(Foo): pass class Element: _state = True instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "element", uselist=False, useobject=True) el = Element() x = Bar() x.element = el eq_( attributes.get_state_history( attributes.instance_state(x), "element" ), ([el], (), ()), ) attributes.instance_state(x)._commit_all(attributes.instance_dict(x)) added, unchanged, deleted = attributes.get_state_history( attributes.instance_state(x), "element" ) assert added == () assert unchanged == [el] def test_lazyhistory(self): """tests that history functions work with lazy-loading attributes""" class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)] def func1(state, passive): return "this is func 1" def func2(state, passive): return [bar1, bar2, bar3] _register_attribute( Foo, "col1", uselist=False, callable_=func1, useobject=True ) _register_attribute( Foo, "col2", uselist=True, callable_=func2, useobject=True ) _register_attribute(Bar, "id", uselist=False, useobject=True) x = Foo() attributes.instance_state(x)._commit_all(attributes.instance_dict(x)) x.col2.append(bar4) eq_( attributes.get_state_history(attributes.instance_state(x), "col2"), ([bar4], [bar1, bar2, bar3], []), ) def test_parenttrack(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "element", uselist=False, trackparent=True, useobject=True ) _register_attribute( Bar, "element", uselist=False, trackparent=True, useobject=True ) f1 = Foo() f2 = Foo() b1 = Bar() b2 = Bar() f1.element = b1 b2.element = f2 assert attributes.has_parent(Foo, b1, "element") assert not attributes.has_parent(Foo, b2, "element") assert not attributes.has_parent(Foo, f2, "element") assert attributes.has_parent(Bar, f2, "element") b2.element = None assert not attributes.has_parent(Bar, f2, "element") # test that double assignment doesn't accidentally reset the # 'parent' flag. b3 = Bar() f4 = Foo() b3.element = f4 assert attributes.has_parent(Bar, f4, "element") b3.element = f4 assert attributes.has_parent(Bar, f4, "element") def test_descriptorattributes(self): """changeset: 1633 broke ability to use ORM to map classes with unusual descriptor attributes (for example, classes that inherit from ones implementing zope.interface.Interface). This is a simple regression test to prevent that defect.""" class des: def __get__(self, instance, owner): raise AttributeError("fake attribute") class Foo: A = des() instrumentation.register_class(Foo) instrumentation.unregister_class(Foo) def test_collectionclasses(self): class Foo: pass instrumentation.register_class(Foo) _register_attribute( Foo, "collection", uselist=True, typecallable=set, useobject=True ) assert attributes.manager_of_class(Foo).is_instrumented("collection") assert isinstance(Foo().collection, set) attributes.unregister_attribute(Foo, "collection") assert not attributes.manager_of_class(Foo).is_instrumented( "collection" ) try: _register_attribute( Foo, "collection", uselist=True, typecallable=dict, useobject=True, ) assert False except sa_exc.ArgumentError as e: assert ( str(e) == "Type InstrumentedDict must elect an appender " "method to be a collection class" ) class MyDict(dict): @collection.appender def append(self, item): self[item.foo] = item @collection.remover def remove(self, item): del self[item.foo] _register_attribute( Foo, "collection", uselist=True, typecallable=MyDict, useobject=True, ) assert isinstance(Foo().collection, MyDict) attributes.unregister_attribute(Foo, "collection") class MyColl: pass try: _register_attribute( Foo, "collection", uselist=True, typecallable=MyColl, useobject=True, ) assert False except sa_exc.ArgumentError as e: assert ( str(e) == "Type MyColl must elect an appender method to be a " "collection class" ) class MyColl: @collection.iterator def __iter__(self): return iter([]) @collection.appender def append(self, item): pass @collection.remover def remove(self, item): pass _register_attribute( Foo, "collection", uselist=True, typecallable=MyColl, useobject=True, ) try: Foo().collection assert True except sa_exc.ArgumentError: assert False def test_last_known_tracking(self): class Foo: pass instrumentation.register_class(Foo) _register_attribute(Foo, "a", useobject=False) _register_attribute(Foo, "b", useobject=False) _register_attribute(Foo, "c", useobject=False) f1 = Foo() state = attributes.instance_state(f1) f1.a = "a1" f1.b = "b1" f1.c = "c1" assert not state._last_known_values state._track_last_known_value("b") state._track_last_known_value("c") eq_( state._last_known_values, {"b": attributes.NO_VALUE, "c": attributes.NO_VALUE}, ) state._expire_attributes(state.dict, ["b"]) eq_(state._last_known_values, {"b": "b1", "c": attributes.NO_VALUE}) state._expire(state.dict, set()) eq_(state._last_known_values, {"b": "b1", "c": "c1"}) f1.b = "b2" eq_(state._last_known_values, {"b": attributes.NO_VALUE, "c": "c1"}) f1.c = "c2" eq_( state._last_known_values, {"b": attributes.NO_VALUE, "c": attributes.NO_VALUE}, ) state._expire(state.dict, set()) eq_(state._last_known_values, {"b": "b2", "c": "c2"}) class GetNoValueTest(fixtures.ORMTest): def _fixture(self, expected): class Foo: pass class Bar: pass def lazy_callable(state, passive): return expected instrumentation.register_class(Foo) instrumentation.register_class(Bar) if expected is not None: _register_attribute( Foo, "attr", useobject=True, uselist=False, callable_=lazy_callable, ) else: _register_attribute(Foo, "attr", useobject=True, uselist=False) f1 = self.f1 = Foo() return ( Foo.attr.impl, attributes.instance_state(f1), attributes.instance_dict(f1), ) def test_passive_no_result(self): attr, state, dict_ = self._fixture(attributes.PASSIVE_NO_RESULT) eq_( attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE), attributes.PASSIVE_NO_RESULT, ) def test_passive_no_result_no_value(self): attr, state, dict_ = self._fixture(attributes.NO_VALUE) eq_( attr.get(state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE), attributes.PASSIVE_NO_RESULT, ) assert "attr" not in dict_ def test_passive_ret_no_value(self): attr, state, dict_ = self._fixture(attributes.NO_VALUE) eq_( attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NO_VALUE), attributes.NO_VALUE, ) assert "attr" not in dict_ def test_passive_ret_no_value_empty(self): attr, state, dict_ = self._fixture(None) eq_( attr.get(state, dict_, passive=attributes.PASSIVE_RETURN_NO_VALUE), attributes.NO_VALUE, ) assert "attr" not in dict_ def test_off_empty(self): attr, state, dict_ = self._fixture(None) eq_(attr.get(state, dict_, passive=attributes.PASSIVE_OFF), None) assert "attr" not in dict_ class UtilTest(fixtures.ORMTest): def test_helpers(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "coll", uselist=True, useobject=True) f1 = Foo() b1 = Bar() b2 = Bar() coll = attributes.init_collection(f1, "coll") assert coll.data is f1.coll assert attributes.get_attribute(f1, "coll") is f1.coll attributes.set_attribute(f1, "coll", [b1]) assert f1.coll == [b1] eq_(attributes.get_history(f1, "coll"), ([b1], [], [])) attributes.set_committed_value(f1, "coll", [b2]) eq_(attributes.get_history(f1, "coll"), ((), [b2], ())) attributes.del_attribute(f1, "coll") assert "coll" not in f1.__dict__ def test_set_committed_value_none_uselist(self): """test that set_committed_value->None to a uselist generates an empty list""" class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "col_list", uselist=True, useobject=True) _register_attribute( Foo, "col_set", uselist=True, useobject=True, typecallable=set ) f1 = Foo() attributes.set_committed_value(f1, "col_list", None) eq_(f1.col_list, []) attributes.set_committed_value(f1, "col_set", None) eq_(f1.col_set, set()) def test_initiator_arg(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "a", uselist=False, useobject=False) _register_attribute(Bar, "b", uselist=False, useobject=False) @event.listens_for(Foo.a, "set") def sync_a(target, value, oldvalue, initiator): parentclass = initiator.parent_token.class_ if parentclass is Foo: attributes.set_attribute(target.bar, "b", value, initiator) @event.listens_for(Bar.b, "set") def sync_b(target, value, oldvalue, initiator): parentclass = initiator.parent_token.class_ if parentclass is Bar: attributes.set_attribute(target.foo, "a", value, initiator) f1 = Foo() b1 = Bar() f1.bar = b1 b1.foo = f1 f1.a = "x" eq_(b1.b, "x") b1.b = "y" eq_(f1.a, "y") class BackrefTest(fixtures.ORMTest): def test_m2m(self): class Student: pass class Course: pass instrumentation.register_class(Student) instrumentation.register_class(Course) _register_attribute( Student, "courses", uselist=True, backref="students", useobject=True, ) _register_attribute( Course, "students", uselist=True, backref="courses", useobject=True ) s = Student() c = Course() s.courses.append(c) self.assert_(c.students == [s]) s.courses.remove(c) self.assert_(c.students == []) (s1, s2, s3) = (Student(), Student(), Student()) c.students = [s1, s2, s3] self.assert_(s2.courses == [c]) self.assert_(s1.courses == [c]) s1.courses.remove(c) self.assert_(c.students == [s2, s3]) def test_o2m(self): class Post: pass class Blog: pass instrumentation.register_class(Post) instrumentation.register_class(Blog) _register_attribute( Post, "blog", uselist=False, backref="posts", trackparent=True, useobject=True, ) _register_attribute( Blog, "posts", uselist=True, backref="blog", trackparent=True, useobject=True, ) b = Blog() (p1, p2, p3) = (Post(), Post(), Post()) b.posts.append(p1) b.posts.append(p2) b.posts.append(p3) self.assert_(b.posts == [p1, p2, p3]) self.assert_(p2.blog is b) p3.blog = None self.assert_(b.posts == [p1, p2]) p4 = Post() p4.blog = b self.assert_(b.posts == [p1, p2, p4]) p4.blog = b p4.blog = b self.assert_(b.posts == [p1, p2, p4]) # assert no failure removing None p5 = Post() p5.blog = None del p5.blog def test_o2o(self): class Port: pass class Jack: pass instrumentation.register_class(Port) instrumentation.register_class(Jack) _register_attribute( Port, "jack", uselist=False, useobject=True, backref="port" ) _register_attribute( Jack, "port", uselist=False, useobject=True, backref="jack" ) p = Port() j = Jack() p.jack = j self.assert_(j.port is p) self.assert_(p.jack is not None) j.port = None self.assert_(p.jack is None) def test_symmetric_o2o_inheritance(self): """Test that backref 'initiator' catching goes against a token that is global to all InstrumentedAttribute objects within a particular class, not just the individual IA object since we use distinct objects in an inheritance scenario. """ class Parent: pass class Child: pass class SubChild(Child): pass p_token = object() c_token = object() instrumentation.register_class(Parent) instrumentation.register_class(Child) instrumentation.register_class(SubChild) _register_attribute( Parent, "child", uselist=False, backref="parent", parent_token=p_token, useobject=True, ) _register_attribute( Child, "parent", uselist=False, backref="child", parent_token=c_token, useobject=True, ) _register_attribute( SubChild, "parent", uselist=False, backref="child", parent_token=c_token, useobject=True, ) p1 = Parent() c1 = Child() p1.child = c1 c2 = SubChild() c2.parent = p1 def test_symmetric_o2m_inheritance(self): class Parent: pass class SubParent(Parent): pass class Child: pass p_token = object() c_token = object() instrumentation.register_class(Parent) instrumentation.register_class(SubParent) instrumentation.register_class(Child) _register_attribute( Parent, "children", uselist=True, backref="parent", parent_token=p_token, useobject=True, ) _register_attribute( SubParent, "children", uselist=True, backref="parent", parent_token=p_token, useobject=True, ) _register_attribute( Child, "parent", uselist=False, backref="children", parent_token=c_token, useobject=True, ) p1 = Parent() p2 = SubParent() c1 = Child() p1.children.append(c1) assert c1.parent is p1 assert c1 in p1.children p2.children.append(c1) assert c1.parent is p2 # event propagates to remove as of [ticket:2789] assert c1 not in p1.children class CyclicBackrefAssertionTest(fixtures.TestBase): """test that infinite recursion due to incorrect backref assignments is blocked. """ def test_scalar_set_type_assertion(self): A, B, C = self._scalar_fixture() c1 = C() b1 = B() assert_raises_message( ValueError, "Bidirectional attribute conflict detected: " 'Passing object to attribute "C.a" ' 'triggers a modify event on attribute "C.b" ' 'via the backref "B.c".', setattr, c1, "a", b1, ) def test_collection_append_type_assertion(self): A, B, C = self._collection_fixture() c1 = C() b1 = B() assert_raises_message( ValueError, "Bidirectional attribute conflict detected: " 'Passing object to attribute "C.a" ' 'triggers a modify event on attribute "C.b" ' 'via the backref "B.c".', c1.a.append, b1, ) def _scalar_fixture(self): class A: pass class B: pass class C: pass instrumentation.register_class(A) instrumentation.register_class(B) instrumentation.register_class(C) _register_attribute(C, "a", backref="c", useobject=True) _register_attribute(C, "b", backref="c", useobject=True) _register_attribute(A, "c", backref="a", useobject=True, uselist=True) _register_attribute(B, "c", backref="b", useobject=True, uselist=True) return A, B, C def _collection_fixture(self): class A: pass class B: pass class C: pass instrumentation.register_class(A) instrumentation.register_class(B) instrumentation.register_class(C) _register_attribute(C, "a", backref="c", useobject=True, uselist=True) _register_attribute(C, "b", backref="c", useobject=True, uselist=True) _register_attribute(A, "c", backref="a", useobject=True) _register_attribute(B, "c", backref="b", useobject=True) return A, B, C def _broken_collection_fixture(self): class A: pass class B: pass instrumentation.register_class(A) instrumentation.register_class(B) _register_attribute(A, "b", backref="a1", useobject=True) _register_attribute(B, "a1", backref="b", useobject=True, uselist=True) _register_attribute(B, "a2", backref="b", useobject=True, uselist=True) return A, B def test_broken_collection_assertion(self): A, B = self._broken_collection_fixture() b1 = B() a1 = A() assert_raises_message( ValueError, "Bidirectional attribute conflict detected: " 'Passing object to attribute "B.a2" ' 'triggers a modify event on attribute "B.a1" ' 'via the backref "A.b".', b1.a2.append, a1, ) class PendingBackrefTest(fixtures.ORMTest): def _fixture(self): class Post: def __init__(self, name): self.name = name __hash__ = None def __eq__(self, other): return other is not None and other.name == self.name class Blog: def __init__(self, name): self.name = name __hash__ = None def __eq__(self, other): return other is not None and other.name == self.name lazy_posts = Mock() instrumentation.register_class(Post) instrumentation.register_class(Blog) _register_attribute( Post, "blog", uselist=False, backref="posts", trackparent=True, useobject=True, ) _register_attribute( Blog, "posts", uselist=True, backref="blog", callable_=lazy_posts, trackparent=True, useobject=True, ) return Post, Blog, lazy_posts def test_lazy_add(self): Post, Blog, lazy_posts = self._fixture() p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3") lazy_posts.return_value = attributes.PASSIVE_NO_RESULT b = Blog("blog 1") b1_state = attributes.instance_state(b) p = Post("post 4") p.blog = b eq_( lazy_posts.mock_calls, [call(b1_state, attributes.PASSIVE_NO_FETCH)], ) p = Post("post 5") # setting blog doesn't call 'posts' callable, calls with no fetch p.blog = b eq_( lazy_posts.mock_calls, [ call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_NO_FETCH), ], ) lazy_posts.return_value = [p1, p2, p3] # calling backref calls the callable, populates extra posts eq_(b.posts, [p1, p2, p3, Post("post 4"), Post("post 5")]) eq_( lazy_posts.mock_calls, [ call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_OFF), ], ) def test_lazy_history_collection(self): Post, Blog, lazy_posts = self._fixture() p1, p2, p3 = Post("post 1"), Post("post 2"), Post("post 3") lazy_posts.return_value = [p1, p2, p3] b = Blog("blog 1") p = Post("post 4") p.blog = b p4 = Post("post 5") p4.blog = b eq_(lazy_posts.call_count, 1) eq_( attributes.instance_state(b).get_history( "posts", attributes.PASSIVE_OFF ), ([p, p4], [p1, p2, p3], []), ) eq_(lazy_posts.call_count, 1) def test_passive_history_collection_no_value(self): Post, Blog, lazy_posts = self._fixture() lazy_posts.return_value = attributes.PASSIVE_NO_RESULT b = Blog("blog 1") p = Post("post 1") state, dict_ = ( attributes.instance_state(b), attributes.instance_dict(b), ) # this sets up NO_VALUE on b.posts p.blog = b eq_(state.committed_state, {"posts": attributes.NO_VALUE}) assert "posts" not in dict_ # then suppose the object was made transient again, # the lazy loader would return this lazy_posts.return_value = attributes.ATTR_EMPTY p2 = Post("asdf") p2.blog = b eq_(state.committed_state, {"posts": attributes.NO_VALUE}) eq_(dict_["posts"], [p2]) # then this would fail. eq_( Blog.posts.impl.get_history( state, dict_, passive=attributes.PASSIVE_NO_INITIALIZE ), ([p2], (), ()), ) eq_( Blog.posts.impl.get_all_pending(state, dict_), [(attributes.instance_state(p2), p2)], ) def test_state_on_add_remove(self): Post, Blog, lazy_posts = self._fixture() lazy_posts.return_value = attributes.PASSIVE_NO_RESULT b = Blog("blog 1") b1_state = attributes.instance_state(b) p = Post("post 1") p.blog = b eq_( lazy_posts.mock_calls, [call(b1_state, attributes.PASSIVE_NO_FETCH)], ) p.blog = None eq_( lazy_posts.mock_calls, [ call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_NO_FETCH), ], ) lazy_posts.return_value = [] eq_(b.posts, []) eq_( lazy_posts.mock_calls, [ call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_NO_FETCH), call(b1_state, attributes.PASSIVE_OFF), ], ) def test_pending_combines_with_lazy(self): Post, Blog, lazy_posts = self._fixture() lazy_posts.return_value = attributes.PASSIVE_NO_RESULT b = Blog("blog 1") p = Post("post 1") p2 = Post("post 2") p.blog = b eq_(lazy_posts.call_count, 1) lazy_posts.return_value = [p, p2] # lazy loaded + pending get added together. # This isn't seen often with the ORM due # to usual practices surrounding the # load/flush/load cycle. eq_(b.posts, [p, p2, p]) eq_(lazy_posts.call_count, 2) def test_normal_load(self): Post, Blog, lazy_posts = self._fixture() lazy_posts.return_value = (p1, p2, p3) = [ Post("post 1"), Post("post 2"), Post("post 3"), ] b = Blog("blog 1") # assign without using backref system p2.__dict__["blog"] = b eq_(b.posts, [Post("post 1"), Post("post 2"), Post("post 3")]) eq_(lazy_posts.call_count, 1) p2.blog = None p4 = Post("post 4") p4.blog = b eq_(b.posts, [Post("post 1"), Post("post 3"), Post("post 4")]) b_state = attributes.instance_state(b) eq_(lazy_posts.call_count, 1) eq_(lazy_posts.mock_calls, [call(b_state, attributes.PASSIVE_OFF)]) def test_commit_removes_pending(self): Post, Blog, lazy_posts = self._fixture() p1 = Post("post 1") lazy_posts.return_value = attributes.PASSIVE_NO_RESULT b = Blog("blog 1") p1.blog = b b_state = attributes.instance_state(b) p1_state = attributes.instance_state(p1) b_state._commit_all(attributes.instance_dict(b)) p1_state._commit_all(attributes.instance_dict(p1)) lazy_posts.return_value = [p1] eq_(b.posts, [Post("post 1")]) eq_( lazy_posts.mock_calls, [ call(b_state, attributes.PASSIVE_NO_FETCH), call(b_state, attributes.PASSIVE_OFF), ], ) class HistoryTest(fixtures.TestBase): def _fixture(self, uselist, useobject, active_history, **kw): class Foo(fixtures.BasicEntity): pass instrumentation.register_class(Foo) _register_attribute( Foo, "someattr", uselist=uselist, useobject=useobject, active_history=active_history, **kw, ) return Foo def _two_obj_fixture(self, uselist, active_history=False): class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): def __bool__(self): assert False instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "someattr", uselist=uselist, useobject=True, active_history=active_history, ) return Foo, Bar def _someattr_history(self, f, **kw): passive = kw.pop("passive", None) if passive is True: kw["passive"] = attributes.PASSIVE_NO_INITIALIZE elif passive is False: kw["passive"] = attributes.PASSIVE_OFF return attributes.get_state_history( attributes.instance_state(f), "someattr", **kw ) def _commit_someattr(self, f): attributes.instance_state(f)._commit( attributes.instance_dict(f), ["someattr"] ) def _someattr_committed_state(self, f): Foo = f.__class__ return Foo.someattr.impl.get_committed_value( attributes.instance_state(f), attributes.instance_dict(f) ) def test_committed_value_init(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() eq_(self._someattr_committed_state(f), None) def test_committed_value_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = 3 eq_(self._someattr_committed_state(f), None) def test_committed_value_set_active_hist(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = 3 eq_(self._someattr_committed_state(f), None) def test_committed_value_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = 3 self._commit_someattr(f) eq_(self._someattr_committed_state(f), 3) def test_scalar_init(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() eq_(self._someattr_history(f), ((), (), ())) def test_object_init(self): Foo = self._fixture( uselist=False, useobject=True, active_history=False ) f = Foo() eq_(self._someattr_history(f), ((), (), ())) def test_object_init_active_history(self): Foo = self._fixture(uselist=False, useobject=True, active_history=True) f = Foo() eq_(self._someattr_history(f), ((), (), ())) def test_object_replace(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() b1, b2 = Bar(), Bar() f.someattr = b1 self._commit_someattr(f) f.someattr = b2 eq_(self._someattr_history(f), ([b2], (), [b1])) def test_object_set_none(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() b1 = Bar() f.someattr = b1 self._commit_someattr(f) f.someattr = None eq_(self._someattr_history(f), ([None], (), [b1])) def test_object_set_none_expired(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() b1 = Bar() f.someattr = b1 self._commit_someattr(f) attributes.instance_state(f).dict.pop("someattr", None) attributes.instance_state(f).expired_attributes.add("someattr") f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_object_del(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() b1 = Bar() f.someattr = b1 self._commit_someattr(f) del f.someattr eq_(self._someattr_history(f), ((), (), [b1])) def test_object_del_expired(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() b1 = Bar() f.someattr = b1 self._commit_someattr(f) # the "delete" handler checks if the object # is db-loaded when testing if an empty "del" is valid, # because there's nothing else to look at for a related # object, there's no "expired" status attributes.instance_state(f).key = ("foo",) attributes.instance_state(f)._expire_attributes( attributes.instance_dict(f), ["someattr"] ) del f.someattr eq_(self._someattr_history(f), ([None], (), ())) def test_scalar_no_init_side_effect(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() self._someattr_history(f) # no side effects assert "someattr" not in f.__dict__ assert "someattr" not in attributes.instance_state(f).committed_state def test_scalar_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "hi" eq_(self._someattr_history(f), (["hi"], (), ())) def test_scalar_set_None(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_scalar_del(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = 5 attributes.instance_state(f).key = ("foo",) self._commit_someattr(f) del f.someattr eq_(self._someattr_history(f), ((), (), [5])) def test_scalar_del_expired(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = 5 self._commit_someattr(f) attributes.instance_state(f)._expire_attributes( attributes.instance_dict(f), ["someattr"] ) del f.someattr eq_(self._someattr_history(f), ([None], (), ())) def test_scalar_get_first_set_None(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() assert f.someattr is None f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_scalar_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "hi" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["hi"], ())) def test_scalar_set_commit_reset(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "hi" self._commit_someattr(f) f.someattr = "there" eq_(self._someattr_history(f), (["there"], (), ["hi"])) def test_scalar_set_commit_reset_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "hi" self._commit_someattr(f) f.someattr = "there" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["there"], ())) def test_scalar_set_commit_reset_commit_del(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "there" self._commit_someattr(f) del f.someattr eq_(self._someattr_history(f), ((), (), ["there"])) def test_scalar_set_dict(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.__dict__["someattr"] = "new" eq_(self._someattr_history(f), ((), ["new"], ())) def test_scalar_set_dict_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.__dict__["someattr"] = "new" self._someattr_history(f) f.someattr = "old" eq_(self._someattr_history(f), (["old"], (), ["new"])) def test_scalar_set_dict_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.__dict__["someattr"] = "new" self._someattr_history(f) f.someattr = "old" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["old"], ())) def test_scalar_set_None_from_dict_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.__dict__["someattr"] = "new" f.someattr = None eq_(self._someattr_history(f), ([None], (), ["new"])) def test_scalar_set_twice_no_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "one" eq_(self._someattr_history(f), (["one"], (), ())) f.someattr = "two" eq_(self._someattr_history(f), (["two"], (), ())) def test_scalar_active_init(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() eq_(self._someattr_history(f), ((), (), ())) def test_scalar_active_no_init_side_effect(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() self._someattr_history(f) # no side effects assert "someattr" not in f.__dict__ assert "someattr" not in attributes.instance_state(f).committed_state def test_collection_no_value(self): Foo = self._fixture(uselist=True, useobject=True, active_history=True) f = Foo() eq_(self._someattr_history(f, passive=True), ((), (), ())) def test_scalar_obj_no_value(self): Foo = self._fixture(uselist=False, useobject=True, active_history=True) f = Foo() eq_(self._someattr_history(f, passive=True), ((), (), ())) def test_scalar_no_value(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() eq_(self._someattr_history(f, passive=True), ((), (), ())) def test_scalar_active_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "hi" eq_(self._someattr_history(f), (["hi"], (), ())) def test_scalar_active_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "hi" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["hi"], ())) def test_scalar_active_set_commit_reset(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "hi" self._commit_someattr(f) f.someattr = "there" eq_(self._someattr_history(f), (["there"], (), ["hi"])) def test_scalar_active_set_commit_reset_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "hi" self._commit_someattr(f) f.someattr = "there" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["there"], ())) def test_scalar_active_set_commit_reset_commit_del(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "there" self._commit_someattr(f) del f.someattr eq_(self._someattr_history(f), ((), (), ["there"])) def test_scalar_active_set_dict(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.__dict__["someattr"] = "new" eq_(self._someattr_history(f), ((), ["new"], ())) def test_scalar_active_set_dict_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.__dict__["someattr"] = "new" self._someattr_history(f) f.someattr = "old" eq_(self._someattr_history(f), (["old"], (), ["new"])) def test_scalar_active_set_dict_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.__dict__["someattr"] = "new" self._someattr_history(f) f.someattr = "old" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["old"], ())) def test_scalar_active_set_None(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_scalar_active_set_None_from_dict_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.__dict__["someattr"] = "new" f.someattr = None eq_(self._someattr_history(f), ([None], (), ["new"])) def test_scalar_active_set_twice_no_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "one" eq_(self._someattr_history(f), (["one"], (), ())) f.someattr = "two" eq_(self._someattr_history(f), (["two"], (), ())) def test_scalar_passive_flag(self): Foo = self._fixture( uselist=False, useobject=False, active_history=True ) f = Foo() f.someattr = "one" eq_(self._someattr_history(f), (["one"], (), ())) self._commit_someattr(f) state = attributes.instance_state(f) # do the same thing that # populators.expire.append((self.key, True)) # does in loading.py state.dict.pop("someattr", None) state.expired_attributes.add("someattr") def scalar_loader(state, toload, passive): state.dict["someattr"] = "one" state.manager.expired_attribute_loader = scalar_loader eq_(self._someattr_history(f), ((), ["one"], ())) def test_scalar_inplace_mutation_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} eq_(self._someattr_history(f), ([{"a": "b"}], (), ())) def test_scalar_inplace_mutation_set_commit(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) eq_(self._someattr_history(f), ((), [{"a": "b"}], ())) def test_scalar_inplace_mutation_set_commit_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) f.someattr["a"] = "c" eq_(self._someattr_history(f), ((), [{"a": "c"}], ())) def test_scalar_inplace_mutation_set_commit_flag_modified(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) attributes.flag_modified(f, "someattr") eq_(self._someattr_history(f), ([{"a": "b"}], (), ())) def test_scalar_inplace_mutation_set_commit_set_flag_modified(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) f.someattr["a"] = "c" attributes.flag_modified(f, "someattr") eq_(self._someattr_history(f), ([{"a": "c"}], (), ())) def test_scalar_inplace_mutation_set_commit_flag_modified_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) attributes.flag_modified(f, "someattr") eq_(self._someattr_history(f), ([{"a": "b"}], (), ())) f.someattr = ["a"] eq_(self._someattr_history(f), ([["a"]], (), ())) def test_scalar_inplace_mutation_replace_self_flag_modified_set(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = {"a": "b"} self._commit_someattr(f) eq_(self._someattr_history(f), ((), [{"a": "b"}], ())) # set the attribute to itself; this places a copy # in committed_state f.someattr = f.someattr attributes.flag_modified(f, "someattr") eq_(self._someattr_history(f), ([{"a": "b"}], (), ())) def test_flag_modified_but_no_value_raises(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "foo" self._commit_someattr(f) eq_(self._someattr_history(f), ((), ["foo"], ())) attributes.instance_state(f)._expire_attributes( attributes.instance_dict(f), ["someattr"] ) assert_raises_message( sa_exc.InvalidRequestError, "Can't flag attribute 'someattr' modified; it's " "not present in the object state", attributes.flag_modified, f, "someattr", ) def test_mark_dirty_no_attr(self): Foo = self._fixture( uselist=False, useobject=False, active_history=False ) f = Foo() f.someattr = "foo" attributes.instance_state(f)._commit_all(f.__dict__) eq_(self._someattr_history(f), ((), ["foo"], ())) attributes.instance_state(f)._expire_attributes( attributes.instance_dict(f), ["someattr"] ) is_false(attributes.instance_state(f).modified) attributes.flag_dirty(f) is_true(attributes.instance_state(f).modified) def test_use_object_init(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() eq_(self._someattr_history(f), ((), (), ())) def test_use_object_no_init_side_effect(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() self._someattr_history(f) assert "someattr" not in f.__dict__ assert "someattr" not in attributes.instance_state(f).committed_state def test_use_object_set(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.someattr = hi eq_(self._someattr_history(f), ([hi], (), ())) def test_use_object_set_commit(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.someattr = hi self._commit_someattr(f) eq_(self._someattr_history(f), ((), [hi], ())) def test_use_object_set_commit_set(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.someattr = hi self._commit_someattr(f) there = Bar(name="there") f.someattr = there eq_(self._someattr_history(f), ([there], (), [hi])) def test_use_object_set_commit_set_commit(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.someattr = hi self._commit_someattr(f) there = Bar(name="there") f.someattr = there self._commit_someattr(f) eq_(self._someattr_history(f), ((), [there], ())) def test_use_object_set_commit_del(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.someattr = hi self._commit_someattr(f) del f.someattr eq_(self._someattr_history(f), ((), (), [hi])) def test_use_object_set_dict(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.__dict__["someattr"] = hi eq_(self._someattr_history(f), ((), [hi], ())) def test_use_object_set_dict_set(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.__dict__["someattr"] = hi there = Bar(name="there") f.someattr = there eq_(self._someattr_history(f), ([there], (), [hi])) def test_use_object_set_dict_set_commit(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.__dict__["someattr"] = hi there = Bar(name="there") f.someattr = there self._commit_someattr(f) eq_(self._someattr_history(f), ((), [there], ())) def test_use_object_set_None(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_use_object_get_first_set_None(self): # note - compare: # test_scalar_set_None, # test_scalar_get_first_set_None, # test_use_object_set_None, # test_use_object_get_first_set_None Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() assert f.someattr is None f.someattr = None eq_(self._someattr_history(f), ([None], (), ())) def test_use_object_set_dict_set_None(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") f.__dict__["someattr"] = hi f.someattr = None eq_(self._someattr_history(f), ([None], (), [hi])) def test_use_object_set_value_twice(self): Foo, Bar = self._two_obj_fixture(uselist=False) f = Foo() hi = Bar(name="hi") there = Bar(name="there") f.someattr = hi f.someattr = there eq_(self._someattr_history(f), ([there], (), ())) def test_object_collections_set(self): # TODO: break into individual tests Foo, Bar = self._two_obj_fixture(uselist=True) hi = Bar(name="hi") there = Bar(name="there") old = Bar(name="old") new = Bar(name="new") # case 1. new object f = Foo() eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [], ()), ) f.someattr = [hi] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi], [], []), ) self._commit_someattr(f) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [hi], ()), ) f.someattr = [there] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([there], [], [hi]), ) self._commit_someattr(f) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [there], ()), ) f.someattr = [hi] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi], [], [there]), ) f.someattr = [old, new] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([old, new], [], [there]), ) # case 2. object with direct settings (similar to a load # operation) f = Foo() collection = attributes.init_collection(f, "someattr") collection.append_without_event(new) attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [new], ()), ) f.someattr = [old] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([old], [], [new]), ) self._commit_someattr(f) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [old], ()), ) def test_dict_collections(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "someattr", uselist=True, useobject=True, typecallable=attribute_keyed_dict("name"), ) hi = Bar(name="hi") there = Bar(name="there") f = Foo() eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [], ()), ) f.someattr["hi"] = hi eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi], [], []), ) f.someattr["there"] = there eq_( tuple( [ set(x) for x in attributes.get_state_history( attributes.instance_state(f), "someattr" ) ] ), ({hi, there}, set(), set()), ) self._commit_someattr(f) eq_( tuple( [ set(x) for x in attributes.get_state_history( attributes.instance_state(f), "someattr" ) ] ), (set(), {hi, there}, set()), ) def test_object_collections_mutate(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass instrumentation.register_class(Foo) _register_attribute(Foo, "someattr", uselist=True, useobject=True) _register_attribute(Foo, "id", uselist=False, useobject=False) instrumentation.register_class(Bar) hi = Bar(name="hi") there = Bar(name="there") old = Bar(name="old") new = Bar(name="new") # case 1. new object f = Foo(id=1) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [], ()), ) f.someattr.append(hi) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi], [], []), ) self._commit_someattr(f) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [hi], ()), ) f.someattr.append(there) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([there], [hi], []), ) self._commit_someattr(f) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [hi, there], ()), ) f.someattr.remove(there) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([], [hi], [there]), ) f.someattr.append(old) f.someattr.append(new) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([old, new], [hi], [there]), ) attributes.instance_state(f)._commit( attributes.instance_dict(f), ["someattr"] ) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [hi, old, new], ()), ) f.someattr.pop(0) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([], [old, new], [hi]), ) # case 2. object with direct settings (similar to a load # operation) f = Foo() f.__dict__["id"] = 1 collection = attributes.init_collection(f, "someattr") collection.append_without_event(new) attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [new], ()), ) f.someattr.append(old) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([old], [new], []), ) attributes.instance_state(f)._commit( attributes.instance_dict(f), ["someattr"] ) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [new, old], ()), ) f = Foo() collection = attributes.init_collection(f, "someattr") collection.append_without_event(new) attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [new], ()), ) f.id = 1 f.someattr.remove(new) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([], [], [new]), ) # case 3. mixing appends with sets f = Foo() f.someattr.append(hi) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi], [], []), ) f.someattr.append(there) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi, there], [], []), ) f.someattr = [there] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([there], [], []), ) # case 4. ensure duplicates show up, order is maintained f = Foo() f.someattr.append(hi) f.someattr.append(there) f.someattr.append(hi) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([hi, there, hi], [], []), ) attributes.instance_state(f)._commit_all(attributes.instance_dict(f)) eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ((), [hi, there, hi], ()), ) f.someattr = [] eq_( attributes.get_state_history( attributes.instance_state(f), "someattr" ), ([], [], [hi, there, hi]), ) def test_collections_via_backref(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "bars", uselist=True, backref="foo", trackparent=True, useobject=True, ) _register_attribute( Bar, "foo", uselist=False, backref="bars", trackparent=True, useobject=True, ) f1 = Foo() b1 = Bar() eq_( attributes.get_state_history( attributes.instance_state(f1), "bars" ), ((), [], ()), ) eq_( attributes.get_state_history(attributes.instance_state(b1), "foo"), ((), (), ()), ) # b1.foo = f1 f1.bars.append(b1) eq_( attributes.get_state_history( attributes.instance_state(f1), "bars" ), ([b1], [], []), ) eq_( attributes.get_state_history(attributes.instance_state(b1), "foo"), ([f1], (), ()), ) b2 = Bar() f1.bars.append(b2) eq_( attributes.get_state_history( attributes.instance_state(f1), "bars" ), ([b1, b2], [], []), ) eq_( attributes.get_state_history(attributes.instance_state(b1), "foo"), ([f1], (), ()), ) eq_( attributes.get_state_history(attributes.instance_state(b2), "foo"), ([f1], (), ()), ) class LazyloadHistoryTest(fixtures.TestBase): def test_lazy_backref_collections(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass lazy_load = [] def lazyload(state, passive): return lazy_load instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "bars", uselist=True, backref="foo", trackparent=True, callable_=lazyload, useobject=True, ) _register_attribute( Bar, "foo", uselist=False, backref="bars", trackparent=True, useobject=True, ) bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)] lazy_load = [bar1, bar2, bar3] f = Foo() bar4 = Bar() bar4.foo = f eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([bar4], [bar1, bar2, bar3], []), ) lazy_load = None f = Foo() bar4 = Bar() bar4.foo = f eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([bar4], [], []), ) lazy_load = [bar1, bar2, bar3] attributes.instance_state(f)._expire_attributes( attributes.instance_dict(f), ["bars"] ) eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ((), [bar1, bar2, bar3], ()), ) def test_collections_via_lazyload(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass lazy_load = [] def lazyload(state, passive): return lazy_load instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "bars", uselist=True, callable_=lazyload, trackparent=True, useobject=True, ) bar1, bar2, bar3, bar4 = [Bar(id=1), Bar(id=2), Bar(id=3), Bar(id=4)] lazy_load = [bar1, bar2, bar3] f = Foo() f.bars = [] eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([], [], [bar1, bar2, bar3]), ) f = Foo() f.bars.append(bar4) eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([bar4], [bar1, bar2, bar3], []), ) f = Foo() f.bars.remove(bar2) eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([], [bar1, bar3], [bar2]), ) f.bars.append(bar4) eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([bar4], [bar1, bar3], [bar2]), ) f = Foo() del f.bars[1] eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([], [bar1, bar3], [bar2]), ) lazy_load = None f = Foo() f.bars.append(bar2) eq_( attributes.get_state_history(attributes.instance_state(f), "bars"), ([bar2], [], []), ) def test_scalar_via_lazyload(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass lazy_load = None def lazyload(state, passive): return lazy_load instrumentation.register_class(Foo) _register_attribute( Foo, "bar", uselist=False, callable_=lazyload, useobject=False ) lazy_load = "hi" # with scalar non-object and active_history=False, the lazy # callable is only executed on gets, not history operations f = Foo() eq_(f.bar, "hi") eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), ["hi"], ()), ) f = Foo() f.bar = None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ([None], (), ()), ) f = Foo() f.bar = "there" eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), (["there"], (), ()), ) f.bar = "hi" eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), (["hi"], (), ()), ) f = Foo() eq_(f.bar, "hi") del f.bar eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), ["hi"]), ) assert f.bar is None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), ["hi"]), ) def test_scalar_via_lazyload_with_active(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass lazy_load = None def lazyload(state, passive): return lazy_load instrumentation.register_class(Foo) _register_attribute( Foo, "bar", uselist=False, callable_=lazyload, useobject=False, active_history=True, ) lazy_load = "hi" # active_history=True means the lazy callable is executed on set # as well as get, causing the old value to appear in the history f = Foo() eq_(f.bar, "hi") eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), ["hi"], ()), ) f = Foo() f.bar = None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ([None], (), ["hi"]), ) f = Foo() f.bar = "there" eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), (["there"], (), ["hi"]), ) f.bar = "hi" eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), ["hi"], ()), ) f = Foo() eq_(f.bar, "hi") del f.bar eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), ["hi"]), ) assert f.bar is None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), ["hi"]), ) def test_scalar_object_via_lazyload(self): # TODO: break into individual tests class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass lazy_load = None def lazyload(state, passive): return lazy_load instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "bar", uselist=False, callable_=lazyload, trackparent=True, useobject=True, ) bar1, bar2 = [Bar(id=1), Bar(id=2)] lazy_load = bar1 # with scalar object, the lazy callable is only executed on gets # and history operations f = Foo() eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), [bar1], ()), ) f = Foo() f.bar = None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ([None], (), [bar1]), ) f = Foo() f.bar = bar2 eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ([bar2], (), [bar1]), ) f.bar = bar1 eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), [bar1], ()), ) f = Foo() eq_(f.bar, bar1) del f.bar eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), [bar1]), ) assert f.bar is None eq_( attributes.get_state_history(attributes.instance_state(f), "bar"), ((), (), [bar1]), ) class CollectionKeyTest(fixtures.ORMTest): @testing.fixture def dict_collection(self): class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): def __init__(self, name): self.name = name instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "someattr", uselist=True, useobject=True, typecallable=attribute_keyed_dict("name"), ) _register_attribute( Bar, "name", uselist=False, useobject=False, ) return Foo, Bar @testing.fixture def list_collection(self): class Foo(fixtures.BasicEntity): pass class Bar(fixtures.BasicEntity): pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute( Foo, "someattr", uselist=True, useobject=True, ) return Foo, Bar def test_listen_w_list_key(self, list_collection): Foo, Bar = list_collection m1 = Mock() event.listen(Foo.someattr, "append", m1, include_key=True) event.listen(Foo.someattr, "remove", m1, include_key=True) f1 = Foo() b1, b2, b3 = Bar(), Bar(), Bar() f1.someattr.append(b1) f1.someattr.append(b2) f1.someattr[1] = b3 del f1.someattr[0] append_token, remove_token = ( Foo.someattr.impl._append_token, Foo.someattr.impl._remove_token, ) eq_( m1.mock_calls, [ call( f1, b1, append_token, key=NO_KEY, ), call( f1, b2, append_token, key=NO_KEY, ), call( f1, b2, remove_token, key=1, ), call( f1, b3, append_token, key=1, ), call( f1, b1, remove_token, key=0, ), ], ) def test_listen_w_dict_key(self, dict_collection): Foo, Bar = dict_collection m1 = Mock() event.listen(Foo.someattr, "append", m1, include_key=True) event.listen(Foo.someattr, "remove", m1, include_key=True) f1 = Foo() b1, b2, b3 = Bar("b1"), Bar("b2"), Bar("b3") f1.someattr["k1"] = b1 f1.someattr.update({"k2": b2, "k3": b3}) del f1.someattr["k2"] append_token, remove_token = ( Foo.someattr.impl._append_token, Foo.someattr.impl._remove_token, ) eq_( m1.mock_calls, [ call( f1, b1, append_token, key="k1", ), call( f1, b2, append_token, key="k2", ), call( f1, b3, append_token, key="k3", ), call( f1, b2, remove_token, key="k2", ), ], ) def test_dict_bulk_replace_w_key(self, dict_collection): Foo, Bar = dict_collection m1 = Mock() event.listen(Foo.someattr, "bulk_replace", m1, include_key=True) event.listen(Foo.someattr, "append", m1, include_key=True) event.listen(Foo.someattr, "remove", m1, include_key=True) f1 = Foo() b1, b2, b3, b4 = Bar("b1"), Bar("b2"), Bar("b3"), Bar("b4") f1.someattr = {"b1": b1, "b3": b3} f1.someattr = {"b2": b2, "b3": b3, "b4": b4} bulk_replace_token = Foo.someattr.impl._bulk_replace_token eq_( m1.mock_calls, [ call(f1, [b1, b3], bulk_replace_token, keys=["b1", "b3"]), call(f1, b1, bulk_replace_token, key="b1"), call(f1, b3, bulk_replace_token, key="b3"), call( f1, [b2, b3, b4], bulk_replace_token, keys=["b2", "b3", "b4"], ), call(f1, b2, bulk_replace_token, key="b2"), call(f1, b4, bulk_replace_token, key="b4"), call(f1, b1, bulk_replace_token, key=NO_KEY), ], ) def test_listen_wo_dict_key(self, dict_collection): Foo, Bar = dict_collection m1 = Mock() event.listen(Foo.someattr, "append", m1) event.listen(Foo.someattr, "remove", m1) f1 = Foo() b1, b2, b3 = Bar("b1"), Bar("b2"), Bar("b3") f1.someattr["k1"] = b1 f1.someattr.update({"k2": b2, "k3": b3}) del f1.someattr["k2"] append_token, remove_token = ( Foo.someattr.impl._append_token, Foo.someattr.impl._remove_token, ) eq_( m1.mock_calls, [ call( f1, b1, append_token, ), call( f1, b2, append_token, ), call( f1, b3, append_token, ), call( f1, b2, remove_token, ), ], ) class ListenerTest(fixtures.ORMTest): def test_receive_changes(self): """test that Listeners can mutate the given value.""" class Foo: pass class Bar: pass def append(state, child, initiator): b2 = Bar() b2.data = b1.data + " appended" return b2 def on_set(state, value, oldvalue, initiator): return value + " modified" instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "data", uselist=False, useobject=False) _register_attribute(Foo, "barlist", uselist=True, useobject=True) _register_attribute( Foo, "barset", typecallable=set, uselist=True, useobject=True ) _register_attribute(Bar, "data", uselist=False, useobject=False) event.listen(Foo.data, "set", on_set, retval=True) event.listen(Foo.barlist, "append", append, retval=True) event.listen(Foo.barset, "append", append, retval=True) f1 = Foo() f1.data = "some data" eq_(f1.data, "some data modified") b1 = Bar() b1.data = "some bar" f1.barlist.append(b1) assert b1.data == "some bar" assert f1.barlist[0].data == "some bar appended" f1.barset.add(b1) assert f1.barset.pop().data == "some bar appended" def test_named(self): canary = Mock() class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "data", uselist=False, useobject=False) _register_attribute(Foo, "barlist", uselist=True, useobject=True) event.listen(Foo.data, "set", canary.set, named=True) event.listen(Foo.barlist, "append", canary.append, named=True) event.listen(Foo.barlist, "remove", canary.remove, named=True) f1 = Foo() b1 = Bar() f1.data = 5 f1.barlist.append(b1) f1.barlist.remove(b1) eq_( canary.mock_calls, [ call.set( oldvalue=attributes.NO_VALUE, initiator=attributes.AttributeEventToken( Foo.data.impl, attributes.OP_REPLACE ), target=f1, value=5, ), call.append( initiator=attributes.AttributeEventToken( Foo.barlist.impl, attributes.OP_APPEND ), target=f1, value=b1, ), call.remove( initiator=attributes.AttributeEventToken( Foo.barlist.impl, attributes.OP_REMOVE ), target=f1, value=b1, ), ], ) def test_collection_link_events(self): class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "barlist", uselist=True, useobject=True) canary = Mock() event.listen(Foo.barlist, "init_collection", canary.init) event.listen(Foo.barlist, "dispose_collection", canary.dispose) f1 = Foo() eq_(f1.barlist, []) adapter_one = f1.barlist._sa_adapter eq_(canary.init.mock_calls, [call(f1, [], adapter_one)]) b1 = Bar() f1.barlist.append(b1) b2 = Bar() f1.barlist = [b2] adapter_two = f1.barlist._sa_adapter eq_( canary.init.mock_calls, [ call(f1, [b1], adapter_one), # note the f1.barlist that # we saved earlier has been mutated # in place, new as of [ticket:3913] call(f1, [b2], adapter_two), ], ) eq_(canary.dispose.mock_calls, [call(f1, [b1], adapter_one)]) def test_none_on_collection_event(self): """test that append/remove of None in collections emits events. This is new behavior as of 0.8. """ class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "barlist", uselist=True, useobject=True) canary = [] def append(state, child, initiator): canary.append((state, child)) def remove(state, child, initiator): canary.append((state, child)) event.listen(Foo.barlist, "append", append) event.listen(Foo.barlist, "remove", remove) b1, b2 = Bar(), Bar() f1 = Foo() f1.barlist.append(None) eq_(canary, [(f1, None)]) canary[:] = [] f1 = Foo() f1.barlist = [None, b2] eq_(canary, [(f1, None), (f1, b2)]) canary[:] = [] f1 = Foo() f1.barlist = [b1, None, b2] eq_(canary, [(f1, b1), (f1, None), (f1, b2)]) f1.barlist.remove(None) eq_(canary, [(f1, b1), (f1, None), (f1, b2), (f1, None)]) def test_flag_modified(self): canary = Mock() class Foo: pass instrumentation.register_class(Foo) _register_attribute(Foo, "bar") event.listen(Foo.bar, "modified", canary) f1 = Foo() f1.bar = "hi" attributes.flag_modified(f1, "bar") eq_( canary.mock_calls, [ call( f1, attributes.AttributeEventToken( Foo.bar.impl, attributes.OP_MODIFIED ), ) ], ) def test_none_init_scalar(self): canary = Mock() class Foo: pass instrumentation.register_class(Foo) _register_attribute(Foo, "bar") event.listen(Foo.bar, "set", canary) f1 = Foo() eq_(f1.bar, None) # reversal of approach in #3061 eq_(canary.mock_calls, []) def test_none_init_object(self): canary = Mock() class Foo: pass instrumentation.register_class(Foo) _register_attribute(Foo, "bar", useobject=True) event.listen(Foo.bar, "set", canary) f1 = Foo() eq_(f1.bar, None) # reversal of approach in #3061 eq_(canary.mock_calls, []) def test_none_init_collection(self): canary = Mock() class Foo: pass class Bar: pass instrumentation.register_class(Foo) instrumentation.register_class(Bar) _register_attribute(Foo, "bar", useobject=True, uselist=True) event.listen(Foo.bar, "set", canary) f1 = Foo() eq_(f1.bar, []) assert "bar" not in f1.__dict__ adapter = Foo.bar.impl.get_collection( attributes.instance_state(f1), attributes.instance_dict(f1) ) assert adapter.empty # reversal of approach in #3061 eq_(canary.mock_calls, []) f1.bar.append(Bar()) assert "bar" in f1.__dict__ assert not adapter.empty class EventPropagateTest(fixtures.TestBase): # tests that were expanded as of #4695 # in particular these reveal the inconsistency we have in returning the # "old" value between object and non-object. # this inconsistency might not be a bug, since the nature of scalar # SQL attributes and ORM related objects is fundamentally different. # the inconsistency is: # with active_history=False if old value is not present, for scalar we # return NO VALUE, for object we return NEVER SET # with active_history=True if old value is not present, for scalar we # return NEVER SET, for object we return None # so it is basically fully inconsistent across both directions. def test_propagate_active_history(self): for (A, B, C, D), canary in self._test_propagate_fixtures(True, False): b = B() b.attrib = "foo" eq_(b.attrib, "foo") eq_(canary, [("foo", attributes.NO_VALUE)]) c = C() c.attrib = "bar" eq_(c.attrib, "bar") eq_( canary, [("foo", attributes.NO_VALUE), ("bar", attributes.NO_VALUE)], ) def test_propagate(self): for (A, B, C, D), canary in self._test_propagate_fixtures( False, False ): b = B() b.attrib = "foo" eq_(b.attrib, "foo") eq_(canary, [("foo", attributes.NO_VALUE)]) c = C() c.attrib = "bar" eq_(c.attrib, "bar") eq_( canary, [("foo", attributes.NO_VALUE), ("bar", attributes.NO_VALUE)], ) def test_propagate_useobject_active_history(self): for (A, B, C, D), canary in self._test_propagate_fixtures(True, True): b = B() d1 = D() b.attrib = d1 is_(b.attrib, d1) eq_(canary, [(d1, None)]) c = C() d2 = D() c.attrib = d2 is_(c.attrib, d2) eq_(canary, [(d1, None), (d2, None)]) def test_propagate_useobject(self): for (A, B, C, D), canary in self._test_propagate_fixtures(False, True): b = B() d1 = D() b.attrib = d1 is_(b.attrib, d1) eq_(canary, [(d1, attributes.NO_VALUE)]) c = C() d2 = D() c.attrib = d2 is_(c.attrib, d2) eq_(canary, [(d1, attributes.NO_VALUE), (d2, attributes.NO_VALUE)]) def _test_propagate_fixtures(self, active_history, useobject): classes = [None, None, None, None] canary = [] def make_a(): class A: pass classes[0] = A def make_b(): class B(classes[0]): pass classes[1] = B def make_c(): class C(classes[1]): pass classes[2] = C def make_d(): class D: pass classes[3] = D return D def instrument_a(): instrumentation.register_class(classes[0]) def instrument_b(): instrumentation.register_class(classes[1]) def instrument_c(): instrumentation.register_class(classes[2]) def instrument_d(): instrumentation.register_class(classes[3]) def attr_a(): _register_attribute( classes[0], "attrib", uselist=False, useobject=useobject ) def attr_b(): _register_attribute( classes[1], "attrib", uselist=False, useobject=useobject ) def attr_c(): _register_attribute( classes[2], "attrib", uselist=False, useobject=useobject ) def set_(state, value, oldvalue, initiator): canary.append((value, oldvalue)) def events_a(): event.listen( classes[0].attrib, "set", set_, propagate=True, active_history=active_history, ) ordering = [ (instrument_a, instrument_b), (instrument_b, instrument_c), (attr_a, attr_b), (attr_b, attr_c), (make_a, instrument_a), (instrument_a, attr_a), (attr_a, events_a), (make_b, instrument_b), (instrument_b, attr_b), (make_c, instrument_c), (instrument_c, attr_c), (make_a, make_b), (make_b, make_c), ] elements = [ make_a, make_b, make_c, instrument_a, instrument_b, instrument_c, attr_a, attr_b, attr_c, events_a, ] for i, series in enumerate(all_partial_orderings(ordering, elements)): for fn in series: fn() if useobject: make_d() instrument_d() yield classes, canary classes[:] = [None, None, None, None] canary[:] = [] class CollectionInitTest(fixtures.TestBase): def setup_test(self): class A: pass class B: pass self.A = A self.B = B instrumentation.register_class(A) instrumentation.register_class(B) _register_attribute(A, "bs", uselist=True, useobject=True) def test_bulk_replace_resets_empty(self): A = self.A a1 = A() state = attributes.instance_state(a1) existing = a1.bs is_(state._empty_collections["bs"], existing) is_not(existing._sa_adapter, None) a1.bs = [] # replaces previous "empty" collection not_in("bs", state._empty_collections) # empty is replaced is_(existing._sa_adapter, None) def test_assert_false_on_default_value(self): A = self.A a1 = A() state = attributes.instance_state(a1) attributes.init_state_collection(state, state.dict, "bs") assert_raises( AssertionError, A.bs.impl._default_value, state, state.dict ) def test_loader_inits_collection_already_exists(self): A, B = self.A, self.B a1 = A() b1, b2 = B(), B() a1.bs = [b1, b2] eq_(a1.__dict__["bs"], [b1, b2]) old = a1.__dict__["bs"] is_not(old._sa_adapter, None) state = attributes.instance_state(a1) # this occurs during a load with populate_existing adapter = attributes.init_state_collection(state, state.dict, "bs") new = a1.__dict__["bs"] eq_(new, []) is_(new._sa_adapter, adapter) is_(old._sa_adapter, None) class TestUnlink(fixtures.TestBase): def setup_test(self): class A: pass class B: pass self.A = A self.B = B instrumentation.register_class(A) instrumentation.register_class(B) _register_attribute(A, "bs", uselist=True, useobject=True) def test_expired(self): A, B = self.A, self.B a1 = A() coll = a1.bs a1.bs.append(B()) state = attributes.instance_state(a1) state._expire(state.dict, set()) assert_warns(Warning, coll.append, B()) def test_replaced(self): A, B = self.A, self.B a1 = A() coll = a1.bs a1.bs.append(B()) a1.bs = [] # a bulk replace no longer empties the old collection # as of [ticket:3913] assert len(coll) == 1 coll.append(B()) assert len(coll) == 2 def test_pop_existing(self): A, B = self.A, self.B a1 = A() coll = a1.bs a1.bs.append(B()) state = attributes.instance_state(a1) state._reset(state.dict, "bs") assert_warns(Warning, coll.append, B()) def test_ad_hoc_lazy(self): A, B = self.A, self.B a1 = A() coll = a1.bs a1.bs.append(B()) state = attributes.instance_state(a1) _set_callable(state, state.dict, "bs", lambda: B()) assert_warns(Warning, coll.append, B())