diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-29 18:02:05 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-30 10:05:49 +0100 |
| commit | de686a07acf0a24bca14cfe076a6e2b0e6e810d7 (patch) | |
| tree | 021c61a538cc25bf3336c5f192a8549ef0797d35 | |
| parent | 80df800a688440d047885c977578af38b6699760 (diff) | |
| download | rabbitmq-server-git-de686a07acf0a24bca14cfe076a6e2b0e6e810d7.tar.gz | |
QQ SAC: process all consumers on noconnection
There can be multiple single active consumers in the consumers map if
all but one are cancelled. Take this into account when processing
noconnections.
[#165438843]
| -rw-r--r-- | src/rabbit_fifo.erl | 53 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 55 |
2 files changed, 74 insertions, 34 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 609fa0111c..d5893e9d7f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -331,32 +331,33 @@ apply(Meta, {down, Pid, noconnection}, waiting_consumers = Waiting0, enqueuers = Enqs0} = State0) -> Node = node(Pid), - %% if the pid refers to the active consumer, mark it as suspected and return - %% it to the waiting queue + %% if the pid refers to an active or cancelled consumer, + %% mark it as suspected and return it to the waiting queue {State1, Effects0} = - case maps:to_list(Cons0) of - [{{_, P} = Cid, C0}] when node(P) =:= Node -> - %% the consumer should be returned to waiting - %% and checked out messages should be returned - Effs = consumer_update_active_effects( - State0, Cid, C0, false, suspected_down, []), - Checked = C0#consumer.checked_out, - Credit = increase_credit(C0, maps:size(Checked)), - {St, Effs1} = return_all(State0, Effs, - Cid, C0#consumer{credit = Credit}), - %% if the consumer was cancelled there is a chance it got - %% removed when returning hence we need to be defensive here - Waiting = case St#?MODULE.consumers of - #{Cid := C} -> - Waiting0 ++ [{Cid, C}]; - _ -> - Waiting0 - end, - {St#?MODULE{consumers = #{}, - waiting_consumers = Waiting}, - Effs1}; - _ -> {State0, []} - end, + maps:fold(fun({_, P} = Cid, C0, {S0, E0}) + when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + S0, Cid, C0, false, suspected_down, E0), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(S0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers), + waiting_consumers = Waiting}, + Effs1}; + (_, _, S) -> + S + end, {State0, []}, Cons0), WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -822,7 +823,7 @@ cancel_consumer(ConsumerId, Effects0, Reason), activate_next_consumer(State1, Effects1); false -> - % The cancelled consumer is not the active one + % The cancelled consumer is not active or cancelled % Just remove it from idle_consumers Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 2472827ffd..949019a131 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -51,6 +51,7 @@ all_tests() -> single_active_03, single_active_ordering, single_active_ordering_01, + single_active_ordering_03, in_memory_limit % single_active_ordering_02 ]. @@ -488,7 +489,7 @@ single_active(_Config) -> end, [], Size). single_active_ordering(_Config) -> - Size = 2000, + Size = 4000, Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, run_proper( fun () -> @@ -546,6 +547,42 @@ single_active_ordering_02(_Config) -> ?assert(single_active_prop(Conf, Commands, true)), ok. +single_active_ordering_03(_Config) -> + C1Pid = test_util:fake_pid(node()), + C1 = {<<1>>, C1Pid}, + C2Pid = test_util:fake_pid(rabbit@fake_node2), + C2 = {<<2>>, C2Pid}, + E = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_enqueue(E, 1, 0), + make_enqueue(E, 2, 1), + make_enqueue(E, 3, 2), + make_checkout(C1, {auto,1,simple_prefetch}), + make_checkout(C2, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_checkout(C1, cancel), + {down, C1Pid, noconnection} + ], + Conf0 = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0), + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + try run_log(test_init(Conf), Entries) of + {State, Effects} -> + ct:pal("Effects: ~p~n", [Effects]), + ct:pal("State: ~p~n", [State]), + %% assert C1 has no messages + ?assertNotMatch(#{C1 := _}, State#rabbit_fifo.consumers), + true; + _ -> + true + catch + Err -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + ct:pal("Err: ~p~n", [Err]), + false + end. + in_memory_limit(_Config) -> Size = 2000, run_proper( @@ -630,7 +667,9 @@ single_active_prop(Conf0, Commands, ValidateOrder) -> map_size(Up) =< 1 end, try run_log(test_init(Conf), Entries, Invariant) of - {_State, Effects} when ValidateOrder -> + {State, Effects} when ValidateOrder -> + ct:pal("Effects: ~p~n", [Effects]), + ct:pal("State: ~p~n", [State]), %% validate message ordering lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event}, Acc) -> @@ -718,19 +757,19 @@ log_gen_ordered(Size) -> fakenode@fake2 ], ?LET(EPids, vector(1, pid_gen(Nodes)), - ?LET(CPids, vector(5, pid_gen(Nodes)), + ?LET(CPids, vector(8, pid_gen(Nodes)), resize(Size, list( frequency( [{20, enqueue_gen(oneof(EPids), 10, 0)}, {40, {input_event, - frequency([{10, settle}, - {2, return}, + frequency([{15, settle}, + {1, return}, {1, discard}, {1, requeue}])}}, - {2, checkout_gen(oneof(CPids))}, - {1, checkout_cancel_gen(oneof(CPids))}, - {1, down_gen(oneof(EPids ++ CPids))}, + {7, checkout_gen(oneof(CPids))}, + {2, checkout_cancel_gen(oneof(CPids))}, + {2, down_gen(oneof(EPids ++ CPids))}, {1, nodeup_gen(Nodes)} ]))))). |
