summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl73
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl27
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl24
3 files changed, 86 insertions, 38 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index b205eda0b7..618dc8f4f9 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{},
Effects0 = [garbage_collection],
Reply = {purge, Total},
{State, _, Effects} = evaluate_limit(Index, false, State0,
- State1, Effects0),
+ State1, Effects0),
update_smallest_raft_index(Index, Reply, State, Effects);
apply(_Meta, #garbage_collection{}, State) ->
{State, ok, [{aux, garbage_collection}]};
@@ -1196,7 +1196,8 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
case evaluate_memory_limit(Header, State0) of
true ->
% indexed message with header map
- {State0, {RaftIdx, {Header, 'empty'}}};
+ {State0,
+ {RaftIdx, {Header, 'empty'}}};
false ->
{add_in_memory_counts(Header, State0),
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
@@ -1358,9 +1359,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
Discarded = maps:with(MsgIds, Checked0),
- {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
+ {State1, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
+ {State, ok, Effects} = checkout(Meta, State0, State1, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1536,7 +1537,7 @@ checkout(Meta, OldState, State, Effects) ->
checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
- Effects0, {#{}, #{}}),
+ Effects0, #{}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
case maybe_notify_decorators(State, HandleConsumerChanges) of
@@ -1557,28 +1558,28 @@ checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldS
end.
checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
- Effects, {SendAcc, LogAcc0}) ->
+ Effects, SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
- LogAcc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], LogAcc0),
- checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+ SendAcc = maps:update_with(ConsumerId,
+ fun ({InMem, LogMsgs}) ->
+ {InMem, [DelMsg | LogMsgs]}
+ end, {[], [DelMsg]}, SendAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc);
checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
- {SendAcc0, LogAcc}) ->
+ SendAcc0) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], SendAcc0),
- checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
-checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ fun ({InMem, LogMsgs}) ->
+ {[DelMsg | InMem], LogMsgs}
+ end, {[DelMsg], []}, SendAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc);
+checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) ->
Effects1 = case Activity of
nochange ->
- append_send_msg_effects(
- append_log_effects(Effects0, LogAcc), SendAcc);
+ append_delivery_effects(Effects0, SendAcc);
inactive ->
[{aux, inactive}
- | append_send_msg_effects(
- append_log_effects(Effects0, LogAcc), SendAcc)]
+ | append_delivery_effects(Effects0, SendAcc)]
end,
{State0, ok, lists:reverse(Effects1)}.
@@ -1648,17 +1649,9 @@ evaluate_memory_limit(Size,
when is_integer(Size) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
-append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
- Effects;
-append_send_msg_effects(Effects0, AccMap) ->
- Effects = maps:fold(fun (C, Msgs, Ef) ->
- [send_msg_effect(C, lists:reverse(Msgs)) | Ef]
- end, Effects0, AccMap),
- [{aux, active} | Effects].
-
-append_log_effects(Effects0, AccMap) ->
- maps:fold(fun (C, Msgs, Ef) ->
- [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+append_delivery_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, {InMemMsgs, LogMsgs}, Ef) ->
+ [delivery_effect(C, lists:reverse(LogMsgs), InMemMsgs) | Ef]
end, Effects0, AccMap).
%% next message is determined as follows:
@@ -1709,16 +1702,22 @@ take_next_msg(#?MODULE{returns = Returns,
end
end.
-send_msg_effect({CTag, CPid}, Msgs) ->
- {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}.
-
-send_log_effect({CTag, CPid}, IdxMsgs) ->
+delivery_effect({CTag, CPid}, [], InMemMsgs) ->
+ {send_msg, CPid, {delivery, CTag, lists:reverse(InMemMsgs)},
+ [local, ra_event]};
+delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
{log, RaftIdxs,
fun(Log) ->
- Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
- {MsgId, {Header, Msg}}
- end, Log, Data),
+ Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ Msgs = case InMemMsgs of
+ [] ->
+ Msgs0;
+ _ ->
+ lists:sort(InMemMsgs ++ Msgs0)
+ end,
[{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 92167c319a..fdd9b9e74a 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -2260,6 +2260,8 @@ queue_length_in_memory_limit(Config) ->
Msg2 = <<"msg11">>,
Msg3 = <<"msg111">>,
Msg4 = <<"msg1111">>,
+ Msg5 = <<"msg1111">>,
+
publish(Ch, QQ, Msg1),
publish(Ch, QQ, Msg2),
@@ -2278,7 +2280,12 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ publish(Ch, QQ, Msg5),
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
+ ExpectedMsgs = [Msg2, Msg3, Msg4, Msg5],
+ validate_queue(Ch, QQ, ExpectedMsgs),
+ ok.
queue_length_in_memory_limit_returns(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2807,3 +2814,21 @@ queue_names(Records) ->
#resource{name = Name} = amqqueue:get_name(Q),
Name
end || Q <- Records].
+
+
+validate_queue(Ch, Queue, ExpectedMsgs) ->
+ qos(Ch, length(ExpectedMsgs), false),
+ subscribe(Ch, Queue, false),
+ [begin
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = M}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ after 2000 ->
+ flush(10),
+ exit({validate_queue_timeout, M})
+ end
+ end || M <- ExpectedMsgs],
+ ok.
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index fb98047530..2c5b15295f 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -503,6 +503,27 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(_) ->
?ASSERT_EFF({log, _RaftIdxs, _}, Effects2),
ok.
+mixed_send_msg_and_log_effects_are_correctly_ordered_test(_) ->
+ Cid = {cid(?FUNCTION_NAME), self()},
+ State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ max_in_memory_length =>1,
+ dead_letter_handler =>
+ {somemod, somefun, [somearg]}}),
+ %% enqueue two messages
+ {State0, _} = enq(1, 1, first, State00),
+ {State1, _} = enq(2, 2, snd, State0),
+
+ {_State2, Effects1} = check_n(Cid, 3, 10, State1),
+ ct:pal("Effects ~w", [Effects1]),
+ %% in this case we expect no send_msg effect as any in memory messages
+ %% should be weaved into the send_msg effect emitted by the log effect
+ %% later. hence this is all we can assert on
+ %% as we need to send message is in the correct order to the consuming
+ %% channel or the channel may think a message has been lost in transit
+ ?ASSERT_NO_EFF({send_msg, _, _, _}, Effects1),
+ ok.
+
tick_test(_) ->
Cid = {<<"c">>, self()},
Cid2 = {<<"c2">>, self()},
@@ -1712,3 +1733,6 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M).
+
+cid(A) when is_atom(A) ->
+ atom_to_binary(A, utf8).