summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-03-18 16:30:15 +0000
committerkjnilsson <knilsson@pivotal.io>2019-03-18 16:30:15 +0000
commit568bbd358487195ee325e57c8b72ec7a5d7735ea (patch)
treef493bf4e35d89f2803979112505c857d9e963066 /test
parenta58bc89f73c9af27fda43adcb2639b1f474b5aa5 (diff)
downloadrabbitmq-server-git-568bbd358487195ee325e57c8b72ec7a5d7735ea.tar.gz
rabbit_fifo bugfixes
Bugfixes around noconnect consumer handling as well as refactoring to make these kinds of errors less likely. [#164691411]
Diffstat (limited to 'test')
-rw-r--r--test/rabbit_fifo_SUITE.erl74
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl2
2 files changed, 75 insertions, 1 deletions
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index cf14a68d9d..fccda9772a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -283,6 +283,19 @@ duplicate_enqueue_test(_) ->
?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
ok.
+return_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+ {State0, _} = enq(1, 1, msg, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ {State2, _} = check_auto(Cid2, 3, State1),
+ {State3, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [0]), State2),
+ ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
+ State3#rabbit_fifo.consumers),
+ ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
+ State3#rabbit_fifo.consumers),
+ ok.
+
return_non_existent_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
@@ -556,7 +569,7 @@ purge_with_checkout_test(_) ->
?assertEqual(1, maps:size(Checked)),
ok.
-down_returns_checked_out_in_order_test(_) ->
+down_noproc_returns_checked_out_in_order_test(_) ->
S0 = test_init(?FUNCTION_NAME),
%% enqueue 100
S1 = lists:foldl(fun (Num, FS0) ->
@@ -572,6 +585,30 @@ down_returns_checked_out_in_order_test(_) ->
{S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
Returns = lqueue:to_list(S#rabbit_fifo.returns),
?assertEqual(100, length(Returns)),
+ ?assertEqual(0, maps:size(S#rabbit_fifo.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+down_noconnection_returns_checked_out_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ NumMsgs = 20,
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, NumMsgs)),
+ ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
+ ?assertEqual(NumMsgs, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
+ Returns = lqueue:to_list(S#rabbit_fifo.returns),
+ ?assertEqual(NumMsgs, length(Returns)),
+ ?assertMatch(#consumer{checked_out = Ch}
+ when map_size(Ch) == 0,
+ maps:get(Cid, S#rabbit_fifo.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -736,6 +773,41 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
ok.
+single_active_returns_messages_on_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1],
+ ConsumerIds = [{_, DownPid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+ {State2, _} = enq(4, 1, msg1, State1),
+ % simulate node goes down
+ {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+ %% assert the consumer is up
+ ?assertMatch([_], lqueue:to_list(State3#rabbit_fifo.returns)),
+ ?assertMatch([{_, #consumer{checked_out = Checked}}]
+ when map_size(Checked) == 0,
+ State3#rabbit_fifo.waiting_consumers),
+
+ ok.
+
single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 76b8d7e715..0f5fb8bf58 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -285,7 +285,9 @@ scenario17(_Config) ->
make_checkout(C2, cancel),
make_enqueue(E,2,<<"two">>),
{nodeup,rabbit@fake_node1},
+ %% this has no effect as was returned
make_settle(C1, [0]),
+ %% this should settle "one"
make_settle(C1, [1])
],
run_snapshot_test(#{name => ?FUNCTION_NAME,