"""Test event registration and listening.""" from unittest.mock import call from unittest.mock import Mock from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import testing from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_deprecated from sqlalchemy.testing import expect_raises_message from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_not from sqlalchemy.testing.util import gc_collect class TearDownLocalEventsFixture: def teardown_test(self): classes = set() for entry in event.base._registrars.values(): for evt_cls in entry: if evt_cls.__module__ == __name__: classes.add(evt_cls) for evt_cls in classes: event.base._remove_dispatcher(evt_cls) class EventsTest(TearDownLocalEventsFixture, fixtures.TestBase): """Test class- and instance-level event registration.""" def setup_test(self): class TargetEvents(event.Events): def event_one(self, x, y): pass def event_two(self, x): pass def event_three(self, x): pass class Target: dispatch = event.dispatcher(TargetEvents) self.Target = Target def test_register_class(self): def listen(x, y): pass event.listen(self.Target, "event_one", listen) eq_(len(self.Target().dispatch.event_one), 1) eq_(len(self.Target().dispatch.event_two), 0) def test_register_instance(self): def listen(x, y): pass t1 = self.Target() event.listen(t1, "event_one", listen) eq_(len(self.Target().dispatch.event_one), 0) eq_(len(t1.dispatch.event_one), 1) eq_(len(self.Target().dispatch.event_two), 0) eq_(len(t1.dispatch.event_two), 0) def test_bool_clslevel(self): def listen_one(x, y): pass event.listen(self.Target, "event_one", listen_one) t = self.Target() assert t.dispatch.event_one def test_register_class_instance(self): def listen_one(x, y): pass def listen_two(x, y): pass event.listen(self.Target, "event_one", listen_one) t1 = self.Target() event.listen(t1, "event_one", listen_two) eq_(len(self.Target().dispatch.event_one), 1) eq_(len(t1.dispatch.event_one), 2) eq_(len(self.Target().dispatch.event_two), 0) eq_(len(t1.dispatch.event_two), 0) def listen_three(x, y): pass event.listen(self.Target, "event_one", listen_three) eq_(len(self.Target().dispatch.event_one), 2) eq_(len(t1.dispatch.event_one), 3) def test_append_vs_insert_cls(self): def listen_one(x, y): pass def listen_two(x, y): pass def listen_three(x, y): pass event.listen(self.Target, "event_one", listen_one) event.listen(self.Target, "event_one", listen_two) event.listen(self.Target, "event_one", listen_three, insert=True) eq_( list(self.Target().dispatch.event_one), [listen_three, listen_one, listen_two], ) def test_append_vs_insert_instance(self): def listen_one(x, y): pass def listen_two(x, y): pass def listen_three(x, y): pass target = self.Target() event.listen(target, "event_one", listen_one) event.listen(target, "event_one", listen_two) event.listen(target, "event_one", listen_three, insert=True) eq_( list(target.dispatch.event_one), [listen_three, listen_one, listen_two], ) def test_decorator(self): @event.listens_for(self.Target, "event_one") def listen_one(x, y): pass @event.listens_for(self.Target, "event_two") @event.listens_for(self.Target, "event_three") def listen_two(x, y): pass eq_(list(self.Target().dispatch.event_one), [listen_one]) eq_(list(self.Target().dispatch.event_two), [listen_two]) eq_(list(self.Target().dispatch.event_three), [listen_two]) def test_no_instance_level_collections(self): @event.listens_for(self.Target, "event_one") def listen_one(x, y): pass t1 = self.Target() t2 = self.Target() t1.dispatch.event_one(5, 6) t2.dispatch.event_one(5, 6) is_( self.Target.dispatch._empty_listener_reg[self.Target]["event_one"], t1.dispatch.event_one, ) @event.listens_for(t1, "event_one") def listen_two(x, y): pass is_not( self.Target.dispatch._empty_listener_reg[self.Target]["event_one"], t1.dispatch.event_one, ) is_( self.Target.dispatch._empty_listener_reg[self.Target]["event_one"], t2.dispatch.event_one, ) def test_exec_once(self): m1 = Mock() event.listen(self.Target, "event_one", m1) t1 = self.Target() t2 = self.Target() t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(5, 6) t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8) t2.dispatch.event_one.for_modify(t2.dispatch).exec_once(9, 10) eq_(m1.mock_calls, [call(5, 6), call(9, 10)]) def test_real_name_wrong_dispatch(self): m1 = Mock() class E1(event.Events): @classmethod def _accept_with(cls, target, identifier): if isinstance(target, T1): return target else: m1.yup() return None def event_one(self, x, y): pass def event_two(self, x): pass def event_three(self, x): pass class T1: dispatch = event.dispatcher(E1) class T2: pass class E2(event.Events): _dispatch_target = T2 def event_four(self, x): pass with expect_raises_message( exc.InvalidRequestError, "No such event 'event_three'" ): @event.listens_for(E2, "event_three") def go(*arg): pass eq_(m1.mock_calls, [call.yup()]) def test_exec_once_exception(self): m1 = Mock() m1.side_effect = ValueError event.listen(self.Target, "event_one", m1) t1 = self.Target() assert_raises( ValueError, t1.dispatch.event_one.for_modify(t1.dispatch).exec_once, 5, 6, ) t1.dispatch.event_one.for_modify(t1.dispatch).exec_once(7, 8) eq_(m1.mock_calls, [call(5, 6)]) def test_exec_once_unless_exception(self): m1 = Mock() m1.side_effect = ValueError event.listen(self.Target, "event_one", m1) t1 = self.Target() assert_raises( ValueError, t1.dispatch.event_one.for_modify( t1.dispatch ).exec_once_unless_exception, 5, 6, ) assert_raises( ValueError, t1.dispatch.event_one.for_modify( t1.dispatch ).exec_once_unless_exception, 7, 8, ) m1.side_effect = None t1.dispatch.event_one.for_modify( t1.dispatch ).exec_once_unless_exception(9, 10) t1.dispatch.event_one.for_modify( t1.dispatch ).exec_once_unless_exception(11, 12) eq_(m1.mock_calls, [call(5, 6), call(7, 8), call(9, 10)]) def test_immutable_methods(self): t1 = self.Target() for meth in [ t1.dispatch.event_one.exec_once, t1.dispatch.event_one.insert, t1.dispatch.event_one.append, t1.dispatch.event_one.remove, t1.dispatch.event_one.clear, ]: assert_raises_message( NotImplementedError, r"need to call for_modify\(\)", meth ) class SlotsEventsTest(fixtures.TestBase): def test_no_slots_dispatch(self): class Target: __slots__ = () class TargetEvents(event.Events): _dispatch_target = Target def event_one(self, x, y): pass def event_two(self, x): pass def event_three(self, x): pass t1 = Target() with testing.expect_raises_message( TypeError, r"target .*Target.* doesn't have __dict__, should it " "be defining _slots_dispatch", ): event.listen(t1, "event_one", Mock()) def test_slots_dispatch(self): class Target: __slots__ = ("_slots_dispatch",) class TargetEvents(event.Events): _dispatch_target = Target def event_one(self, x, y): pass def event_two(self, x): pass def event_three(self, x): pass t1 = Target() m1 = Mock() event.listen(t1, "event_one", m1) t1.dispatch.event_one(2, 4) eq_(m1.mock_calls, [call(2, 4)]) class NamedCallTest(TearDownLocalEventsFixture, fixtures.TestBase): def _fixture(self): class TargetEventsOne(event.Events): def event_one(self, x, y): pass def event_two(self, x, y, **kw): pass def event_five(self, x, y, z, q): pass class TargetOne: dispatch = event.dispatcher(TargetEventsOne) return TargetOne def _wrapped_fixture(self): class TargetEvents(event.Events): @classmethod def _listen(cls, event_key): fn = event_key._listen_fn def adapt(*args): fn(*["adapted %s" % arg for arg in args]) event_key = event_key.with_wrapper(adapt) event_key.base_listen() def event_one(self, x, y): pass def event_five(self, x, y, z, q): pass class Target: dispatch = event.dispatcher(TargetEvents) return Target def test_kw_accept(self): TargetOne = self._fixture() canary = Mock() @event.listens_for(TargetOne, "event_one", named=True) def handler1(**kw): canary(kw) TargetOne().dispatch.event_one(4, 5) eq_(canary.mock_calls, [call({"x": 4, "y": 5})]) def test_kw_accept_wrapped(self): TargetOne = self._wrapped_fixture() canary = Mock() @event.listens_for(TargetOne, "event_one", named=True) def handler1(**kw): canary(kw) TargetOne().dispatch.event_one(4, 5) eq_(canary.mock_calls, [call({"y": "adapted 5", "x": "adapted 4"})]) def test_partial_kw_accept(self): TargetOne = self._fixture() canary = Mock() @event.listens_for(TargetOne, "event_five", named=True) def handler1(z, y, **kw): canary(z, y, kw) TargetOne().dispatch.event_five(4, 5, 6, 7) eq_(canary.mock_calls, [call(6, 5, {"x": 4, "q": 7})]) def test_partial_kw_accept_wrapped(self): TargetOne = self._wrapped_fixture() canary = Mock() @event.listens_for(TargetOne, "event_five", named=True) def handler1(z, y, **kw): canary(z, y, kw) TargetOne().dispatch.event_five(4, 5, 6, 7) eq_( canary.mock_calls, [ call( "adapted 6", "adapted 5", {"q": "adapted 7", "x": "adapted 4"}, ) ], ) def test_kw_accept_plus_kw(self): TargetOne = self._fixture() canary = Mock() @event.listens_for(TargetOne, "event_two", named=True) def handler1(**kw): canary(kw) TargetOne().dispatch.event_two(4, 5, z=8, q=5) eq_(canary.mock_calls, [call({"x": 4, "y": 5, "z": 8, "q": 5})]) class LegacySignatureTest(TearDownLocalEventsFixture, fixtures.TestBase): """test adaption of legacy args""" def setup_test(self): class TargetEventsOne(event.Events): @event._legacy_signature("0.9", ["x", "y"]) def event_three(self, x, y, z, q): pass @event._legacy_signature("0.9", ["x", "y", "**kw"]) def event_four(self, x, y, z, q, **kw): pass @event._legacy_signature( "0.9", ["x", "y", "z", "q"], lambda x, y: (x, y, x + y, x * y) ) def event_six(self, x, y): pass class TargetOne: dispatch = event.dispatcher(TargetEventsOne) self.TargetOne = TargetOne def test_legacy_accept(self): canary = Mock() @event.listens_for(self.TargetOne, "event_three") def handler1(x, y): canary(x, y) with expect_deprecated( 'The argument signature for the "TargetEventsOne.event_three" ' "event listener has changed as of version 0.9, and conversion " "for the old argument signature will be removed in a future " r'release. The new signature is "def event_three\(x, y, z, q\)"' ): self.TargetOne().dispatch.event_three(4, 5, 6, 7) eq_(canary.mock_calls, [call(4, 5)]) def test_legacy_accept_kw_cls(self): canary = Mock() @event.listens_for(self.TargetOne, "event_four") def handler1(x, y, **kw): canary(x, y, kw) self._test_legacy_accept_kw(self.TargetOne(), canary) def test_legacy_accept_kw_instance(self): canary = Mock() inst = self.TargetOne() @event.listens_for(inst, "event_four") def handler1(x, y, **kw): canary(x, y, kw) self._test_legacy_accept_kw(inst, canary) def test_legacy_accept_partial(self): canary = Mock() def evt(a, x, y, **kw): canary(a, x, y, **kw) from functools import partial evt_partial = partial(evt, 5) target = self.TargetOne() event.listen(target, "event_four", evt_partial) # can't do legacy accept on a partial; we can't inspect it assert_raises( TypeError, target.dispatch.event_four, 4, 5, 6, 7, foo="bar" ) target.dispatch.event_four(4, 5, foo="bar") eq_(canary.mock_calls, [call(5, 4, 5, foo="bar")]) def _test_legacy_accept_kw(self, target, canary): with expect_deprecated( 'The argument signature for the "TargetEventsOne.event_four" ' "event listener has changed as of version 0.9, and conversion " "for the old argument signature will be removed in a future " r"release. The new signature is " r'"def event_four\(x, y, z, q, \*\*kw\)"' ): target.dispatch.event_four(4, 5, 6, 7, foo="bar") eq_(canary.mock_calls, [call(4, 5, {"foo": "bar"})]) def test_complex_legacy_accept(self): canary = Mock() @event.listens_for(self.TargetOne, "event_six") def handler1(x, y, z, q): canary(x, y, z, q) with expect_deprecated( 'The argument signature for the "TargetEventsOne.event_six" ' "event listener has changed as of version 0.9, and " "conversion for the old argument signature will be removed in " "a future release. The new signature is " r'"def event_six\(x, y\)' ): self.TargetOne().dispatch.event_six(4, 5) eq_(canary.mock_calls, [call(4, 5, 9, 20)]) def test_complex_new_accept(self): canary = Mock() @event.listens_for(self.TargetOne, "event_six") def handler1(x, y): canary(x, y) # new version does not emit a warning self.TargetOne().dispatch.event_six(4, 5) eq_(canary.mock_calls, [call(4, 5)]) def test_legacy_accept_from_method(self): canary = Mock() class MyClass: def handler1(self, x, y): canary(x, y) event.listen(self.TargetOne, "event_three", MyClass().handler1) with expect_deprecated( 'The argument signature for the "TargetEventsOne.event_three" ' "event listener has changed as of version 0.9, and conversion " "for the old argument signature will be removed in a future " r'release. The new signature is "def event_three\(x, y, z, q\)"' ): self.TargetOne().dispatch.event_three(4, 5, 6, 7) eq_(canary.mock_calls, [call(4, 5)]) def test_standard_accept_has_legacies(self): canary = Mock() event.listen(self.TargetOne, "event_three", canary) self.TargetOne().dispatch.event_three(4, 5) eq_(canary.mock_calls, [call(4, 5)]) def test_kw_accept_has_legacies(self): canary = Mock() @event.listens_for(self.TargetOne, "event_three", named=True) def handler1(**kw): canary(kw) self.TargetOne().dispatch.event_three(4, 5, 6, 7) eq_(canary.mock_calls, [call({"x": 4, "y": 5, "z": 6, "q": 7})]) def test_kw_accept_plus_kw_has_legacies(self): canary = Mock() @event.listens_for(self.TargetOne, "event_four", named=True) def handler1(**kw): canary(kw) self.TargetOne().dispatch.event_four(4, 5, 6, 7, foo="bar") eq_( canary.mock_calls, [call({"x": 4, "y": 5, "z": 6, "q": 7, "foo": "bar"})], ) class ClsLevelListenTest(TearDownLocalEventsFixture, fixtures.TestBase): def setup_test(self): class TargetEventsOne(event.Events): def event_one(self, x, y): pass class TargetOne: dispatch = event.dispatcher(TargetEventsOne) self.TargetOne = TargetOne def test_lis_subcalss_lis(self): @event.listens_for(self.TargetOne, "event_one") def handler1(x, y): pass class SubTarget(self.TargetOne): pass @event.listens_for(self.TargetOne, "event_one") def handler2(x, y): pass eq_(len(SubTarget().dispatch.event_one), 2) @testing.combinations(True, False, argnames="m1") @testing.combinations(True, False, argnames="m2") @testing.combinations(True, False, argnames="m3") @testing.combinations(True, False, argnames="use_insert") def test_subclass_gen_after_clslisten(self, m1, m2, m3, use_insert): """test #8467""" m1 = Mock() if m1 else None m2 = Mock() if m2 else None m3 = Mock() if m3 else None if m1: event.listen(self.TargetOne, "event_one", m1, insert=use_insert) class SubTarget(self.TargetOne): pass if m2: event.listen(SubTarget, "event_one", m2, insert=use_insert) if m3: event.listen(self.TargetOne, "event_one", m3, insert=use_insert) st = SubTarget() st.dispatch.event_one() for m in m1, m2, m3: if m: eq_(m.mock_calls, [call()]) def test_lis_multisub_lis(self): @event.listens_for(self.TargetOne, "event_one") def handler1(x, y): pass class SubTarget(self.TargetOne): pass class SubSubTarget(SubTarget): pass @event.listens_for(self.TargetOne, "event_one") def handler2(x, y): pass eq_(len(SubTarget().dispatch.event_one), 2) eq_(len(SubSubTarget().dispatch.event_one), 2) def test_two_sub_lis(self): class SubTarget1(self.TargetOne): pass class SubTarget2(self.TargetOne): pass @event.listens_for(self.TargetOne, "event_one") def handler1(x, y): pass @event.listens_for(SubTarget1, "event_one") def handler2(x, y): pass s1 = SubTarget1() assert handler1 in s1.dispatch.event_one assert handler2 in s1.dispatch.event_one s2 = SubTarget2() assert handler1 in s2.dispatch.event_one assert handler2 not in s2.dispatch.event_one class AcceptTargetsTest(TearDownLocalEventsFixture, fixtures.TestBase): """Test default target acceptance.""" def setup_test(self): class TargetEventsOne(event.Events): def event_one(self, x, y): pass class TargetEventsTwo(event.Events): def event_one(self, x, y): pass class TargetOne: dispatch = event.dispatcher(TargetEventsOne) class TargetTwo: dispatch = event.dispatcher(TargetEventsTwo) self.TargetOne = TargetOne self.TargetTwo = TargetTwo def test_target_accept(self): """Test that events of the same name are routed to the correct collection based on the type of target given. """ def listen_one(x, y): pass def listen_two(x, y): pass def listen_three(x, y): pass def listen_four(x, y): pass event.listen(self.TargetOne, "event_one", listen_one) event.listen(self.TargetTwo, "event_one", listen_two) eq_(list(self.TargetOne().dispatch.event_one), [listen_one]) eq_(list(self.TargetTwo().dispatch.event_one), [listen_two]) t1 = self.TargetOne() t2 = self.TargetTwo() event.listen(t1, "event_one", listen_three) event.listen(t2, "event_one", listen_four) eq_(list(t1.dispatch.event_one), [listen_one, listen_three]) eq_(list(t2.dispatch.event_one), [listen_two, listen_four]) class CustomTargetsTest(TearDownLocalEventsFixture, fixtures.TestBase): """Test custom target acceptance.""" def setup_test(self): class TargetEvents(event.Events): @classmethod def _accept_with(cls, target, identifier): if target == "one": return Target else: return None def event_one(self, x, y): pass class Target: dispatch = event.dispatcher(TargetEvents) self.Target = Target def test_indirect(self): def listen(x, y): pass event.listen("one", "event_one", listen) eq_(list(self.Target().dispatch.event_one), [listen]) assert_raises( exc.InvalidRequestError, event.listen, listen, "event_one", self.Target, ) class SubclassGrowthTest(TearDownLocalEventsFixture, fixtures.TestBase): """test that ad-hoc subclasses are garbage collected.""" def setup_test(self): class TargetEvents(event.Events): def some_event(self, x, y): pass class Target: dispatch = event.dispatcher(TargetEvents) self.Target = Target def test_subclass(self): class SubTarget(self.Target): pass st = SubTarget() st.dispatch.some_event(1, 2) del st del SubTarget gc_collect() eq_(self.Target.__subclasses__(), []) class ListenOverrideTest(TearDownLocalEventsFixture, fixtures.TestBase): """Test custom listen functions which change the listener function signature.""" def setup_test(self): class TargetEvents(event.Events): @classmethod def _listen(cls, event_key, add=False): fn = event_key.fn if add: def adapt(x, y): fn(x + y) event_key = event_key.with_wrapper(adapt) event_key.base_listen() def event_one(self, x, y): pass class Target: dispatch = event.dispatcher(TargetEvents) self.Target = Target def test_listen_override(self): listen_one = Mock() listen_two = Mock() event.listen(self.Target, "event_one", listen_one, add=True) event.listen(self.Target, "event_one", listen_two) t1 = self.Target() t1.dispatch.event_one(5, 7) t1.dispatch.event_one(10, 5) eq_(listen_one.mock_calls, [call(12), call(15)]) eq_(listen_two.mock_calls, [call(5, 7), call(10, 5)]) def test_remove_clslevel(self): listen_one = Mock() event.listen(self.Target, "event_one", listen_one, add=True) t1 = self.Target() t1.dispatch.event_one(5, 7) eq_(listen_one.mock_calls, [call(12)]) event.remove(self.Target, "event_one", listen_one) t1.dispatch.event_one(10, 5) eq_(listen_one.mock_calls, [call(12)]) def test_remove_instancelevel(self): listen_one = Mock() t1 = self.Target() event.listen(t1, "event_one", listen_one, add=True) t1.dispatch.event_one(5, 7) eq_(listen_one.mock_calls, [call(12)]) event.remove(t1, "event_one", listen_one) t1.dispatch.event_one(10, 5) eq_(listen_one.mock_calls, [call(12)]) class PropagateTest(TearDownLocalEventsFixture, fixtures.TestBase): def setup_test(self): class TargetEvents(event.Events): def event_one(self, arg): pass def event_two(self, arg): pass class Target: dispatch = event.dispatcher(TargetEvents) self.Target = Target def test_propagate(self): listen_one = Mock() listen_two = Mock() t1 = self.Target() event.listen(t1, "event_one", listen_one, propagate=True) event.listen(t1, "event_two", listen_two) t2 = self.Target() t2.dispatch._update(t1.dispatch) t2.dispatch.event_one(t2, 1) t2.dispatch.event_two(t2, 2) eq_(listen_one.mock_calls, [call(t2, 1)]) eq_(listen_two.mock_calls, []) class JoinTest(TearDownLocalEventsFixture, fixtures.TestBase): def setup_test(self): class TargetEvents(event.Events): def event_one(self, target, arg): pass class BaseTarget: dispatch = event.dispatcher(TargetEvents) class TargetFactory(BaseTarget): def create(self): return TargetElement(self) class TargetElement(BaseTarget): def __init__(self, parent): self.dispatch = self.dispatch._join(parent.dispatch) def run_event(self, arg): list(self.dispatch.event_one) self.dispatch.event_one(self, arg) self.BaseTarget = BaseTarget self.TargetFactory = TargetFactory self.TargetElement = TargetElement def test_neither(self): element = self.TargetFactory().create() element.run_event(1) element.run_event(2) element.run_event(3) def test_kw_ok(self): l1 = Mock() def listen(**kw): l1(kw) event.listen(self.TargetFactory, "event_one", listen, named=True) element = self.TargetFactory().create() element.run_event(1) element.run_event(2) eq_( l1.mock_calls, [ call({"target": element, "arg": 1}), call({"target": element, "arg": 2}), ], ) def test_parent_class_only(self): l1 = Mock() event.listen(self.TargetFactory, "event_one", l1) element = self.TargetFactory().create() element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_class_child_class(self): l1 = Mock() l2 = Mock() event.listen(self.TargetFactory, "event_one", l1) event.listen(self.TargetElement, "event_one", l2) element = self.TargetFactory().create() element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_class_child_instance_apply_after(self): l1 = Mock() l2 = Mock() event.listen(self.TargetFactory, "event_one", l1) element = self.TargetFactory().create() element.run_event(1) event.listen(element, "event_one", l2) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) eq_(l2.mock_calls, [call(element, 2), call(element, 3)]) def test_parent_class_child_instance_apply_before(self): l1 = Mock() l2 = Mock() event.listen(self.TargetFactory, "event_one", l1) element = self.TargetFactory().create() event.listen(element, "event_one", l2) element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_instance_child_class_apply_before(self): l1 = Mock() l2 = Mock() event.listen(self.TargetElement, "event_one", l2) factory = self.TargetFactory() event.listen(factory, "event_one", l1) element = factory.create() element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_instance_child_class_apply_after(self): l1 = Mock() l2 = Mock() event.listen(self.TargetElement, "event_one", l2) factory = self.TargetFactory() element = factory.create() element.run_event(1) event.listen(factory, "event_one", l1) element.run_event(2) element.run_event(3) # if _JoinedListener fixes .listeners # at construction time, then we don't get # the new listeners. # eq_(l1.mock_calls, []) # alternatively, if _JoinedListener shares the list # using a @property, then we get them, at the arguable # expense of the extra method call to access the .listeners # collection eq_(l1.mock_calls, [call(element, 2), call(element, 3)]) eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_instance_child_instance_apply_before(self): l1 = Mock() l2 = Mock() factory = self.TargetFactory() event.listen(factory, "event_one", l1) element = factory.create() event.listen(element, "event_one", l2) element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) eq_( l2.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) def test_parent_events_child_no_events(self): l1 = Mock() factory = self.TargetFactory() event.listen(self.TargetElement, "event_one", l1) element = factory.create() element.run_event(1) element.run_event(2) element.run_event(3) eq_( l1.mock_calls, [call(element, 1), call(element, 2), call(element, 3)], ) class DisableClsPropagateTest(TearDownLocalEventsFixture, fixtures.TestBase): def setup_test(self): class TargetEvents(event.Events): def event_one(self, target, arg): pass class BaseTarget: dispatch = event.dispatcher(TargetEvents) class SubTarget(BaseTarget): _sa_propagate_class_events = False def __init__(self, parent): self.dispatch = self.dispatch._join(parent.dispatch) self.BaseTarget = BaseTarget self.SubTarget = SubTarget def test_listen_invoke_clslevel(self): canary = Mock() event.listen(self.BaseTarget, "event_one", canary) s1 = self.SubTarget(self.BaseTarget()) s1.dispatch.event_one() eq_(canary.mock_calls, [call.event_one()]) def test_insert_invoke_clslevel(self): canary = Mock() event.listen(self.BaseTarget, "event_one", canary, insert=True) s1 = self.SubTarget(self.BaseTarget()) s1.dispatch.event_one() eq_(canary.mock_calls, [call.event_one()]) def test_remove_invoke_clslevel(self): canary = Mock() event.listen(self.BaseTarget, "event_one", canary) s1 = self.SubTarget(self.BaseTarget()) event.remove(self.BaseTarget, "event_one", canary) s1.dispatch.event_one() eq_(canary.mock_calls, []) class RemovalTest(TearDownLocalEventsFixture, fixtures.TestBase): def _fixture(self): class TargetEvents(event.Events): def event_one(self, x, y): pass def event_two(self, x): pass def event_three(self, x): pass class Target: dispatch = event.dispatcher(TargetEvents) return Target def _wrapped_fixture(self): class TargetEvents(event.Events): @classmethod def _listen(cls, event_key): fn = event_key._listen_fn def adapt(value): fn("adapted " + value) event_key = event_key.with_wrapper(adapt) event_key.base_listen() def event_one(self, x): pass class Target: dispatch = event.dispatcher(TargetEvents) return Target def test_clslevel(self): Target = self._fixture() m1 = Mock() event.listen(Target, "event_two", m1) t1 = Target() t1.dispatch.event_two("x") event.remove(Target, "event_two", m1) t1.dispatch.event_two("y") eq_(m1.mock_calls, [call("x")]) def test_clslevel_subclass(self): Target = self._fixture() class SubTarget(Target): pass m1 = Mock() event.listen(Target, "event_two", m1) t1 = SubTarget() t1.dispatch.event_two("x") event.remove(Target, "event_two", m1) t1.dispatch.event_two("y") eq_(m1.mock_calls, [call("x")]) def test_instance(self): Target = self._fixture() class Foo: def __init__(self): self.mock = Mock() def evt(self, arg): self.mock(arg) f1 = Foo() f2 = Foo() event.listen(Target, "event_one", f1.evt) event.listen(Target, "event_one", f2.evt) t1 = Target() t1.dispatch.event_one("x") event.remove(Target, "event_one", f1.evt) t1.dispatch.event_one("y") eq_(f1.mock.mock_calls, [call("x")]) eq_(f2.mock.mock_calls, [call("x"), call("y")]) def test_once(self): Target = self._fixture() m1 = Mock() m2 = Mock() m3 = Mock() m4 = Mock() event.listen(Target, "event_one", m1) event.listen(Target, "event_one", m2, once=True) event.listen(Target, "event_one", m3, once=True) t1 = Target() t1.dispatch.event_one("x") t1.dispatch.event_one("y") event.listen(Target, "event_one", m4, once=True) t1.dispatch.event_one("z") t1.dispatch.event_one("q") eq_(m1.mock_calls, [call("x"), call("y"), call("z"), call("q")]) eq_(m2.mock_calls, [call("x")]) eq_(m3.mock_calls, [call("x")]) eq_(m4.mock_calls, [call("z")]) def test_once_unless_exception(self): Target = self._fixture() m1 = Mock() m2 = Mock() m3 = Mock() m4 = Mock() m1.side_effect = ValueError m2.side_effect = ValueError m3.side_effect = ValueError event.listen(Target, "event_one", m1) event.listen(Target, "event_one", m2, _once_unless_exception=True) event.listen(Target, "event_one", m3, _once_unless_exception=True) t1 = Target() # only m1 is called, raises assert_raises(ValueError, t1.dispatch.event_one, "x") # now m1 and m2 can be called but not m3 m1.side_effect = None assert_raises(ValueError, t1.dispatch.event_one, "y") # now m3 can be called m2.side_effect = None event.listen(Target, "event_one", m4, _once_unless_exception=True) assert_raises(ValueError, t1.dispatch.event_one, "z") assert_raises(ValueError, t1.dispatch.event_one, "q") eq_(m1.mock_calls, [call("x"), call("y"), call("z"), call("q")]) eq_(m2.mock_calls, [call("y"), call("z")]) eq_(m3.mock_calls, [call("z"), call("q")]) eq_(m4.mock_calls, []) # m4 never got called because m3 blocked it # now m4 can be called m3.side_effect = None t1.dispatch.event_one("p") eq_( m1.mock_calls, [call("x"), call("y"), call("z"), call("q"), call("p")], ) # m2 already got called, so no "p" eq_(m2.mock_calls, [call("y"), call("z")]) eq_(m3.mock_calls, [call("z"), call("q"), call("p")]) eq_(m4.mock_calls, [call("p")]) t1.dispatch.event_one("j") eq_( m1.mock_calls, [call("x"), call("y"), call("z"), call("q"), call("p"), call("j")], ) # nobody got "j" because they've all been successful eq_(m2.mock_calls, [call("y"), call("z")]) eq_(m3.mock_calls, [call("z"), call("q"), call("p")]) eq_(m4.mock_calls, [call("p")]) def test_once_doesnt_dereference_listener(self): # test for [ticket:4794] Target = self._fixture() canary = Mock() def go(target, given_id): def anonymous(run_id): canary(run_id, given_id) event.listen(target, "event_one", anonymous, once=True) t1 = Target() assert_calls = [] given_ids = [] for given_id in range(100): given_ids.append(given_id) go(t1, given_id) if given_id % 10 == 0: t1.dispatch.event_one(given_id) assert_calls.extend(call(given_id, i) for i in given_ids) given_ids[:] = [] eq_(canary.mock_calls, assert_calls) def test_propagate(self): Target = self._fixture() m1 = Mock() t1 = Target() t2 = Target() event.listen(t1, "event_one", m1, propagate=True) event.listen(t1, "event_two", m1, propagate=False) t2.dispatch._update(t1.dispatch) t1.dispatch.event_one("t1e1x") t1.dispatch.event_two("t1e2x") t2.dispatch.event_one("t2e1x") t2.dispatch.event_two("t2e2x") event.remove(t1, "event_one", m1) event.remove(t1, "event_two", m1) t1.dispatch.event_one("t1e1y") t1.dispatch.event_two("t1e2y") t2.dispatch.event_one("t2e1y") t2.dispatch.event_two("t2e2y") eq_(m1.mock_calls, [call("t1e1x"), call("t1e2x"), call("t2e1x")]) @testing.requires.predictable_gc def test_listener_collection_removed_cleanup(self): from sqlalchemy.event import registry Target = self._fixture() m1 = Mock() t1 = Target() event.listen(t1, "event_one", m1) key = (id(t1), "event_one", id(m1)) assert key in registry._key_to_collection collection_ref = list(registry._key_to_collection[key])[0] assert collection_ref in registry._collection_to_key t1.dispatch.event_one("t1") del t1 gc_collect() assert key not in registry._key_to_collection assert collection_ref not in registry._collection_to_key def test_remove_not_listened(self): Target = self._fixture() m1 = Mock() t1 = Target() event.listen(t1, "event_one", m1, propagate=True) event.listen(t1, "event_three", m1) event.remove(t1, "event_one", m1) assert_raises_message( exc.InvalidRequestError, r"No listeners found for event <.*Target.*> / " r"'event_two' / ", event.remove, t1, "event_two", m1, ) event.remove(t1, "event_three", m1) def test_no_remove_in_event(self): Target = self._fixture() t1 = Target() def evt(): event.remove(t1, "event_one", evt) event.listen(t1, "event_one", evt) assert_raises_message( Exception, "deque mutated during iteration", t1.dispatch.event_one ) def test_no_add_in_event(self): Target = self._fixture() t1 = Target() m1 = Mock() def evt(): event.listen(t1, "event_one", m1) event.listen(t1, "event_one", evt) assert_raises_message( Exception, "deque mutated during iteration", t1.dispatch.event_one ) def test_remove_plain_named(self): Target = self._fixture() listen_one = Mock() t1 = Target() event.listen(t1, "event_one", listen_one, named=True) t1.dispatch.event_one("t1") eq_(listen_one.mock_calls, [call(x="t1")]) event.remove(t1, "event_one", listen_one) t1.dispatch.event_one("t2") eq_(listen_one.mock_calls, [call(x="t1")]) def test_remove_wrapped_named(self): Target = self._wrapped_fixture() listen_one = Mock() t1 = Target() event.listen(t1, "event_one", listen_one, named=True) t1.dispatch.event_one("t1") eq_(listen_one.mock_calls, [call(x="adapted t1")]) event.remove(t1, "event_one", listen_one) t1.dispatch.event_one("t2") eq_(listen_one.mock_calls, [call(x="adapted t1")]) def test_double_event_nonwrapped(self): Target = self._fixture() listen_one = Mock() t1 = Target() event.listen(t1, "event_one", listen_one) event.listen(t1, "event_one", listen_one) t1.dispatch.event_one("t1") # doubles are eliminated eq_(listen_one.mock_calls, [call("t1")]) # only one remove needed event.remove(t1, "event_one", listen_one) t1.dispatch.event_one("t2") eq_(listen_one.mock_calls, [call("t1")]) def test_double_event_wrapped(self): # this is issue #3199 Target = self._wrapped_fixture() listen_one = Mock() t1 = Target() event.listen(t1, "event_one", listen_one) event.listen(t1, "event_one", listen_one) t1.dispatch.event_one("t1") # doubles are eliminated eq_(listen_one.mock_calls, [call("adapted t1")]) # only one remove needed event.remove(t1, "event_one", listen_one) t1.dispatch.event_one("t2") eq_(listen_one.mock_calls, [call("adapted t1")])