summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-27 11:31:09 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-27 11:31:09 +0000
commit3b0adfda40edf59496ff9f6d994a11c27971a3f5 (patch)
tree40c39f89a760c55d7cf8aee51a8ea0b279b4e0fe /test
parentb9873465666d143bd1fc70a828d417ce48b5b1c3 (diff)
downloadrabbitmq-server-git-3b0adfda40edf59496ff9f6d994a11c27971a3f5.tar.gz
rabbit_fifo: change single active consumer on noconnection
To ensure availability and progress when a node gets disconnected. [#164135123]
Diffstat (limited to 'test')
-rw-r--r--test/rabbit_fifo_SUITE.erl244
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl17
-rw-r--r--test/test_util.erl28
3 files changed, 193 insertions, 96 deletions
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index ceed092d0f..6cc167b050 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -27,8 +27,7 @@ all() ->
%% replicate eunit like test resultion
all_tests() ->
[F || {F, _} <- ?MODULE:module_info(functions),
- re:run(atom_to_list(F), "_test$") /= nomatch]
- .
+ re:run(atom_to_list(F), "_test$") /= nomatch].
groups() ->
[
@@ -588,76 +587,89 @@ single_active_consumer_test(_) ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- meta(1),
- make_checkout({CTag, self()},
- {once, 1, simple_prefetch},
- #{}),
- State),
- NewState
+ {NewState, _, _} = apply(
+ meta(1),
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ C3 = {<<"ctag3">>, self()},
+ C4 = {<<"ctag4">>, self()},
% the first registered consumer is the active one, the others are waiting
?assertEqual(1, map_size(State1#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)),
+ ?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers),
?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#rabbit_fifo.waiting_consumers)),
% cancelling a waiting consumer
{State2, _, Effects1} = apply(meta(2),
- make_checkout({<<"ctag3">>, self()},
- cancel, #{}), State1),
+ make_checkout(C3, cancel, #{}),
+ State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)),
+ ?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers),
% the cancelled consumer has been removed from waiting consumers
?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#rabbit_fifo.waiting_consumers)),
% there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects1)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects1),
% cancelling the active consumer
{State3, _, Effects2} = apply(meta(3),
- make_checkout({<<"ctag1">>, self()},
- cancel, #{}),
+ make_checkout(C1, cancel, #{}),
State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)),
+ ?assertMatch(#{C2 := _}, State3#rabbit_fifo.consumers),
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects2)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1,
+ State3#rabbit_fifo.waiting_consumers)),
+ %% should have a cancel consumer handler mod_call effect and
+ %% an active new consumer effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
% cancelling the active consumer
{State4, _, Effects3} = apply(meta(4),
- make_checkout({<<"ctag2">>, self()},
- cancel, #{}),
+ make_checkout(C2, cancel, #{}),
State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)),
+ ?assertMatch(#{C4 := _}, State4#rabbit_fifo.consumers),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects3)),
+ % there are some effects to unregister the consumer and
+ % to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects3),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects3),
% cancelling the last consumer
{State5, _, Effects4} = apply(meta(5),
- make_checkout({<<"ctag4">>, self()},
- cancel, #{}),
+ make_checkout(C4, cancel, #{}),
State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#rabbit_fifo.consumers)),
% still nothing in the waiting list
?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects4)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, _}, Effects4),
ok.
@@ -673,6 +685,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ [C1, C2, C3, C4] = Consumers =
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}],
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
@@ -681,27 +696,34 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
State),
NewState
end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ State1 = lists:foldl(AddConsumer, State0, Consumers),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
% effects to unregister the consumer and
% to update the new active one (metrics) are there
- ?assertEqual(2, length(Effects)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
% no more waiting consumer
?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)),
% effects to cancel both consumers of this channel + effect to update the new active one (metrics)
- ?assertEqual(3, length(Effects2)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
% the last channel goes down
{State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
@@ -709,48 +731,107 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects3)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C4, Effects3),
ok.
-single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) ->
+single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
Meta = #{index => 1},
+ Nodes = [n1, n2, node()],
+ ConsumerIds = [C1 = {_, DownPid}, C2, _C3] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
% adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- make_checkout({CTag, self()},
- {once, 1, simple_prefetch}, #{}),
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}},
+ State1#rabbit_fifo.consumers),
% simulate node goes down
- {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
+ {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
- % all the waiting consumers should be suspected down
- ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status == suspected_down)
- end, State2#rabbit_fifo.waiting_consumers),
+ %% assert a new consumer is in place and it is up
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State2#rabbit_fifo.consumers)),
- % simulate node goes back up
- {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
+ %% the disconnected consumer has been returned to waiting
+ ?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
+ State2#rabbit_fifo.waiting_consumers)),
+ ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
+
+ % simulate node comes back up
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
- % all the waiting consumers should be un-suspected
- ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)),
+ %% the consumer is still active and the same as before
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State3#rabbit_fifo.consumers)),
+ % the waiting consumers should be un-suspected
+ ?assertEqual(2, length(State3#rabbit_fifo.waiting_consumers)),
lists:foreach(fun({_, #consumer{status = Status}}) ->
?assert(Status /= suspected_down)
end, State3#rabbit_fifo.waiting_consumers),
+ ok.
+single_active_consumer_all_disconnected_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2],
+ ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#rabbit_fifo.consumers),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
+ %% assert the consumer fails over to the consumer on n2
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#rabbit_fifo.consumers),
+ {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
+ %% assert these no active consumer after both nodes are maked as down
+ ?assertMatch([], maps:to_list(State3#rabbit_fifo.consumers)),
+ %% n2 comes back
+ {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
+ %% ensure n2 is the active consumer as this node as been registered
+ %% as up again
+ ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
+ credit = 1}}],
+ maps:to_list(State4#rabbit_fifo.consumers)),
ok.
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
@@ -783,11 +864,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
?assertEqual(2 * 3 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+ Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource => Resource,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -805,7 +886,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
NewState
end,
State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(eol, State1),
% 1 effect for each consumer process (channel process)
@@ -918,10 +1000,10 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -938,15 +1020,17 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
NewState
end,
State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
{State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % only 1 effect to monitor the node
- ?assertEqual(1, length(Effects2)),
+ % one monitor and one consumer status update (deactivated)
+ ?assertEqual(2, length(Effects2)),
{_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
- ?assertEqual(4, length(Effects3)).
+ ct:pal("Effects3 ~w", [Effects3]),
+ ?assertEqual(5, length(Effects3)).
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index da72c030cd..c61d85fbff 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -271,21 +271,6 @@ scenario16(_Config) ->
delivery_limit => 1}, Commands),
ok.
-fake_pid(_Config) ->
- Pid = fake_external_pid(<<"mynode@banana">>),
- ?assertNotEqual(node(Pid), node()),
- ?assert(is_pid(Pid)),
- ok.
-
-fake_external_pid(Node) when is_binary(Node) ->
- ThisNodeSize = size(term_to_binary(node())) + 1,
- Pid = spawn(fun () -> ok end),
- %% drop the local node data from a local pid
- <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
- S = size(Node),
- %% replace it with the incoming node binary
- Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>,
- binary_to_term(Final).
snapshots(_Config) ->
run_proper(
@@ -352,7 +337,7 @@ log_gen(Size) ->
pid_gen(Nodes) ->
?LET(Node, oneof(Nodes),
- fake_external_pid(atom_to_binary(Node, utf8))).
+ test_util:fake_pid(atom_to_binary(Node, utf8))).
down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
diff --git a/test/test_util.erl b/test/test_util.erl
new file mode 100644
index 0000000000..863c094603
--- /dev/null
+++ b/test/test_util.erl
@@ -0,0 +1,28 @@
+-module(test_util).
+
+-export([
+ fake_pid/1
+ ]).
+
+
+fake_pid(Node) ->
+ NodeBin = rabbit_data_coercion:to_binary(Node),
+ ThisNodeSize = size(term_to_binary(node())) + 1,
+ Pid = spawn(fun () -> ok end),
+ %% drop the local node data from a local pid
+ <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
+ S = size(NodeBin),
+ %% replace it with the incoming node binary
+ Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>,
+ binary_to_term(Final).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+fake_pid_test(_Config) ->
+ Pid = fake_pid(<<"mynode@banana">>),
+ ?assertNotEqual(node(Pid), node()),
+ ?assert(is_pid(Pid)),
+ ok.
+
+-endif.