diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-27 11:31:09 +0000 |
| commit | 3b0adfda40edf59496ff9f6d994a11c27971a3f5 (patch) | |
| tree | 40c39f89a760c55d7cf8aee51a8ea0b279b4e0fe /test | |
| parent | b9873465666d143bd1fc70a828d417ce48b5b1c3 (diff) | |
| download | rabbitmq-server-git-3b0adfda40edf59496ff9f6d994a11c27971a3f5.tar.gz | |
rabbit_fifo: change single active consumer on noconnection
To ensure availability and progress when a node gets disconnected.
[#164135123]
Diffstat (limited to 'test')
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 244 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/test_util.erl | 28 |
3 files changed, 193 insertions, 96 deletions
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index ceed092d0f..6cc167b050 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -27,8 +27,7 @@ all() -> %% replicate eunit like test resultion all_tests() -> [F || {F, _} <- ?MODULE:module_info(functions), - re:run(atom_to_list(F), "_test$") /= nomatch] - . + re:run(atom_to_list(F), "_test$") /= nomatch]. groups() -> [ @@ -588,76 +587,89 @@ single_active_consumer_test(_) -> % adding some consumers AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - meta(1), - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, - #{}), - State), - NewState + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + C3 = {<<"ctag3">>, self()}, + C4 = {<<"ctag4">>, self()}, % the first registered consumer is the active one, the others are waiting ?assertEqual(1, map_size(State1#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers), ?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#rabbit_fifo.waiting_consumers)), % cancelling a waiting consumer {State2, _, Effects1} = apply(meta(2), - make_checkout({<<"ctag3">>, self()}, - cancel, #{}), State1), + make_checkout(C3, cancel, #{}), + State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers), % the cancelled consumer has been removed from waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#rabbit_fifo.waiting_consumers)), % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects1)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects1), % cancelling the active consumer {State3, _, Effects2} = apply(meta(3), - make_checkout({<<"ctag1">>, self()}, - cancel, #{}), + make_checkout(C1, cancel, #{}), State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)), + ?assertMatch(#{C2 := _}, State3#rabbit_fifo.consumers), % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects2)), + ?assertNotEqual(false, lists:keyfind(C4, 1, + State3#rabbit_fifo.waiting_consumers)), + %% should have a cancel consumer handler mod_call effect and + %% an active new consumer effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % cancelling the active consumer {State4, _, Effects3} = apply(meta(4), - make_checkout({<<"ctag2">>, self()}, - cancel, #{}), + make_checkout(C2, cancel, #{}), State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)), + ?assertMatch(#{C4 := _}, State4#rabbit_fifo.consumers), % the waiting consumer list is now empty ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects3)), + % there are some effects to unregister the consumer and + % to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects3), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects3), % cancelling the last consumer {State5, _, Effects4} = apply(meta(5), - make_checkout({<<"ctag4">>, self()}, - cancel, #{}), + make_checkout(C4, cancel, #{}), State4), % no active consumer anymore ?assertEqual(0, map_size(State5#rabbit_fifo.consumers)), % still nothing in the waiting list ?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects4)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, _}, Effects4), ok. @@ -673,6 +685,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + [C1, C2, C3, C4] = Consumers = + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}], % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( @@ -681,27 +696,34 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> State), NewState end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + State1 = lists:foldl(AddConsumer, State0, Consumers), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), % effects to unregister the consumer and % to update the new active one (metrics) are there - ?assertEqual(2, length(Effects)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), % no more waiting consumer ?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)), % effects to cancel both consumers of this channel + effect to update the new active one (metrics) - ?assertEqual(3, length(Effects2)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % the last channel goes down {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), @@ -709,48 +731,107 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects3)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C4, Effects3), ok. -single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) -> +single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), Meta = #{index => 1}, + Nodes = [n1, n2, node()], + ConsumerIds = [C1 = {_, DownPid}, C2, _C3] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - Meta, - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, #{}), - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, + State1#rabbit_fifo.consumers), % simulate node goes down - {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), - % all the waiting consumers should be suspected down - ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)), - lists:foreach(fun({_, #consumer{status = Status}}) -> - ?assert(Status == suspected_down) - end, State2#rabbit_fifo.waiting_consumers), + %% assert a new consumer is in place and it is up + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State2#rabbit_fifo.consumers)), - % simulate node goes back up - {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), + %% the disconnected consumer has been returned to waiting + ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, + State2#rabbit_fifo.waiting_consumers)), + ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), + + % simulate node comes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), - % all the waiting consumers should be un-suspected - ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)), + %% the consumer is still active and the same as before + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State3#rabbit_fifo.consumers)), + % the waiting consumers should be un-suspected + ?assertEqual(2, length(State3#rabbit_fifo.waiting_consumers)), lists:foreach(fun({_, #consumer{status = Status}}) -> ?assert(Status /= suspected_down) end, State3#rabbit_fifo.waiting_consumers), + ok. +single_active_consumer_all_disconnected_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2], + ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, State1#rabbit_fifo.consumers), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), + %% assert the consumer fails over to the consumer on n2 + ?assertMatch(#{C2 := #consumer{status = up}}, State2#rabbit_fifo.consumers), + {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), + %% assert these no active consumer after both nodes are maked as down + ?assertMatch([], maps:to_list(State3#rabbit_fifo.consumers)), + %% n2 comes back + {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), + %% ensure n2 is the active consumer as this node as been registered + %% as up again + ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, + credit = 1}}], + maps:to_list(State4#rabbit_fifo.consumers)), ok. single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> @@ -783,11 +864,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> ?assertEqual(2 * 3 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => Resource, + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -805,7 +886,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(eol, State1), % 1 effect for each consumer process (channel process) @@ -918,10 +1000,10 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -938,15 +1020,17 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), - % only 1 effect to monitor the node - ?assertEqual(1, length(Effects2)), + % one monitor and one consumer status update (deactivated) + ?assertEqual(2, length(Effects2)), {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(4, length(Effects3)). + ct:pal("Effects3 ~w", [Effects3]), + ?assertEqual(5, length(Effects3)). meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index da72c030cd..c61d85fbff 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -271,21 +271,6 @@ scenario16(_Config) -> delivery_limit => 1}, Commands), ok. -fake_pid(_Config) -> - Pid = fake_external_pid(<<"mynode@banana">>), - ?assertNotEqual(node(Pid), node()), - ?assert(is_pid(Pid)), - ok. - -fake_external_pid(Node) when is_binary(Node) -> - ThisNodeSize = size(term_to_binary(node())) + 1, - Pid = spawn(fun () -> ok end), - %% drop the local node data from a local pid - <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), - S = size(Node), - %% replace it with the incoming node binary - Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>, - binary_to_term(Final). snapshots(_Config) -> run_proper( @@ -352,7 +337,7 @@ log_gen(Size) -> pid_gen(Nodes) -> ?LET(Node, oneof(Nodes), - fake_external_pid(atom_to_binary(Node, utf8))). + test_util:fake_pid(atom_to_binary(Node, utf8))). down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). diff --git a/test/test_util.erl b/test/test_util.erl new file mode 100644 index 0000000000..863c094603 --- /dev/null +++ b/test/test_util.erl @@ -0,0 +1,28 @@ +-module(test_util). + +-export([ + fake_pid/1 + ]). + + +fake_pid(Node) -> + NodeBin = rabbit_data_coercion:to_binary(Node), + ThisNodeSize = size(term_to_binary(node())) + 1, + Pid = spawn(fun () -> ok end), + %% drop the local node data from a local pid + <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), + S = size(NodeBin), + %% replace it with the incoming node binary + Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>, + binary_to_term(Final). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +fake_pid_test(_Config) -> + Pid = fake_pid(<<"mynode@banana">>), + ?assertNotEqual(node(Pid), node()), + ?assert(is_pid(Pid)), + ok. + +-endif. |
