summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-06-30 15:40:31 +0100
committerkjnilsson <knilsson@pivotal.io>2020-09-07 09:42:10 +0100
commitcb2b424fa947b0d885ccf7d21a20f9d240cccb1f (patch)
tree225b1b499ee1f757e5a0baff83142f109aa51059 /test
parentbd3827b0cf5f2199268a5b579f27483a9c581ab2 (diff)
downloadrabbitmq-server-git-cb2b424fa947b0d885ccf7d21a20f9d240cccb1f.tar.gz
rabbit_fifo: Fixes for move to lqueue
Also rename v0 state back to rabbit_fifo else it could never work with old state!
Diffstat (limited to 'test')
-rw-r--r--test/rabbit_fifo_v0_SUITE.erl181
1 files changed, 89 insertions, 92 deletions
diff --git a/test/rabbit_fifo_v0_SUITE.erl b/test/rabbit_fifo_v0_SUITE.erl
index 6b84911d7f..fcb84377de 100644
--- a/test/rabbit_fifo_v0_SUITE.erl
+++ b/test/rabbit_fifo_v0_SUITE.erl
@@ -128,12 +128,12 @@ credit_with_drained_test(_) ->
{State1, _, _} =
apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}),
State0),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 1,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
{State, Result, _} =
apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
?assertEqual({multi, [{send_credit_reply, 0},
@@ -154,7 +154,7 @@ credit_and_drain_test(_) ->
{State4, {multi, [{send_credit_reply, 0},
{send_drained, {?FUNCTION_NAME, 2}}]},
Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3),
- ?assertMatch(#?RABBIT_FIFO{consumers = #{Cid := #consumer{credit = 0,
+ ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -290,9 +290,9 @@ return_test(_) ->
{State2, _} = check_auto(Cid2, 3, State1),
{State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2),
?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
ok.
return_dequeue_delivery_limit_test(_) ->
@@ -350,7 +350,7 @@ return_checked_out_limit_test(_) ->
{State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _},
{aux, active}]} =
apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1),
- {#?RABBIT_FIFO{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
+ {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} =
apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2),
?assertEqual(0, rabbit_fifo_index:size(RaIdxs)),
ok.
@@ -380,8 +380,8 @@ cancelled_checkout_out_test(_) ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(1, maps:size(State2#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State2#?RABBIT_FIFO.returns)),
+ ?assertEqual(1, maps:size(State2#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State2#?STATE.returns)),
{State3, {dequeue, empty}} =
apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -410,14 +410,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
{State0, Effects0} = enq(1, 1, second, test_init(test)),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0),
{State1, Effects1} = check_auto(Cid, 2, State0),
- #consumer{credit = 0} = maps:get(Cid, State1#?RABBIT_FIFO.consumers),
+ #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers),
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
#consumer{credit = 1,
checked_out = Ch,
- status = suspected_down} = maps:get(Cid, State2a#?RABBIT_FIFO.consumers),
+ status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers),
?assertEqual(#{}, Ch),
%% validate consumer has credit
{State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
@@ -426,7 +426,7 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
% when the node comes up we need to retry the process monitors for the
% disconnected processes
{State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
- #consumer{status = up} = maps:get(Cid, State3#?RABBIT_FIFO.consumers),
+ #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -436,18 +436,18 @@ down_with_noconnection_returns_unack_test(_) ->
Pid = spawn(fun() -> ok end),
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
- ?assertEqual(1, maps:size(State0#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State0#?RABBIT_FIFO.returns)),
+ ?assertEqual(1, maps:size(State0#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State0#?STATE.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
- ?assertEqual(0, maps:size(State1#?RABBIT_FIFO.messages)),
- ?assertEqual(0, lqueue:len(State1#?RABBIT_FIFO.returns)),
+ ?assertEqual(0, maps:size(State1#?STATE.messages)),
+ ?assertEqual(0, lqueue:len(State1#?STATE.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(0, maps:size(State2a#?RABBIT_FIFO.messages)),
- ?assertEqual(1, lqueue:len(State2a#?RABBIT_FIFO.returns)),
+ ?assertEqual(0, maps:size(State2a#?STATE.messages)),
+ ?assertEqual(1, lqueue:len(State2a#?STATE.returns)),
?assertMatch(#consumer{checked_out = Ch,
status = suspected_down}
when map_size(Ch) == 0,
- maps:get(Cid, State2a#?RABBIT_FIFO.consumers)),
+ maps:get(Cid, State2a#?STATE.consumers)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
@@ -457,7 +457,7 @@ down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
- ?assert(0 =:= maps:size(State1#?RABBIT_FIFO.enqueuers)),
+ ?assert(0 =:= maps:size(State1#?STATE.enqueuers)),
ok.
discarded_message_without_dead_letter_handler_is_removed_test(_) ->
@@ -536,7 +536,7 @@ pending_enqueue_is_enqueued_on_down_test(_) ->
duplicate_delivery_test(_) ->
{State0, _} = enq(1, 1, first, test_init(test)),
- {#?RABBIT_FIFO{ra_indexes = RaIdxs,
+ {#?STATE{ra_indexes = RaIdxs,
messages = Messages}, _} = enq(2, 1, first, State0),
?assertEqual(1, rabbit_fifo_index:size(RaIdxs)),
?assertEqual(1, maps:size(Messages)),
@@ -605,13 +605,13 @@ purge_with_checkout_test(_) ->
{State1, _} = enq(2, 1, <<"first">>, State0),
{State2, _} = enq(3, 2, <<"second">>, State1),
%% assert message bytes are non zero
- ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0),
- ?assert(State2#?RABBIT_FIFO.msg_bytes_enqueue > 0),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assert(State2#?STATE.msg_bytes_enqueue > 0),
{State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2),
- ?assert(State2#?RABBIT_FIFO.msg_bytes_checkout > 0),
- ?assertEqual(0, State3#?RABBIT_FIFO.msg_bytes_enqueue),
- ?assertEqual(1, rabbit_fifo_index:size(State3#?RABBIT_FIFO.ra_indexes)),
- #consumer{checked_out = Checked} = maps:get(Cid, State3#?RABBIT_FIFO.consumers),
+ ?assert(State2#?STATE.msg_bytes_checkout > 0),
+ ?assertEqual(0, State3#?STATE.msg_bytes_enqueue),
+ ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)),
+ #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers),
?assertEqual(1, maps:size(Checked)),
ok.
@@ -622,16 +622,16 @@ down_noproc_returns_checked_out_in_order_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, 100)),
- ?assertEqual(100, maps:size(S1#?RABBIT_FIFO.messages)),
+ ?assertEqual(100, maps:size(S1#?STATE.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
?assertEqual(100, maps:size(Checked)),
%% simulate down
{S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
- Returns = lqueue:to_list(S#?RABBIT_FIFO.returns),
+ Returns = lqueue:to_list(S#?STATE.returns),
?assertEqual(100, length(Returns)),
- ?assertEqual(0, maps:size(S#?RABBIT_FIFO.consumers)),
+ ?assertEqual(0, maps:size(S#?STATE.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -643,18 +643,18 @@ down_noconnection_returns_checked_out_test(_) ->
{FS, _} = enq(Num, Num, Num, FS0),
FS
end, S0, lists:seq(1, NumMsgs)),
- ?assertEqual(NumMsgs, maps:size(S1#?RABBIT_FIFO.messages)),
+ ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)),
Cid = {<<"cid">>, self()},
{S2, _} = check(Cid, 101, 1000, S1),
- #consumer{checked_out = Checked} = maps:get(Cid, S2#?RABBIT_FIFO.consumers),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers),
?assertEqual(NumMsgs, maps:size(Checked)),
%% simulate down
{S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
- Returns = lqueue:to_list(S#?RABBIT_FIFO.returns),
+ Returns = lqueue:to_list(S#?STATE.returns),
?assertEqual(NumMsgs, length(Returns)),
?assertMatch(#consumer{checked_out = Ch}
when map_size(Ch) == 0,
- maps:get(Cid, S#?RABBIT_FIFO.consumers)),
+ maps:get(Cid, S#?STATE.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -666,8 +666,8 @@ single_active_consumer_basic_get_test(_) ->
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
- ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
{State1, _} = enq(1, 1, first, State0),
{_State, {error, unsupported}} =
apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}),
@@ -680,8 +680,8 @@ single_active_consumer_test(_) ->
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#?RABBIT_FIFO.cfg#cfg.consumer_strategy),
- ?assertEqual(0, map_size(State0#?RABBIT_FIFO.consumers)),
+ ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy),
+ ?assertEqual(0, map_size(State0#?STATE.consumers)),
% adding some consumers
AddConsumer = fun(CTag, State) ->
@@ -701,24 +701,24 @@ single_active_consumer_test(_) ->
C4 = {<<"ctag4">>, self()},
% the first registered consumer is the active one, the others are waiting
- ?assertEqual(1, map_size(State1#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C1 := _}, State1#?RABBIT_FIFO.consumers),
- ?assertEqual(3, length(State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(1, map_size(State1#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State1#?STATE.consumers),
+ ?assertEqual(3, length(State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)),
% cancelling a waiting consumer
{State2, _, Effects1} = apply(meta(2),
make_checkout(C3, cancel, #{}),
State1),
% the active consumer should still be in place
- ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C1 := _}, State2#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
+ ?assertMatch(#{C1 := _}, State2#?STATE.consumers),
% the cancelled consumer has been removed from waiting consumers
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)),
% there are some effects to unregister the consumer
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C3, Effects1),
@@ -728,12 +728,12 @@ single_active_consumer_test(_) ->
make_checkout(C1, cancel, #{}),
State2),
% the second registered consumer is now the active one
- ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C2 := _}, State3#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
+ ?assertMatch(#{C2 := _}, State3#?STATE.consumers),
% the new active consumer is no longer in the waiting list
- ?assertEqual(1, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(1, length(State3#?STATE.waiting_consumers)),
?assertNotEqual(false, lists:keyfind(C4, 1,
- State3#?RABBIT_FIFO.waiting_consumers)),
+ State3#?STATE.waiting_consumers)),
%% should have a cancel consumer handler mod_call effect and
%% an active new consumer effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -746,10 +746,10 @@ single_active_consumer_test(_) ->
make_checkout(C2, cancel, #{}),
State3),
% the last waiting consumer became the active one
- ?assertEqual(1, map_size(State4#?RABBIT_FIFO.consumers)),
- ?assertMatch(#{C4 := _}, State4#?RABBIT_FIFO.consumers),
+ ?assertEqual(1, map_size(State4#?STATE.consumers)),
+ ?assertMatch(#{C4 := _}, State4#?STATE.consumers),
% the waiting consumer list is now empty
- ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
% there are some effects to unregister the consumer and
% to update the new active one (metrics)
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -762,9 +762,9 @@ single_active_consumer_test(_) ->
make_checkout(C4, cancel, #{}),
State4),
% no active consumer anymore
- ?assertEqual(0, map_size(State5#?RABBIT_FIFO.consumers)),
+ ?assertEqual(0, map_size(State5#?STATE.consumers)),
% still nothing in the waiting list
- ?assertEqual(0, length(State5#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State5#?STATE.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, _}, Effects4),
@@ -799,9 +799,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the channel of the active consumer goes down
{State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
% fell back to another consumer
- ?assertEqual(1, map_size(State2#?RABBIT_FIFO.consumers)),
+ ?assertEqual(1, map_size(State2#?STATE.consumers)),
% there are still waiting consumers
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
% effects to unregister the consumer and
% to update the new active one (metrics) are there
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
@@ -812,9 +812,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the channel of the active consumer and a waiting consumer goes down
{State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
% fell back to another consumer
- ?assertEqual(1, map_size(State3#?RABBIT_FIFO.consumers)),
+ ?assertEqual(1, map_size(State3#?STATE.consumers)),
% no more waiting consumer
- ?assertEqual(0, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, length(State3#?STATE.waiting_consumers)),
% effects to cancel both consumers of this channel + effect to update the new active one (metrics)
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C2, Effects2),
@@ -826,8 +826,8 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
% the last channel goes down
{State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
- ?assertEqual(0, map_size(State4#?RABBIT_FIFO.consumers)),
- ?assertEqual(0, length(State4#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(0, map_size(State4#?STATE.consumers)),
+ ?assertEqual(0, length(State4#?STATE.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
?ASSERT_EFF({mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [_, C]}, C == C4, Effects3),
@@ -862,10 +862,10 @@ single_active_returns_messages_on_noconnection_test(_) ->
% simulate node goes down
{State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
%% assert the consumer is up
- ?assertMatch([_], lqueue:to_list(State3#?RABBIT_FIFO.returns)),
+ ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)),
?assertMatch([{_, #consumer{checked_out = Checked}}]
when map_size(Checked) == 0,
- State3#?RABBIT_FIFO.waiting_consumers),
+ State3#?STATE.waiting_consumers),
ok.
@@ -896,7 +896,7 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
%% assert the consumer is up
?assertMatch(#{C1 := #consumer{status = up}},
- State1a#?RABBIT_FIFO.consumers),
+ State1a#?STATE.consumers),
{State1, _} = enq(10, 1, msg, State1a),
@@ -907,24 +907,24 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
?assertMatch([{C2, #consumer{status = up,
checked_out = Ch}}]
when map_size(Ch) == 1,
- maps:to_list(State2#?RABBIT_FIFO.consumers)),
+ maps:to_list(State2#?STATE.consumers)),
%% the disconnected consumer has been returned to waiting
?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
- State2#?RABBIT_FIFO.waiting_consumers)),
- ?assertEqual(2, length(State2#?RABBIT_FIFO.waiting_consumers)),
+ State2#?STATE.waiting_consumers)),
+ ?assertEqual(2, length(State2#?STATE.waiting_consumers)),
% simulate node comes back up
{State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
%% the consumer is still active and the same as before
?assertMatch([{C2, #consumer{status = up}}],
- maps:to_list(State3#?RABBIT_FIFO.consumers)),
+ maps:to_list(State3#?STATE.consumers)),
% the waiting consumers should be un-suspected
- ?assertEqual(2, length(State3#?RABBIT_FIFO.waiting_consumers)),
+ ?assertEqual(2, length(State3#?STATE.waiting_consumers)),
lists:foreach(fun({_, #consumer{status = Status}}) ->
?assert(Status /= suspected_down)
- end, State3#?RABBIT_FIFO.waiting_consumers),
+ end, State3#?STATE.waiting_consumers),
ok.
single_active_consumer_all_disconnected_test(_) ->
@@ -953,22 +953,22 @@ single_active_consumer_all_disconnected_test(_) ->
end, State0, ConsumerIds),
%% assert the consumer is up
- ?assertMatch(#{C1 := #consumer{status = up}}, State1#?RABBIT_FIFO.consumers),
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers),
% simulate node goes down
{State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
%% assert the consumer fails over to the consumer on n2
- ?assertMatch(#{C2 := #consumer{status = up}}, State2#?RABBIT_FIFO.consumers),
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers),
{State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
%% assert these no active consumer after both nodes are maked as down
- ?assertMatch([], maps:to_list(State3#?RABBIT_FIFO.consumers)),
+ ?assertMatch([], maps:to_list(State3#?STATE.consumers)),
%% n2 comes back
{State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
%% ensure n2 is the active consumer as this node as been registered
%% as up again
?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
credit = 1}}],
- maps:to_list(State4#?RABBIT_FIFO.consumers)),
+ maps:to_list(State4#?STATE.consumers)),
ok.
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
@@ -1050,11 +1050,11 @@ query_consumers_test(_) ->
NewState
end,
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
- Consumers0 = State1#?RABBIT_FIFO.consumers,
+ Consumers0 = State1#?STATE.consumers,
Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
Consumers1 = maps:put({<<"ctag2">>, self()},
Consumer#consumer{status = suspected_down}, Consumers0),
- State2 = State1#?RABBIT_FIFO{consumers = Consumers1},
+ State2 = State1#?STATE{consumers = Consumers1},
?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)),
Consumers2 = rabbit_fifo_v0:query_consumers(State2),
@@ -1203,13 +1203,13 @@ single_active_cancelled_with_unacked_test(_) ->
%% C2 should be the active consumer
?assertMatch(#{C2 := #consumer{status = up,
checked_out = #{0 := _}}},
- State4#?RABBIT_FIFO.consumers),
+ State4#?STATE.consumers),
%% C1 should be a cancelled consumer
?assertMatch(#{C1 := #consumer{status = cancelled,
lifetime = once,
checked_out = #{0 := _}}},
- State4#?RABBIT_FIFO.consumers),
- ?assertMatch([], State4#?RABBIT_FIFO.waiting_consumers),
+ State4#?STATE.consumers),
+ ?assertMatch([], State4#?STATE.waiting_consumers),
%% Ack both messages
{State5, _Effects5} = settle(C1, 1, 0, State4),
@@ -1218,11 +1218,11 @@ single_active_cancelled_with_unacked_test(_) ->
%% C2 should remain
?assertMatch(#{C2 := #consumer{status = up}},
- State6#?RABBIT_FIFO.consumers),
+ State6#?STATE.consumers),
%% C1 should be gone
?assertNotMatch(#{C1 := _},
- State6#?RABBIT_FIFO.consumers),
- ?assertMatch([], State6#?RABBIT_FIFO.waiting_consumers),
+ State6#?STATE.consumers),
+ ?assertMatch([], State6#?STATE.waiting_consumers),
ok.
single_active_with_credited_test(_) ->
@@ -1253,9 +1253,9 @@ single_active_with_credited_test(_) ->
{State3, _} = apply(meta(4), C2Cred, State2),
%% both consumers should have credit
?assertMatch(#{C1 := #consumer{credit = 5}},
- State3#?RABBIT_FIFO.consumers),
+ State3#?STATE.consumers),
?assertMatch([{C2, #consumer{credit = 4}}],
- State3#?RABBIT_FIFO.waiting_consumers),
+ State3#?STATE.waiting_consumers),
ok.
purge_nodes_test(_) ->
@@ -1292,11 +1292,8 @@ purge_nodes_test(_) ->
State4),
%% assert there are no enqueuers nor consumers
- ?assertMatch(#?RABBIT_FIFO{enqueuers = Enqs} when map_size(Enqs) == 1,
- State),
-
- ?assertMatch(#?RABBIT_FIFO{consumers = Cons} when map_size(Cons) == 0,
- State),
+ ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State),
+ ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State),
?assertMatch(
[{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{}, _Metrics,