diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 83 |
1 files changed, 76 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index cc7568720d..6fe8677e94 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -448,7 +448,7 @@ apply(#{index := RaftIdx}, #purge{}, lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the @@ -472,6 +472,9 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{suspected_down = true}; (_, E) -> E end, Enqs0), + % mark waiting consumers as suspected if necessary + WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true), + Effects = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -479,7 +482,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects}; + {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects}; apply(_, {down, Pid, _Info}, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -500,9 +503,10 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2}, DownConsumers), checkout(State3, Effects2); -apply(_, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> +apply(_, {nodeup, Node}, #state{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0, + waiting_consumers = WaitingConsumers0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -517,7 +521,17 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, [P | Acc]; (_, _, Acc) -> Acc end, [], Enqs0), - Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + WaitingConsumers = lists:foldl(fun({{_, P}, #consumer{suspected_down = true}}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, [], WaitingConsumers0), + + Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers], + + % un-suspect waiting consumers when necessary + WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, false), + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = false}; (_, E) -> E @@ -533,7 +547,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ}, Effects); + service_queue = SQ, waiting_consumers = WaitingConsumers1}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(_, #update_config{config = Conf}, State) -> @@ -556,6 +570,22 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st State2 = State0#state{waiting_consumers = WaitingConsumersStillUp}, {Effects1, State2}. +maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) -> + []; +maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active, + waiting_consumers = []}, _Suspected) -> + []; +maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, Suspected) -> + [begin + case node(P) of + Node -> + {ConsumerId, Consumer#consumer{suspected_down = Suspected}}; + _ -> + {ConsumerId, Consumer} + end + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, @@ -2108,6 +2138,45 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> ok. +single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + % simulate node goes down + {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + + % all the waiting consumers should be suspected down + ?assertEqual(3, length(State2#state.waiting_consumers)), + lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> + ?assert(SuspectedDown) + end, State2#state.waiting_consumers), + + % simulate node goes back up + {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2), + + % all the waiting consumers should be un-suspected + ?assertEqual(3, length(State3#state.waiting_consumers)), + lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> + ?assertNot(SuspectedDown) + end, State3#state.waiting_consumers), + + ok. + meta(Idx) -> #{index => Idx, term => 1}. |
