summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-08 17:16:33 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-08 17:16:33 +0100
commit1613055c33a224ccd2101c8cf53466756e45bf43 (patch)
tree5fc010033adb53d115f8f453997d43831422ba62 /src
parent377661dbd1d608ec09ad81d5873944ab551f50d0 (diff)
downloadrabbitmq-server-git-1613055c33a224ccd2101c8cf53466756e45bf43.tar.gz
Add unit test for single active consumer in quorum queues
References #1799
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl75
1 files changed, 67 insertions, 8 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 33e843c7e9..d5b13a26e6 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -753,7 +753,7 @@ cancel_consumer(ConsumerId,
lists:keytake(ConsumerId, 1, WaitingConsumers0),
S = return_all(State0, Checked0),
Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
- {Effects, #state{waiting_consumers = WaitingConsumers1}}
+ {Effects, State0#state{waiting_consumers = WaitingConsumers1}}
end.
cancel_consumer0(ConsumerId,
@@ -1571,7 +1571,6 @@ cancelled_checkout_out_test() ->
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
- ?debugFmt("State3 ~p", [State3]),
{_State, {dequeue, {_, {_, second}}}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
@@ -1769,8 +1768,7 @@ return_prefix_msg_count_test() ->
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
- ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ {_State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
ok.
@@ -1842,7 +1840,6 @@ run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
[begin
- ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
run_snapshot_test0(Name, C)
end || C <- prefixes(Commands, 1, [])].
@@ -1855,11 +1852,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- ?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?debugFmt("Name ~p~nS~p~nState~p~nn",
- [Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -1981,6 +1975,71 @@ down_returns_checked_out_in_order_test() ->
?assertEqual(lists:sort(Returns), Returns),
ok.
+single_active_consumer_test() ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ shadow_copy_interval => 0,
+ single_active_consumer_on => true}),
+ ?assertEqual(single_active, State0#state.consumer_strategy),
+ ?assertEqual(0, map_size(State0#state.consumers)),
+
+ % adding some consumers
+ AddConsumer = fun(CTag, State) ->
+ {NewState, _, _} = apply(
+ #{},
+ #checkout{spec = {once, 1, simple_prefetch},
+ meta = #{},
+ consumer_id = {CTag, self()}},
+ State),
+ NewState
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+
+ % the first registered consumer is the active one, the others are waiting
+ ?assertEqual(1, map_size(State1#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State1#state.consumers)),
+ ?assertEqual(3, length(State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
+
+ % cancelling a waiting consumer
+ {State2, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ % the active consumer should still be in place
+ ?assertEqual(1, map_size(State2#state.consumers)),
+ ?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
+ % the cancelled consumer has been removed from waiting consumers
+ ?assertEqual(2, length(State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State3, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ % the second registered consumer is now the active one
+ ?assertEqual(1, map_size(State3#state.consumers)),
+ ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
+ % the new active consumer is no longer in the waiting list
+ ?assertEqual(1, length(State3#state.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#state.waiting_consumers)),
+
+ % cancelling the active consumer
+ {State4, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ % the last waiting consumer became the active one
+ ?assertEqual(1, map_size(State4#state.consumers)),
+ ?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
+ % the waiting consumer list is now empty
+ ?assertEqual(0, length(State4#state.waiting_consumers)),
+
+ % cancelling the last consumer
+ {State5, _, _} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ % no active consumer anymore
+ ?assertEqual(0, map_size(State5#state.consumers)),
+ % still nothing in the waiting list
+ ?assertEqual(0, length(State5#state.waiting_consumers)),
+
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.