diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 241 |
1 files changed, 212 insertions, 29 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c7a3c21138..4fe4d954b9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -449,7 +449,7 @@ apply(#{index := RaftIdx}, #purge{}, lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the @@ -473,6 +473,9 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{suspected_down = true}; (_, E) -> E end, Enqs0), + % mark waiting consumers as suspected if necessary + WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, true), + Effects = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -480,9 +483,9 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end, %% TODO: should we run a checkout here? - {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, _Info}, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + {State#state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers1}, ok, Effects}; +apply(_, {down, Pid, _Info}, #state{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -493,16 +496,18 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0, error -> State0 end, + {Effects1, State2} = maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, State1), % return checked out messages to main queue % Find the consumers for the down pid DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1}, + {Effects2, State3} = lists:foldl(fun cancel_consumer/2, {Effects1, State2}, DownConsumers), - checkout(State2, Effects1); -apply(_, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + checkout(State3, Effects2); +apply(_, {nodeup, Node}, #state{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0, + waiting_consumers = WaitingConsumers0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -517,7 +522,17 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, [P | Acc]; (_, _, Acc) -> Acc end, [], Enqs0), - Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + WaitingConsumers = lists:foldl(fun({{_, P}, #consumer{suspected_down = true}}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, [], WaitingConsumers0), + + Monitors = [{monitor, process, P} || P <- Cons ++ Enqs ++ WaitingConsumers], + + % un-suspect waiting consumers when necessary + WaitingConsumers1 = maybe_mark_suspect_waiting_consumers(Node, State0, false), + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = false}; (_, E) -> E @@ -533,12 +548,45 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0, end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ}, Effects); + service_queue = SQ, waiting_consumers = WaitingConsumers1}, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(_, #update_config{config = Conf}, State) -> {update_config(Conf, State), ok}. +maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = default} = State) -> + {[], State}; +maybe_deal_with_waiting_consumers_when_channel_goes_down(_Pid, #state{consumer_strategy = single_active, + waiting_consumers = []} = State) -> + {[], State}; +maybe_deal_with_waiting_consumers_when_channel_goes_down(Pid, #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers0} = State0) -> + % get cancel effects for down waiting consumers + DownWaitingConsumers = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, WaitingConsumers0), + Effects1 = lists:foldl(fun ({ConsumerId, _}, Effects) -> + cancel_consumer_effects(ConsumerId, State0, Effects) + end, [], DownWaitingConsumers), + % update state to have only up waiting consumers + WaitingConsumersStillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end, WaitingConsumers0), + State2 = State0#state{waiting_consumers = WaitingConsumersStillUp}, + {Effects1, State2}. + +maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, _Suspected) -> + []; +maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = single_active, + waiting_consumers = []}, _Suspected) -> + []; +maybe_mark_suspect_waiting_consumers(Node, #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, Suspected) -> + [begin + case node(P) of + Node -> + {ConsumerId, Consumer#consumer{suspected_down = Suspected}}; + _ -> + {ConsumerId, Consumer} + end + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, @@ -653,24 +701,32 @@ query_processes(#state{enqueuers = Enqs, consumers = Cons0}) -> query_ra_indexes(#state{ra_indexes = RaIndexes}) -> RaIndexes. -query_consumer_count(#state{consumers = Consumers}) -> - maps:size(Consumers). - -query_consumers(#state{consumers = Consumers}) -> - maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers). +query_consumer_count(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + maps:size(Consumers) + length(WaitingConsumers). + +query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers}) -> + FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)} + end, Consumers), + FromWaitingConsumers = lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta}}, Acc) -> + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, WaitingConsumers), + maps:merge(FromConsumers, FromWaitingConsumers). query_stat(#state{messages = M, consumers = Consumers}) -> {maps:size(M), maps:size(Consumers)}. -%% other - -spec usage(atom()) -> float(). usage(Name) when is_atom(Name) -> case ets:lookup(rabbit_fifo_usage, Name) of @@ -745,8 +801,9 @@ cancel_consumer(ConsumerId, % The cancelled consumer is not the active one % Just remove it from idle_consumers {value, _Consumer, WaitingConsumers1} = lists:keytake(ConsumerId, 1, WaitingConsumers0), + Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, so nothing special to do here - {Effects0, State0#state{waiting_consumers = WaitingConsumers1}} + {Effects, State0#state{waiting_consumers = WaitingConsumers1}} end. cancel_consumer0(ConsumerId, @@ -1999,7 +2056,7 @@ single_active_consumer_test() -> ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)), % cancelling a waiting consumer - {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), + {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#state.consumers)), ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)), @@ -2007,33 +2064,159 @@ single_active_consumer_test() -> ?assertEqual(2, length(State2#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#state.waiting_consumers)), ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects2)), % cancelling the active consumer - {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), + {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#state.consumers)), ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)), % the waiting consumer list is now empty ?assertEqual(0, length(State4#state.waiting_consumers)), + % there are some effects to unregister the consumer + ?assertEqual(1, length(Effects3)), % cancelling the last consumer - {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), + {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4), % no active consumer anymore ?assertEqual(0, map_size(State5#state.consumers)), % still nothing in the waiting list ?assertEqual(0, length(State5#state.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects4)), + + ok. + +single_active_consumer_cancel_consumer_when_channel_is_down_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, ChannelId}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + % the channel of the active consumer goes down + {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1), + % fell back to another consumer + ?assertEqual(1, map_size(State2#state.consumers)), + % there are still waiting consumers + ?assertEqual(2, length(State2#state.waiting_consumers)), + % the effect to unregister the consumer is there + ?assertEqual(1, length(Effects)), + + % the channel of the active consumer and a waiting consumer goes down + {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2), + % fell back to another consumer + ?assertEqual(1, map_size(State3#state.consumers)), + % no more waiting consumer + ?assertEqual(0, length(State3#state.waiting_consumers)), + % effects to cancel both consumers of this channel + ?assertEqual(2, length(Effects2)), + + % the last channel goes down + {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3), + % no more consumers + ?assertEqual(0, map_size(State4#state.consumers)), + ?assertEqual(0, length(State4#state.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?assertEqual(1 + 1, length(Effects3)), ok. +single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + % simulate node goes down + {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + + % all the waiting consumers should be suspected down + ?assertEqual(3, length(State2#state.waiting_consumers)), + lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> + ?assert(SuspectedDown) + end, State2#state.waiting_consumers), + + % simulate node goes back up + {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2), + + % all the waiting consumers should be un-suspected + ?assertEqual(3, length(State3#state.waiting_consumers)), + lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> + ?assertNot(SuspectedDown) + end, State3#state.waiting_consumers), + + ok. + +query_consumers_test() -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + shadow_copy_interval => 0, + single_active_consumer_on => true}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{}, + #checkout{spec = {once, 1, simple_prefetch}, + meta = #{}, + consumer_id = {CTag, self()}}, + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, query_consumer_count(State1)), + Consumers = query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun({_Tag, Pid}, {Pid, _Tag, _, _, _, _}, _Acc) -> + ?assertEqual(self(), Pid) + end, [], Consumers). + meta(Idx) -> #{index => Idx, term => 1}. |
