summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2021-04-14 17:37:47 +0100
committerkjnilsson <knilsson@pivotal.io>2021-04-15 15:01:22 +0100
commitb35c29d7b2703389dcc9130690f0bf98d1f3b217 (patch)
tree48a457bd9208ee6758818f3e56d14c0e9a6d173f
parentc975457199f80ad9ef2291756a075927b770f473 (diff)
downloadrabbitmq-server-git-qq-in-memory-limit-bug-fix.tar.gz
QQ: ensure that messages are delivered in orderqq-in-memory-limit-bug-fix
In the case where there are some messages kept in memory mixed with some that are not it is possible that a messages are delivered to the consuming channel with gaps/out of order which would in some cases cause the channel to treat them as re-sends it has already seen and just discard them. When this happens the messages get stuck in the consumer state inside the queue and are never seen by the client consumer and thus never acked. When this happen the release cursors can't be emitted as the smallest raft index will be one of the stuck messages.
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl73
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl27
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl24
3 files changed, 86 insertions, 38 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index b205eda0b7..618dc8f4f9 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -370,7 +370,7 @@ apply(#{index := Index}, #purge{},
Effects0 = [garbage_collection],
Reply = {purge, Total},
{State, _, Effects} = evaluate_limit(Index, false, State0,
- State1, Effects0),
+ State1, Effects0),
update_smallest_raft_index(Index, Reply, State, Effects);
apply(_Meta, #garbage_collection{}, State) ->
{State, ok, [{aux, garbage_collection}]};
@@ -1196,7 +1196,8 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
case evaluate_memory_limit(Header, State0) of
true ->
% indexed message with header map
- {State0, {RaftIdx, {Header, 'empty'}}};
+ {State0,
+ {RaftIdx, {Header, 'empty'}}};
false ->
{add_in_memory_counts(Header, State0),
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
@@ -1358,9 +1359,9 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
Discarded = maps:with(MsgIds, Checked0),
- {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
+ {State1, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
Effects0, State0),
- {State, ok, Effects} = checkout(Meta, State0, State2, Effects1, false),
+ {State, ok, Effects} = checkout(Meta, State0, State1, Effects1, false),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1536,7 +1537,7 @@ checkout(Meta, OldState, State, Effects) ->
checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldState, State0,
Effects0, HandleConsumerChanges) ->
{State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
- Effects0, {#{}, #{}}),
+ Effects0, #{}),
case evaluate_limit(Index, false, OldState, State1, Effects1) of
{State, true, Effects} ->
case maybe_notify_decorators(State, HandleConsumerChanges) of
@@ -1557,28 +1558,28 @@ checkout(#{index := Index} = Meta, #?MODULE{cfg = #cfg{resource = QName}} = OldS
end.
checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
- Effects, {SendAcc, LogAcc0}) ->
+ Effects, SendAcc0) ->
DelMsg = {RaftIdx, {MsgId, Header}},
- LogAcc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], LogAcc0),
- checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+ SendAcc = maps:update_with(ConsumerId,
+ fun ({InMem, LogMsgs}) ->
+ {InMem, [DelMsg | LogMsgs]}
+ end, {[], [DelMsg]}, SendAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc);
checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
- {SendAcc0, LogAcc}) ->
+ SendAcc0) ->
DelMsg = {MsgId, Msg},
SendAcc = maps:update_with(ConsumerId,
- fun (M) -> [DelMsg | M] end,
- [DelMsg], SendAcc0),
- checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
-checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ fun ({InMem, LogMsgs}) ->
+ {[DelMsg | InMem], LogMsgs}
+ end, {[DelMsg], []}, SendAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, SendAcc);
+checkout0(_Meta, {Activity, State0}, Effects0, SendAcc) ->
Effects1 = case Activity of
nochange ->
- append_send_msg_effects(
- append_log_effects(Effects0, LogAcc), SendAcc);
+ append_delivery_effects(Effects0, SendAcc);
inactive ->
[{aux, inactive}
- | append_send_msg_effects(
- append_log_effects(Effects0, LogAcc), SendAcc)]
+ | append_delivery_effects(Effects0, SendAcc)]
end,
{State0, ok, lists:reverse(Effects1)}.
@@ -1648,17 +1649,9 @@ evaluate_memory_limit(Size,
when is_integer(Size) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
-append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
- Effects;
-append_send_msg_effects(Effects0, AccMap) ->
- Effects = maps:fold(fun (C, Msgs, Ef) ->
- [send_msg_effect(C, lists:reverse(Msgs)) | Ef]
- end, Effects0, AccMap),
- [{aux, active} | Effects].
-
-append_log_effects(Effects0, AccMap) ->
- maps:fold(fun (C, Msgs, Ef) ->
- [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+append_delivery_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, {InMemMsgs, LogMsgs}, Ef) ->
+ [delivery_effect(C, lists:reverse(LogMsgs), InMemMsgs) | Ef]
end, Effects0, AccMap).
%% next message is determined as follows:
@@ -1709,16 +1702,22 @@ take_next_msg(#?MODULE{returns = Returns,
end
end.
-send_msg_effect({CTag, CPid}, Msgs) ->
- {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}.
-
-send_log_effect({CTag, CPid}, IdxMsgs) ->
+delivery_effect({CTag, CPid}, [], InMemMsgs) ->
+ {send_msg, CPid, {delivery, CTag, lists:reverse(InMemMsgs)},
+ [local, ra_event]};
+delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
{log, RaftIdxs,
fun(Log) ->
- Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
- {MsgId, {Header, Msg}}
- end, Log, Data),
+ Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ Msgs = case InMemMsgs of
+ [] ->
+ Msgs0;
+ _ ->
+ lists:sort(InMemMsgs ++ Msgs0)
+ end,
[{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
end,
{local, node(CPid)}}.
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 92167c319a..fdd9b9e74a 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -2260,6 +2260,8 @@ queue_length_in_memory_limit(Config) ->
Msg2 = <<"msg11">>,
Msg3 = <<"msg111">>,
Msg4 = <<"msg1111">>,
+ Msg5 = <<"msg1111">>,
+
publish(Ch, QQ, Msg1),
publish(Ch, QQ, Msg2),
@@ -2278,7 +2280,12 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ publish(Ch, QQ, Msg5),
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
+ ExpectedMsgs = [Msg2, Msg3, Msg4, Msg5],
+ validate_queue(Ch, QQ, ExpectedMsgs),
+ ok.
queue_length_in_memory_limit_returns(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -2807,3 +2814,21 @@ queue_names(Records) ->
#resource{name = Name} = amqqueue:get_name(Q),
Name
end || Q <- Records].
+
+
+validate_queue(Ch, Queue, ExpectedMsgs) ->
+ qos(Ch, length(ExpectedMsgs), false),
+ subscribe(Ch, Queue, false),
+ [begin
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = M}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ after 2000 ->
+ flush(10),
+ exit({validate_queue_timeout, M})
+ end
+ end || M <- ExpectedMsgs],
+ ok.
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index fb98047530..2c5b15295f 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -503,6 +503,27 @@ discarded_message_with_dead_letter_handler_emits_log_effect_test(_) ->
?ASSERT_EFF({log, _RaftIdxs, _}, Effects2),
ok.
+mixed_send_msg_and_log_effects_are_correctly_ordered_test(_) ->
+ Cid = {cid(?FUNCTION_NAME), self()},
+ State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
+ max_in_memory_length =>1,
+ dead_letter_handler =>
+ {somemod, somefun, [somearg]}}),
+ %% enqueue two messages
+ {State0, _} = enq(1, 1, first, State00),
+ {State1, _} = enq(2, 2, snd, State0),
+
+ {_State2, Effects1} = check_n(Cid, 3, 10, State1),
+ ct:pal("Effects ~w", [Effects1]),
+ %% in this case we expect no send_msg effect as any in memory messages
+ %% should be weaved into the send_msg effect emitted by the log effect
+ %% later. hence this is all we can assert on
+ %% as we need to send message is in the correct order to the consuming
+ %% channel or the channel may think a message has been lost in transit
+ ?ASSERT_NO_EFF({send_msg, _, _, _}, Effects1),
+ ok.
+
tick_test(_) ->
Cid = {<<"c">>, self()},
Cid2 = {<<"c2">>, self()},
@@ -1712,3 +1733,6 @@ apply(Meta, Entry, State) -> rabbit_fifo:apply(Meta, Entry, State).
init_aux(Conf) -> rabbit_fifo:init_aux(Conf).
handle_aux(S, T, C, A, L, M) -> rabbit_fifo:handle_aux(S, T, C, A, L, M).
make_checkout(C, S, M) -> rabbit_fifo:make_checkout(C, S, M).
+
+cid(A) when is_atom(A) ->
+ atom_to_binary(A, utf8).