diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-03-18 17:57:25 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-03-18 17:57:25 +0000 |
| commit | 7d4fcc644718f749f86b5207dac435d2cccb5664 (patch) | |
| tree | 10ce5076f1cb79a9fec6d3a8219de5f581a51fe0 | |
| parent | 568bbd358487195ee325e57c8b72ec7a5d7735ea (diff) | |
| download | rabbitmq-server-git-7d4fcc644718f749f86b5207dac435d2cccb5664.tar.gz | |
rabbit_fifo: Ensure checkout is performed
After a down command is processed
| -rw-r--r-- | src/rabbit_fifo.erl | 20 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 51 |
2 files changed, 42 insertions, 29 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index c09c8a4f22..b966dca82e 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -308,7 +308,7 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, Pid, noconnection}, +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, @@ -346,8 +346,8 @@ apply(_, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - {State#?MODULE{enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, noconnection}, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that @@ -383,14 +383,14 @@ apply(_, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects2 = case maps:size(State#?MODULE.consumers) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end ++ Effects1, + Effects = case maps:size(State#?MODULE.consumers) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{enqueuers = Enqs}, ok, Effects2}; + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index fccda9772a..6582104708 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -395,14 +395,18 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers), + #consumer{credit = 1, + checked_out = Ch, + status = suspected_down} = maps:get(Cid, State2a#rabbit_fifo.consumers), + ?assertEqual(#{}, Ch), %% validate consumer has credit {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), % when the node comes up we need to retry the process monitors for the % disconnected processes - {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + #consumer{status = up} = maps:get(Cid, State3#rabbit_fifo.consumers), % try to re-monitor the suspect processes ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), @@ -420,6 +424,10 @@ down_with_noconnection_returns_unack_test(_) -> {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)), ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)), + ?assertMatch(#consumer{checked_out = Ch, + status = suspected_down} + when map_size(Ch) == 0, + maps:get(Cid, State2a#rabbit_fifo.consumers)), ok. down_with_noproc_enqueuer_is_cleaned_up_test(_) -> @@ -439,7 +447,8 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), + {_State2, _, Effects2} = apply(meta(1), + rabbit_fifo:make_discard(Cid, [0]), State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -823,26 +832,30 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> test_util:fake_pid(N)} end || N <- Nodes], % adding some consumers - State1 = lists:foldl( - fun(CId, Acc0) -> - {Acc, _, _} = - apply(Meta, - make_checkout(CId, - {once, 1, simple_prefetch}, #{}), - Acc0), - Acc - end, State0, ConsumerIds), + State1a = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), %% assert the consumer is up ?assertMatch(#{C1 := #consumer{status = up}}, - State1#rabbit_fifo.consumers), + State1a#rabbit_fifo.consumers), + + {State1, _} = enq(10, 1, msg, State1a), % simulate node goes down {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), %% assert a new consumer is in place and it is up - ?assertMatch([{C2, #consumer{status = up}}], - maps:to_list(State2#rabbit_fifo.consumers)), + ?assertMatch([{C2, #consumer{status = up, + checked_out = Ch}}] + when map_size(Ch) == 1, + maps:to_list(State2#rabbit_fifo.consumers)), %% the disconnected consumer has been returned to waiting ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, @@ -1062,11 +1075,11 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1), % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node ?assertEqual(4 + 1, length(Effects2)), - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID ?assertEqual(4 + 4, length(Effects3)). @@ -1095,11 +1108,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) ?assertEqual(3, length(Effects2)), - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID ?assertEqual(5, length(Effects3)). |
