diff options
author | kjnilsson <knilsson@pivotal.io> | 2021-04-14 17:37:47 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2021-04-15 15:01:22 +0100 |
commit | b35c29d7b2703389dcc9130690f0bf98d1f3b217 (patch) | |
tree | 48a457bd9208ee6758818f3e56d14c0e9a6d173f | |
parent | c975457199f80ad9ef2291756a075927b770f473 (diff) | |
download | rabbitmq-server-git-qq-in-memory-limit-bug-fix.tar.gz |
QQ: ensure that messages are delivered in order (branch: qq-in-memory-limit-bug-fix)
In the case where some messages are kept in memory mixed with
some that are not, it is possible that messages are delivered to the
consuming channel with gaps/out of order, which would in some cases cause
the channel to treat them as re-sends it has already seen and just
discard them. When this happens the messages get stuck in the consumer
state inside the queue and are never seen by the client consumer and
thus never acked. When this happens the release cursors can't be emitted
as the smallest raft index will be one of the stuck messages.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 73 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 27 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 24 |
3 files changed, 86 insertions, 38 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index b205eda0b7..618dc8f4f9 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{}, Effects0 = [garbage_collection], Reply = {purge, Total}, {State, _, Effects} = evaluate_limit(Index, false, State0, - State1, Effects0), + State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); apply(_Meta, #garbage_collection{}, State) -> {State, ok, [{aux, garbage_collection}]}; @@ -1196,7 +1196,8 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, case evaluate_memory_limit(Header, State0) of true -> % indexed message with header map - {State0, {RaftIdx, {Header, 'empty'}}}; + {State0, + {RaftIdx, {Header, 'empty'}}}; false -> {add_in_memory_counts(Header, State0), {RaftIdx, {Header, RawMsg}}} % indexed message with header map @@ -1358,9 +1359,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> Discarded = maps:with(MsgIds, Checked0), - {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, + {State1, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), + {State, ok, Effects} = checkout(Meta, State0, State1, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). 
dead_letter_effects(_Reason, _Discarded, @@ -1536,7 +1537,7 @@ checkout(Meta, OldState, State, Effects) -> checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), - Effects0, {#{}, #{}}), + Effects0, #{}), case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> case maybe_notify_decorators(State, HandleConsumerChanges) of @@ -1557,28 +1558,28 @@ checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldS end. checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, - Effects, {SendAcc, LogAcc0}) -> + Effects, SendAcc0) -> DelMsg = {RaftIdx, {MsgId, Header}}, - LogAcc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], LogAcc0), - checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); + SendAcc = maps:update_with(ConsumerId, + fun ({InMem, LogMsgs}) -> + {InMem, [DelMsg | LogMsgs]} + end, {[], [DelMsg]}, SendAcc0), + checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc); checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects, - {SendAcc0, LogAcc}) -> + SendAcc0) -> DelMsg = {MsgId, Msg}, SendAcc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], SendAcc0), - checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); -checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) -> + fun ({InMem, LogMsgs}) -> + {[DelMsg | InMem], LogMsgs} + end, {[DelMsg], []}, SendAcc0), + checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc); +checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) -> Effects1 = case Activity of nochange -> - append_send_msg_effects( - append_log_effects(Effects0, LogAcc), SendAcc); + append_delivery_effects(Effects0, SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects( - append_log_effects(Effects0, 
LogAcc), SendAcc)] + | append_delivery_effects(Effects0, SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1648,17 +1649,9 @@ evaluate_memory_limit(Size, when is_integer(Size) -> (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). -append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> - Effects; -append_send_msg_effects(Effects0, AccMap) -> - Effects = maps:fold(fun (C, Msgs, Ef) -> - [send_msg_effect(C, lists:reverse(Msgs)) | Ef] - end, Effects0, AccMap), - [{aux, active} | Effects]. - -append_log_effects(Effects0, AccMap) -> - maps:fold(fun (C, Msgs, Ef) -> - [send_log_effect(C, lists:reverse(Msgs)) | Ef] +append_delivery_effects(Effects0, AccMap) -> + maps:fold(fun (C, {InMemMsgs, LogMsgs}, Ef) -> + [delivery_effect(C, lists:reverse(LogMsgs), InMemMsgs) | Ef] end, Effects0, AccMap). %% next message is determined as follows: @@ -1709,16 +1702,22 @@ take_next_msg(#?MODULE{returns = Returns, end end. -send_msg_effect({CTag, CPid}, Msgs) -> - {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}. - -send_log_effect({CTag, CPid}, IdxMsgs) -> +delivery_effect({CTag, CPid}, [], InMemMsgs) -> + {send_msg, CPid, {delivery, CTag, lists:reverse(InMemMsgs)}, + [local, ra_event]}; +delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> - {MsgId, {Header, Msg}} - end, Log, Data), + Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + Msgs = case InMemMsgs of + [] -> + Msgs0; + _ -> + lists:sort(InMemMsgs ++ Msgs0) + end, [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] end, {local, node(CPid)}}. 
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 92167c319a..fdd9b9e74a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -2260,6 +2260,8 @@ queue_length_in_memory_limit(Config) -> Msg2 = <<"msg11">>, Msg3 = <<"msg111">>, Msg4 = <<"msg1111">>, + Msg5 = <<"msg1111">>, + publish(Ch, QQ, Msg1), publish(Ch, QQ, Msg2), @@ -2278,7 +2280,12 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + publish(Ch, QQ, Msg5), + wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]), + ExpectedMsgs = [Msg2, Msg3, Msg4, Msg5], + validate_queue(Ch, QQ, ExpectedMsgs), + ok. queue_length_in_memory_limit_returns(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2807,3 +2814,21 @@ queue_names(Records) -> #resource{name = Name} = amqqueue:get_name(Q), Name end || Q <- Records]. + + +validate_queue(Ch, Queue, ExpectedMsgs) -> + qos(Ch, length(ExpectedMsgs), false), + subscribe(Ch, Queue, false), + [begin + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = M}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + after 2000 -> + flush(10), + exit({validate_queue_timeout, M}) + end + end || M <- ExpectedMsgs], + ok. diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index fb98047530..2c5b15295f 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -503,6 +503,27 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(_) -> ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2), ok. 
+mixed_send_msg_and_log_effects_are_correctly_ordered_test(_) -> + Cid = {cid(?FUNCTION_NAME), self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + max_in_memory_length =>1, + dead_letter_handler => + {somemod, somefun, [somearg]}}), + %% enqueue two messages + {State0, _} = enq(1, 1, first, State00), + {State1, _} = enq(2, 2, snd, State0), + + {_State2, Effects1} = check_n(Cid, 3, 10, State1), + ct:pal("Effects ~w", [Effects1]), + %% in this case we expect no send_msg effect as any in memory messages + %% should be weaved into the send_msg effect emitted by the log effect + %% later. hence this is all we can assert on + %% as we need to send message is in the correct order to the consuming + %% channel or the channel may think a message has been lost in transit + ?ASSERT_NO_EFF({send_msg, _, _, _}, Effects1), + ok. + tick_test(_) -> Cid = {<<"c">>, self()}, Cid2 = {<<"c2">>, self()}, @@ -1712,3 +1733,6 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M). + +cid(A) when is_atom(A) -> + atom_to_binary(A, utf8). |