diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-16 11:42:30 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-01-16 11:42:30 +0100 |
| commit | caef9d20782b30a44efa3d2707446018f4c32e7b (patch) | |
| tree | d459bb4e72f8289909c41024203ef9058969f2e0 | |
| parent | a80f6568321d596d5fbe26edddc05d070a87a114 (diff) | |
| download | rabbitmq-server-git-caef9d20782b30a44efa3d2707446018f4c32e7b.tar.gz | |
Monitor waiting consumers when entering state
For quorum queues. Otherwise dead waiting consumers can be kept in the state
between restarts.
| -rw-r--r-- | src/rabbit_fifo.erl | 73 |
1 files changed, 69 insertions, 4 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 48c9379970..94a4e7d709 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -591,11 +591,14 @@ maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_act -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, + waiting_consumers = WaitingConsumers, name = Name, prefix_msg_counts = {0, 0}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers - Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), + Pids = lists:usort(maps:keys(Enqs) + ++ [P || {_, P} <- maps:keys(Cons)] + ++ [P || {{_, P}, _} <- WaitingConsumers]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), @@ -610,9 +613,11 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); -state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> +state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), - [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; + WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0), + AllConsumers = maps:merge(Custs, WaitingConsumers1), + [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))]; state_enter(_, _) -> %% catch all as not handling all states []. @@ -1401,6 +1406,8 @@ test_init(Name) -> atom_to_binary(Name, utf8)), shadow_copy_interval => 0}). +% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo" + enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), @@ -2221,6 +2228,64 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti ok. +single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = state_enter(leader, State1), + % 2 effects for each consumer process (channel process), 1 effect for the node + ?assertEqual(2 * 3 + 1, length(Effects)). + +single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = state_enter(eol, State1), + % 1 effect for each consumer process (channel process) + ?assertEqual(3, length(Effects)). + query_consumers_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, @@ -2243,7 +2308,7 @@ query_consumers_test() -> ?assertEqual(4, query_consumer_count(State1)), Consumers = query_consumers(State1), ?assertEqual(4, maps:size(Consumers)), - maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) -> + maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) -> ?assertEqual(self(), Pid) end, [], Consumers). |
