diff options
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 73 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 27 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_SUITE.erl | 24 |
3 files changed, 86 insertions, 38 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index b205eda0b7..618dc8f4f9 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{}, Effects0 = [garbage_collection], Reply = {purge, Total}, {State, _, Effects} = evaluate_limit(Index, false, State0, - State1, Effects0), + State1, Effects0), update_smallest_raft_index(Index, Reply, State, Effects); apply(_Meta, #garbage_collection{}, State) -> {State, ok, [{aux, garbage_collection}]}; @@ -1196,7 +1196,8 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, case evaluate_memory_limit(Header, State0) of true -> % indexed message with header map - {State0, {RaftIdx, {Header, 'empty'}}}; + {State0, + {RaftIdx, {Header, 'empty'}}}; false -> {add_in_memory_counts(Header, State0), {RaftIdx, {Header, RawMsg}}} % indexed message with header map @@ -1358,9 +1359,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> Discarded = maps:with(MsgIds, Checked0), - {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, + {State1, Effects1} = complete(Meta, ConsumerId, Discarded, Con0, Effects0, State0), - {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false), + {State, ok, Effects} = checkout(Meta, State0, State1, Effects1, false), update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1536,7 +1537,7 @@ checkout(Meta, OldState, State, Effects) -> checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0, Effects0, HandleConsumerChanges) -> {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0), - Effects0, {#{}, #{}}), + Effects0, #{}), case evaluate_limit(Index, false, OldState, State1, Effects1) of {State, true, Effects} -> case maybe_notify_decorators(State, HandleConsumerChanges) of @@ -1557,28 +1558,28 @@ checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldS end. checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State}, - Effects, {SendAcc, LogAcc0}) -> + Effects, SendAcc0) -> DelMsg = {RaftIdx, {MsgId, Header}}, - LogAcc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], LogAcc0), - checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); + SendAcc = maps:update_with(ConsumerId, + fun ({InMem, LogMsgs}) -> + {InMem, [DelMsg | LogMsgs]} + end, {[], [DelMsg]}, SendAcc0), + checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc); checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects, - {SendAcc0, LogAcc}) -> + SendAcc0) -> DelMsg = {MsgId, Msg}, SendAcc = maps:update_with(ConsumerId, - fun (M) -> [DelMsg | M] end, - [DelMsg], SendAcc0), - checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc}); -checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) -> + fun ({InMem, LogMsgs}) -> + {[DelMsg | InMem], LogMsgs} + end, {[DelMsg], []}, SendAcc0), + checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc); +checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) -> Effects1 = case Activity of nochange -> - append_send_msg_effects( - append_log_effects(Effects0, LogAcc), SendAcc); + append_delivery_effects(Effects0, SendAcc); inactive -> [{aux, inactive} - | append_send_msg_effects( - append_log_effects(Effects0, LogAcc), SendAcc)] + | append_delivery_effects(Effects0, SendAcc)] end, {State0, ok, lists:reverse(Effects1)}. @@ -1648,17 +1649,9 @@ evaluate_memory_limit(Size, when is_integer(Size) -> (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes). -append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> - Effects; -append_send_msg_effects(Effects0, AccMap) -> - Effects = maps:fold(fun (C, Msgs, Ef) -> - [send_msg_effect(C, lists:reverse(Msgs)) | Ef] - end, Effects0, AccMap), - [{aux, active} | Effects]. - -append_log_effects(Effects0, AccMap) -> - maps:fold(fun (C, Msgs, Ef) -> - [send_log_effect(C, lists:reverse(Msgs)) | Ef] +append_delivery_effects(Effects0, AccMap) -> + maps:fold(fun (C, {InMemMsgs, LogMsgs}, Ef) -> + [delivery_effect(C, lists:reverse(LogMsgs), InMemMsgs) | Ef] end, Effects0, AccMap). %% next message is determined as follows: @@ -1709,16 +1702,22 @@ take_next_msg(#?MODULE{returns = Returns, end end. -send_msg_effect({CTag, CPid}, Msgs) -> - {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}. - -send_log_effect({CTag, CPid}, IdxMsgs) -> +delivery_effect({CTag, CPid}, [], InMemMsgs) -> + {send_msg, CPid, {delivery, CTag, lists:reverse(InMemMsgs)}, + [local, ra_event]}; +delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> - {MsgId, {Header, Msg}} - end, Log, Data), + Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}} + end, Log, Data), + Msgs = case InMemMsgs of + [] -> + Msgs0; + _ -> + lists:sort(InMemMsgs ++ Msgs0) + end, [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}] end, {local, node(CPid)}}. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 92167c319a..fdd9b9e74a 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -2260,6 +2260,8 @@ queue_length_in_memory_limit(Config) -> Msg2 = <<"msg11">>, Msg3 = <<"msg111">>, Msg4 = <<"msg1111">>, + Msg5 = <<"msg1111">>, + publish(Ch, QQ, Msg1), publish(Ch, QQ, Msg2), @@ -2278,7 +2280,12 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + publish(Ch, QQ, Msg5), + wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]), + ExpectedMsgs = [Msg2, Msg3, Msg4, Msg5], + validate_queue(Ch, QQ, ExpectedMsgs), + ok. queue_length_in_memory_limit_returns(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2807,3 +2814,21 @@ queue_names(Records) -> #resource{name = Name} = amqqueue:get_name(Q), Name end || Q <- Records]. + + +validate_queue(Ch, Queue, ExpectedMsgs) -> + qos(Ch, length(ExpectedMsgs), false), + subscribe(Ch, Queue, false), + [begin + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = M}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + after 2000 -> + flush(10), + exit({validate_queue_timeout, M}) + end + end || M <- ExpectedMsgs], + ok. diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index fb98047530..2c5b15295f 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -503,6 +503,27 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(_) -> ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2), ok. +mixed_send_msg_and_log_effects_are_correctly_ordered_test(_) -> + Cid = {cid(?FUNCTION_NAME), self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + max_in_memory_length =>1, + dead_letter_handler => + {somemod, somefun, [somearg]}}), + %% enqueue two messages + {State0, _} = enq(1, 1, first, State00), + {State1, _} = enq(2, 2, snd, State0), + + {_State2, Effects1} = check_n(Cid, 3, 10, State1), + ct:pal("Effects ~w", [Effects1]), + %% in this case we expect no send_msg effect as any in memory messages + %% should be weaved into the send_msg effect emitted by the log effect + %% later. hence this is all we can assert on + %% as we need to send message is in the correct order to the consuming + %% channel or the channel may think a message has been lost in transit + ?ASSERT_NO_EFF({send_msg, _, _, _}, Effects1), + ok. + tick_test(_) -> Cid = {<<"c">>, self()}, Cid2 = {<<"c2">>, self()}, @@ -1712,3 +1733,6 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State). init_aux(Conf) -> rabbit_fifo:init_aux(Conf). handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M). make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M). + +cid(A) when is_atom(A) -> + atom_to_binary(A, utf8). |