summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl83
1 files changed, 76 insertions, 7 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index cc7568720d..6fe8677e94 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -448,7 +448,7 @@ apply(#{index := RaftIdx}, #purge{},
lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
#state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
@@ -472,6 +472,9 @@ apply(_, {down, ConsumerPid, noconnection},
E#enqueuer{suspected_down = true};
(_, E) -> E
end, Enqs0),
+ % mark waiting consumers as suspected if necessary
+ WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true),
+
Effects = case maps:size(Cons) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
@@ -479,7 +482,7 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end,
%% TODO: should we run a checkout here?
- {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects};
+ {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects};
apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -500,9 +503,10 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
{Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2},
DownConsumers),
checkout(State3, Effects2);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -517,7 +521,17 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
[P | Acc];
(_, _, Acc) -> Acc
end, [], Enqs0),
- Monitors = [{monitor, process, P} || P <- Cons ++ Enqs],
+ WaitingConsumers = lists:foldl(fun({{_, P}, #consumer{suspected_down = true}}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, [], WaitingConsumers0),
+
+ Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers],
+
+ % un-suspect waiting consumers when necessary
+ WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, false),
+
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{suspected_down = false};
(_, E) -> E
@@ -533,7 +547,7 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ}, Effects);
+ service_queue = SQ, waiting_consumers = WaitingConsumers1}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(_, #update_config{config = Conf}, State) ->
@@ -556,6 +570,22 @@ maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_st
State2 = State0#state{waiting_consumers = WaitingConsumersStillUp},
{Effects1, State2}.
+maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) ->
+ [];
+maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active,
+ waiting_consumers = []}, _Suspected) ->
+ [];
+maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers}, Suspected) ->
+ [begin
+ case node(P) of
+ Node ->
+ {ConsumerId, Consumer#consumer{suspected_down = Suspected}};
+ _ ->
+ {ConsumerId, Consumer}
+ end
+ end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers].
+
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
@@ -2108,6 +2138,45 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
ok.
+single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % simulate node goes down
+ {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
+
+ % all the waiting consumers should be suspected down
+ ?assertEqual(3, length(State2#state.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
+ ?assert(SuspectedDown)
+ end, State2#state.waiting_consumers),
+
+ % simulate node goes back up
+ {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2),
+
+ % all the waiting consumers should be un-suspected
+ ?assertEqual(3, length(State3#state.waiting_consumers)),
+ lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
+ ?assertNot(SuspectedDown)
+ end, State3#state.waiting_consumers),
+
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.