summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-16 11:42:30 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-16 11:42:30 +0100
commitcaef9d20782b30a44efa3d2707446018f4c32e7b (patch)
treed459bb4e72f8289909c41024203ef9058969f2e0
parenta80f6568321d596d5fbe26edddc05d070a87a114 (diff)
downloadrabbitmq-server-git-caef9d20782b30a44efa3d2707446018f4c32e7b.tar.gz
Monitor waiting consumers when entering state
For quorum queues. Otherwise dead waiting consumers can be kept in the state between restarts.
-rw-r--r--src/rabbit_fifo.erl73
1 files changed, 69 insertions, 4 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 48c9379970..94a4e7d709 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -591,11 +591,14 @@ maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_act
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
name = Name,
prefix_msg_counts = {0, 0},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Pids = lists:usort(maps:keys(Enqs)
+ ++ [P || {_, P} <- maps:keys(Cons)]
+ ++ [P || {{_, P}, _} <- WaitingConsumers]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
@@ -610,9 +613,11 @@ state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
-state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
+state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0, waiting_consumers = WaitingConsumers0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
- [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
+ WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end, #{}, WaitingConsumers0),
+ AllConsumers = maps:merge(Custs, WaitingConsumers1),
+ [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, AllConsumers))];
state_enter(_, _) ->
%% catch all as not handling all states
[].
@@ -1401,6 +1406,8 @@ test_init(Name) ->
atom_to_binary(Name, utf8)),
shadow_copy_interval => 0}).
+% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo"
+
enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
@@ -2221,6 +2228,64 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
ok.
+single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = state_enter(leader, State1),
+ % 2 effects for each consumer process (channel process), 1 effect for the node
+ ?assertEqual(2 * 3 + 1, length(Effects)).
+
+single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ DummyFunction = fun() -> ok end,
+ Pid1 = spawn(DummyFunction),
+ Pid2 = spawn(DummyFunction),
+ Pid3 = spawn(DummyFunction),
+
+ % adding some consumers
+ AddConsumer = fun({CTag, ChannelId}, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, ChannelId}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0,
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+
+ Effects = state_enter(eol, State1),
+ % 1 effect for each consumer process (channel process)
+ ?assertEqual(3, length(Effects)).
+
query_consumers_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
@@ -2243,7 +2308,7 @@ query_consumers_test() ->
?assertEqual(4, query_consumer_count(State1)),
Consumers = query_consumers(State1),
?assertEqual(4, maps:size(Consumers)),
- maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) ->
+ maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _, _}, _Acc) ->
?assertEqual(self(), Pid)
end, [], Consumers).