summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-04-30 11:33:02 +0200
committerGitHub <noreply@github.com>2019-04-30 11:33:02 +0200
commite854640a1851e44f7948a97066ef192eaf3085f1 (patch)
tree021c61a538cc25bf3336c5f192a8549ef0797d35
parent80df800a688440d047885c977578af38b6699760 (diff)
parentde686a07acf0a24bca14cfe076a6e2b0e6e810d7 (diff)
downloadrabbitmq-server-git-e854640a1851e44f7948a97066ef192eaf3085f1.tar.gz
Merge pull request #1995 from rabbitmq/sac-qq-fix
QQ SAC: process all consumers on noconnection
-rw-r--r--src/rabbit_fifo.erl53
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl55
2 files changed, 74 insertions, 34 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 609fa0111c..d5893e9d7f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -331,32 +331,33 @@ apply(Meta, {down, Pid, noconnection},
waiting_consumers = Waiting0,
enqueuers = Enqs0} = State0) ->
Node = node(Pid),
- %% if the pid refers to the active consumer, mark it as suspected and return
- %% it to the waiting queue
+ %% if the pid refers to an active or cancelled consumer,
+ %% mark it as suspected and return it to the waiting queue
{State1, Effects0} =
- case maps:to_list(Cons0) of
- [{{_, P} = Cid, C0}] when node(P) =:= Node ->
- %% the consumer should be returned to waiting
- %% and checked out messages should be returned
- Effs = consumer_update_active_effects(
- State0, Cid, C0, false, suspected_down, []),
- Checked = C0#consumer.checked_out,
- Credit = increase_credit(C0, maps:size(Checked)),
- {St, Effs1} = return_all(State0, Effs,
- Cid, C0#consumer{credit = Credit}),
- %% if the consumer was cancelled there is a chance it got
- %% removed when returning hence we need to be defensive here
- Waiting = case St#?MODULE.consumers of
- #{Cid := C} ->
- Waiting0 ++ [{Cid, C}];
- _ ->
- Waiting0
- end,
- {St#?MODULE{consumers = #{},
- waiting_consumers = Waiting},
- Effs1};
- _ -> {State0, []}
- end,
+ maps:fold(fun({_, P} = Cid, C0, {S0, E0})
+ when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ S0, Cid, C0, false, suspected_down, E0),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(S0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?MODULE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
+ waiting_consumers = Waiting},
+ Effs1};
+ (_, _, S) ->
+ S
+ end, {State0, []}, Cons0),
WaitingConsumers = update_waiting_consumer_status(Node, State1,
suspected_down),
@@ -822,7 +823,7 @@ cancel_consumer(ConsumerId,
Effects0, Reason),
activate_next_consumer(State1, Effects1);
false ->
- % The cancelled consumer is not the active one
+ % The cancelled consumer is not active or cancelled
% Just remove it from idle_consumers
Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 2472827ffd..949019a131 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -51,6 +51,7 @@ all_tests() ->
single_active_03,
single_active_ordering,
single_active_ordering_01,
+ single_active_ordering_03,
in_memory_limit
% single_active_ordering_02
].
@@ -488,7 +489,7 @@ single_active(_Config) ->
end, [], Size).
single_active_ordering(_Config) ->
- Size = 2000,
+ Size = 4000,
Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
run_proper(
fun () ->
@@ -546,6 +547,42 @@ single_active_ordering_02(_Config) ->
?assert(single_active_prop(Conf, Commands, true)),
ok.
+single_active_ordering_03(_Config) ->
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<1>>, C1Pid},
+ C2Pid = test_util:fake_pid(rabbit@fake_node2),
+ C2 = {<<2>>, C2Pid},
+ E = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_enqueue(E, 1, 0),
+ make_enqueue(E, 2, 1),
+ make_enqueue(E, 3, 2),
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_checkout(C1, cancel),
+ {down, C1Pid, noconnection}
+ ],
+ Conf0 = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0),
+ Conf = Conf0#{release_cursor_interval => 100},
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ try run_log(test_init(Conf), Entries) of
+ {State, Effects} ->
+ ct:pal("Effects: ~p~n", [Effects]),
+ ct:pal("State: ~p~n", [State]),
+ %% assert C1 has no messages
+ ?assertNotMatch(#{C1 := _}, State#rabbit_fifo.consumers),
+ true;
+ _ ->
+ true
+ catch
+ Err ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
in_memory_limit(_Config) ->
Size = 2000,
run_proper(
@@ -630,7 +667,9 @@ single_active_prop(Conf0, Commands, ValidateOrder) ->
map_size(Up) =< 1
end,
try run_log(test_init(Conf), Entries, Invariant) of
- {_State, Effects} when ValidateOrder ->
+ {State, Effects} when ValidateOrder ->
+ ct:pal("Effects: ~p~n", [Effects]),
+ ct:pal("State: ~p~n", [State]),
%% validate message ordering
lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event},
Acc) ->
@@ -718,19 +757,19 @@ log_gen_ordered(Size) ->
fakenode@fake2
],
?LET(EPids, vector(1, pid_gen(Nodes)),
- ?LET(CPids, vector(5, pid_gen(Nodes)),
+ ?LET(CPids, vector(8, pid_gen(Nodes)),
resize(Size,
list(
frequency(
[{20, enqueue_gen(oneof(EPids), 10, 0)},
{40, {input_event,
- frequency([{10, settle},
- {2, return},
+ frequency([{15, settle},
+ {1, return},
{1, discard},
{1, requeue}])}},
- {2, checkout_gen(oneof(CPids))},
- {1, checkout_cancel_gen(oneof(CPids))},
- {1, down_gen(oneof(EPids ++ CPids))},
+ {7, checkout_gen(oneof(CPids))},
+ {2, checkout_cancel_gen(oneof(CPids))},
+ {2, down_gen(oneof(EPids ++ CPids))},
{1, nodeup_gen(Nodes)}
]))))).