diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
| commit | 3b0adfda40edf59496ff9f6d994a11c27971a3f5 (patch) | |
| tree | 40c39f89a760c55d7cf8aee51a8ea0b279b4e0fe | |
| parent | b9873465666d143bd1fc70a828d417ce48b5b1c3 (diff) | |
| download | rabbitmq-server-git-3b0adfda40edf59496ff9f6d994a11c27971a3f5.tar.gz | |
rabbit_fifo: change single active consumer on noconnection
To ensure availability and progress when a node gets disconnected.
[#164135123]
| -rw-r--r-- | src/rabbit_fifo.erl | 219 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 7 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 244 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/test_util.erl | 28 |
5 files changed, 324 insertions, 191 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b06d34e83a..64f5b09bb4 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -94,6 +94,7 @@ #credit{} | #purge{} | #update_config{}. + -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo @@ -134,7 +135,7 @@ update_config(Conf, State) -> true -> single_active; false -> - default + competing end, Cfg = State#?MODULE.cfg, State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, @@ -248,7 +249,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {State0, {dequeue, empty}}; Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), + {once, 1, simple_prefetch}, + State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), case Settlement of unsettled -> @@ -257,10 +259,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, [{monitor, process, Pid}]}; settled -> %% immediately settle the checkout - {State, _, Effects} = apply(Meta, - make_settle(ConsumerId, - [MsgId]), - State2), + {State, _, Effects} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -294,27 +295,64 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, ConsumerPid, noconnection}, +apply(_, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0, + enqueuers = Enqs0} = State0) -> + Node = node(Pid), + %% if the pid refers to the active consumer, mark it as suspected and return + %% it to the waiting queue + {State1, Effects0} = case maps:to_list(Cons0) of + [{{_, P} = Cid, C}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% + Effs = consumer_update_active_effects( + State0, Cid, C, false, suspected_down, []), + {State0#?MODULE{consumers = #{}, + waiting_consumers = Waiting0 ++ [{Cid, C}]}, + Effs}; + _ -> {State0, []} + end, + WaitingConsumers = update_waiting_consumer_status(Node, State1, + suspected_down), + + %% select a new consumer from the waiting queue and run a checkout + State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + {State, Effects1} = activate_next_consumer(State2, Effects0), + + %% mark any enquers as suspected + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + Effects = [{monitor, node, Node} | Effects1], + {State#?MODULE{enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - Node = node(ConsumerPid), + %% A node has been disconnected. This doesn't necessarily mean that + %% any processes on this node are down, they _may_ come back so here + %% we just mark them as suspected (effectively deactivated) + %% and return all checked out messages to the main queue for delivery to any + %% live consumers + %% + %% all pids for the disconnected node will be marked as suspected not just + %% the one we got the `down' command for + Node = node(Pid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - % mark all consumers and enqueuers as suspected down - % and monitor the node so that we can find out the final state of the - % process at some later point + {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when (node(P) =:= Node) and - (C#consumer.status =/= cancelled)-> + maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, + status = up} = C, + {Co, St0, Eff}) when node(P) =:= Node -> {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), - {maps:put(K, - C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), + {maps:put(K, C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), St, Eff1}; (K, C, {Co, St, Eff}) -> {maps:put(K, C, Co), St, Eff} @@ -323,10 +361,10 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), - % mark waiting consumers as suspected if necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, - suspected_down), + % Monitor the node so that we can "unsuspect" these processes when the node + % comes back, then re-issue all monitors and discover the final fate of + % these processes Effects2 = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -334,8 +372,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; + {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -367,36 +404,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Monitors = [{monitor, process, P} || P <- suspected_pids_for(Node, State0)], - % un-suspect waiting consumers when necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, up), - Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons1, SQ, Effects} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, - true, up, EAcc), - update_or_remove_sub( - ConsumerId, C#consumer{status = up}, - CAcc, SQAcc, EAcc1); - (_, _, Acc) -> - Acc - end, {Cons0, SQ0, Monitors}, Cons0), - - checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + %% mark all consumers as up + {Cons1, SQ, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + C, true, up, EAcc), + update_or_remove_sub(ConsumerId, + C#consumer{status = up}, CAcc, + SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State0, up), + State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = Waiting}, + {State, Effects} = activate_next_consumer(State1, Effects1), + checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) @@ -407,7 +444,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -429,27 +466,18 @@ handle_waiting_consumer_down(Pid, State = State0#?MODULE{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}}, - _Status) -> - []; -update_waiting_consumer_status(_Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []}, - _Status) -> - []; update_waiting_consumer_status(Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers}, + #?MODULE{waiting_consumers = WaitingConsumers}, Status) -> [begin - case node(P) of + case node(Pid) of Node -> {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, - Consumer#consumer.status =/= cancelled]. + end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #?MODULE{consumers = Cons, @@ -583,7 +611,7 @@ query_consumers(#?MODULE{consumers = Consumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of - default -> + competing -> fun(_ConsumerId, #consumer{status = Status}) -> case Status of @@ -709,8 +737,8 @@ num_checked_out(#?MODULE{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> - %% general case, single active consumer off + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects, Reason) -> cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -721,41 +749,23 @@ cancel_consumer(ConsumerId, cancel_consumer(ConsumerId, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0, + waiting_consumers = Waiting0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting - case maps:take(ConsumerId, Cons0) of - {Consumer, Cons1} -> + case maps:is_key(ConsumerId, Cons0) of + true -> % The active consumer is to be removed - % Cancel it - {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, - State0, Effects0, Reason), - Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), - % Take another one from the waiting consumers and put it in consumers - [{NewActiveConsumerId, NewActiveConsumer} - | RemainingWaitingConsumers] = WaitingConsumers0, - #?MODULE{service_queue = ServiceQueue} = State1, - ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, - NewActiveConsumer, - ServiceQueue), - State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#?MODULE.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, - Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), - {State, Effects}; - error -> + {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + Effects0, Reason), + activate_next_consumer(State1, Effects1); + false -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - WaitingConsumers = lists:keydelete(ConsumerId, 1, - WaitingConsumers0), + Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects} + {State0#?MODULE{waiting_consumers = Waiting}, Effects} end. consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, @@ -765,9 +775,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), Args = maps:get(args, Meta, []), - [{mod_call, - rabbit_quorum_queue, - update_consumer_handler, + [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. @@ -788,6 +796,32 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. +activate_next_consumer(#?MODULE{consumers = Cons} = State0, Effects) + when map_size(Cons) == 1 -> + {State0, Effects}; +activate_next_consumer(#?MODULE{waiting_consumers = Waiting0} = State0, + Effects0) -> + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = #{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, Effects0} + end. + + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, @@ -1296,7 +1330,7 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, @@ -1331,7 +1365,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebe5f3328a..968ae07739 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -88,6 +88,8 @@ -type consumer() :: #consumer{}. +-type consumer_strategy() :: competing | single_active. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -104,7 +106,8 @@ max_length :: maybe(non_neg_integer()), max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, + consumer_strategy = competing :: consumer_strategy(), + %% the maximum number of unsuccessful delivery attempts permitted delivery_limit :: maybe(non_neg_integer()) }). @@ -114,7 +117,7 @@ messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map % that isn't a return - low_msg_num :: msg_in_id() | undefined, + low_msg_num :: maybe(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index ceed092d0f..6cc167b050 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -27,8 +27,7 @@ all() -> %% replicate eunit like test resultion all_tests() -> [F || {F, _} <- ?MODULE:module_info(functions), - re:run(atom_to_list(F), "_test$") /= nomatch] - . + re:run(atom_to_list(F), "_test$") /= nomatch]. groups() -> [ @@ -588,76 +587,89 @@ single_active_consumer_test(_) -> % adding some consumers AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - meta(1), - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, - #{}), - State), - NewState + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + C3 = {<<"ctag3">>, self()}, + C4 = {<<"ctag4">>, self()}, % the first registered consumer is the active one, the others are waiting ?assertEqual(1, map_size(State1#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers), ?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#rabbit_fifo.waiting_consumers)), % cancelling a waiting consumer {State2, _, Effects1} = apply(meta(2), - make_checkout({<<"ctag3">>, self()}, - cancel, #{}), State1), + make_checkout(C3, cancel, #{}), + State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers), % the cancelled consumer has been removed from waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#rabbit_fifo.waiting_consumers)), % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects1)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects1), % cancelling the active consumer {State3, _, Effects2} = apply(meta(3), - make_checkout({<<"ctag1">>, self()}, - cancel, #{}), + make_checkout(C1, cancel, #{}), State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)), + ?assertMatch(#{C2 := _}, State3#rabbit_fifo.consumers), % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects2)), + ?assertNotEqual(false, lists:keyfind(C4, 1, + State3#rabbit_fifo.waiting_consumers)), + %% should have a cancel consumer handler mod_call effect and + %% an active new consumer effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % cancelling the active consumer {State4, _, Effects3} = apply(meta(4), - make_checkout({<<"ctag2">>, self()}, - cancel, #{}), + make_checkout(C2, cancel, #{}), State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)), + ?assertMatch(#{C4 := _}, State4#rabbit_fifo.consumers), % the waiting consumer list is now empty ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects3)), + % there are some effects to unregister the consumer and + % to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects3), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects3), % cancelling the last consumer {State5, _, Effects4} = apply(meta(5), - make_checkout({<<"ctag4">>, self()}, - cancel, #{}), + make_checkout(C4, cancel, #{}), State4), % no active consumer anymore ?assertEqual(0, map_size(State5#rabbit_fifo.consumers)), % still nothing in the waiting list ?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects4)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, _}, Effects4), ok. @@ -673,6 +685,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + [C1, C2, C3, C4] = Consumers = + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}], % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( @@ -681,27 +696,34 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> State), NewState end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + State1 = lists:foldl(AddConsumer, State0, Consumers), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), % effects to unregister the consumer and % to update the new active one (metrics) are there - ?assertEqual(2, length(Effects)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), % no more waiting consumer ?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)), % effects to cancel both consumers of this channel + effect to update the new active one (metrics) - ?assertEqual(3, length(Effects2)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % the last channel goes down {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), @@ -709,48 +731,107 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects3)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C4, Effects3), ok. -single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) -> +single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), Meta = #{index => 1}, + Nodes = [n1, n2, node()], + ConsumerIds = [C1 = {_, DownPid}, C2, _C3] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - Meta, - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, #{}), - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, + State1#rabbit_fifo.consumers), % simulate node goes down - {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), - % all the waiting consumers should be suspected down - ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)), - lists:foreach(fun({_, #consumer{status = Status}}) -> - ?assert(Status == suspected_down) - end, State2#rabbit_fifo.waiting_consumers), + %% assert a new consumer is in place and it is up + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State2#rabbit_fifo.consumers)), - % simulate node goes back up - {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), + %% the disconnected consumer has been returned to waiting + ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, + State2#rabbit_fifo.waiting_consumers)), + ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), + + % simulate node comes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), - % all the waiting consumers should be un-suspected - ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)), + %% the consumer is still active and the same as before + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State3#rabbit_fifo.consumers)), + % the waiting consumers should be un-suspected + ?assertEqual(2, length(State3#rabbit_fifo.waiting_consumers)), lists:foreach(fun({_, #consumer{status = Status}}) -> ?assert(Status /= suspected_down) end, State3#rabbit_fifo.waiting_consumers), + ok. +single_active_consumer_all_disconnected_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2], + ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, State1#rabbit_fifo.consumers), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), + %% assert the consumer fails over to the consumer on n2 + ?assertMatch(#{C2 := #consumer{status = up}}, State2#rabbit_fifo.consumers), + {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), + %% assert these no active consumer after both nodes are maked as down + ?assertMatch([], maps:to_list(State3#rabbit_fifo.consumers)), + %% n2 comes back + {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), + %% ensure n2 is the active consumer as this node as been registered + %% as up again + ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, + credit = 1}}], + maps:to_list(State4#rabbit_fifo.consumers)), ok. single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> @@ -783,11 +864,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> ?assertEqual(2 * 3 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => Resource, + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -805,7 +886,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(eol, State1), % 1 effect for each consumer process (channel process) @@ -918,10 +1000,10 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -938,15 +1020,17 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), - % only 1 effect to monitor the node - ?assertEqual(1, length(Effects2)), + % one monitor and one consumer status update (deactivated) + ?assertEqual(2, length(Effects2)), {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(4, length(Effects3)). + ct:pal("Effects3 ~w", [Effects3]), + ?assertEqual(5, length(Effects3)). meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index da72c030cd..c61d85fbff 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -271,21 +271,6 @@ scenario16(_Config) -> delivery_limit => 1}, Commands), ok. -fake_pid(_Config) -> - Pid = fake_external_pid(<<"mynode@banana">>), - ?assertNotEqual(node(Pid), node()), - ?assert(is_pid(Pid)), - ok. - -fake_external_pid(Node) when is_binary(Node) -> - ThisNodeSize = size(term_to_binary(node())) + 1, - Pid = spawn(fun () -> ok end), - %% drop the local node data from a local pid - <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), - S = size(Node), - %% replace it with the incoming node binary - Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>, - binary_to_term(Final). snapshots(_Config) -> run_proper( @@ -352,7 +337,7 @@ log_gen(Size) -> pid_gen(Nodes) -> ?LET(Node, oneof(Nodes), - fake_external_pid(atom_to_binary(Node, utf8))). + test_util:fake_pid(atom_to_binary(Node, utf8))). down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). diff --git a/test/test_util.erl b/test/test_util.erl new file mode 100644 index 0000000000..863c094603 --- /dev/null +++ b/test/test_util.erl @@ -0,0 +1,28 @@ +-module(test_util). + +-export([ + fake_pid/1 + ]). + + +fake_pid(Node) -> + NodeBin = rabbit_data_coercion:to_binary(Node), + ThisNodeSize = size(term_to_binary(node())) + 1, + Pid = spawn(fun () -> ok end), + %% drop the local node data from a local pid + <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), + S = size(NodeBin), + %% replace it with the incoming node binary + Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>, + binary_to_term(Final). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +fake_pid_test(_Config) -> + Pid = fake_pid(<<"mynode@banana">>), + ?assertNotEqual(node(Pid), node()), + ?assert(is_pid(Pid)), + ok. + +-endif. |
