summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-03-18 17:57:25 +0000
committerkjnilsson <knilsson@pivotal.io>2019-03-18 17:57:25 +0000
commit7d4fcc644718f749f86b5207dac435d2cccb5664 (patch)
tree10ce5076f1cb79a9fec6d3a8219de5f581a51fe0
parent568bbd358487195ee325e57c8b72ec7a5d7735ea (diff)
downloadrabbitmq-server-git-7d4fcc644718f749f86b5207dac435d2cccb5664.tar.gz
rabbit_fifo: Ensure checkout is performed
After a down command is processed
-rw-r--r--src/rabbit_fifo.erl20
-rw-r--r--test/rabbit_fifo_SUITE.erl51
2 files changed, 42 insertions, 29 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index c09c8a4f22..b966dca82e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -308,7 +308,7 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
-apply(_, {down, Pid, noconnection},
+apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
@@ -346,8 +346,8 @@ apply(_, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- {State#?MODULE{enqueuers = Enqs}, ok, Effects};
-apply(_, {down, Pid, noconnection},
+ checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
@@ -383,14 +383,14 @@ apply(_, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes
- Effects2 = case maps:size(State#?MODULE.consumers) of
- 0 ->
- [{aux, inactive}, {monitor, node, Node}];
- _ ->
- [{monitor, node, Node}]
- end ++ Effects1,
+ Effects = case maps:size(State#?MODULE.consumers) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#?MODULE{enqueuers = Enqs}, ok, Effects2};
+ checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index fccda9772a..6582104708 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -395,14 +395,18 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers),
+ #consumer{credit = 1,
+ checked_out = Ch,
+ status = suspected_down} = maps:get(Cid, State2a#rabbit_fifo.consumers),
+ ?assertEqual(#{}, Ch),
%% validate consumer has credit
{State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
% when the node comes up we need to retry the process monitors for the
% disconnected processes
- {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ #consumer{status = up} = maps:get(Cid, State3#rabbit_fifo.consumers),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -420,6 +424,10 @@ down_with_noconnection_returns_unack_test(_) ->
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
+ ?assertMatch(#consumer{checked_out = Ch,
+ status = suspected_down}
+ when map_size(Ch) == 0,
+ maps:get(Cid, State2a#rabbit_fifo.consumers)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
@@ -439,7 +447,8 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
+ {_State2, _, Effects2} = apply(meta(1),
+ rabbit_fifo:make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -823,26 +832,30 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
test_util:fake_pid(N)}
end || N <- Nodes],
% adding some consumers
- State1 = lists:foldl(
- fun(CId, Acc0) ->
- {Acc, _, _} =
- apply(Meta,
- make_checkout(CId,
- {once, 1, simple_prefetch}, #{}),
- Acc0),
- Acc
- end, State0, ConsumerIds),
+ State1a = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
%% assert the consumer is up
?assertMatch(#{C1 := #consumer{status = up}},
- State1#rabbit_fifo.consumers),
+ State1a#rabbit_fifo.consumers),
+
+ {State1, _} = enq(10, 1, msg, State1a),
% simulate node goes down
{State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
%% assert a new consumer is in place and it is up
- ?assertMatch([{C2, #consumer{status = up}}],
- maps:to_list(State2#rabbit_fifo.consumers)),
+ ?assertMatch([{C2, #consumer{status = up,
+ checked_out = Ch}}]
+ when map_size(Ch) == 1,
+ maps:to_list(State2#rabbit_fifo.consumers)),
%% the disconnected consumer has been returned to waiting
?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
@@ -1062,11 +1075,11 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
?assertEqual(4 + 4, length(Effects3)).
@@ -1095,11 +1108,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)).