From b831dbc88bffab8707140362e84f2e6b23fc3b0e Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 12 Nov 2021 15:59:51 +0000 Subject: QQ: better handle repeated requeues. Repeated re-queues are a problem for quorums queues. A consumer that repeatedly re-queues all messages may eventually cause the quorum queue to run out of messages as the log cannot be truncated if messages are not consumed in a fifo-ish order. This change addresses this as follows: All return commands are not send directly to the log but instead are sent to the aux machine which evaluates how the return should be processed. After this decision it will use the new "append" effect to add it to the log as before. If the queue has a delivery_limit configured the return command will be appended as before and the message will be returned to the front of the queue as before. This is safe as eventually the message will be dropped or dead-letter when it reaches it's delivery limit. If the queue has _not_ configured a delivery_limit the return will be turned into a new command that includes the original message in full and will be returne to the back of the queue. This ensure that messages in the queue will be cycled in fifo-ish order and thus snapshots will be taken. --- deps/rabbit/src/rabbit_fifo.erl | 124 ++++++++++++++++++++++++++++++-- deps/rabbit/src/rabbit_fifo.hrl | 1 + deps/rabbit/src/rabbit_fifo_client.erl | 14 ++++ deps/rabbit/src/rabbit_quorum_queue.erl | 21 +++++- deps/rabbit/test/quorum_queue_SUITE.erl | 18 +++-- erlang_ls.config | 22 +++--- 6 files changed, 177 insertions(+), 23 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index fe5adb029c..c7cbb55aa6 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1,4 +1,4 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public +%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% @@ -71,6 +71,9 @@ -record(enqueue, {pid :: option(pid()), seq :: option(msg_seqno()), msg :: raw_msg()}). +-record(requeue, {consumer_id :: consumer_id(), + msg_id :: msg_id(), + msg :: indexed_msg()}). -record(register_enqueuer, {pid :: pid()}). -record(checkout, {consumer_id :: consumer_id(), spec :: checkout_spec(), @@ -92,6 +95,7 @@ -opaque protocol() :: #enqueue{} | + #requeue{} | #register_enqueuer{} | #checkout{} | #settle{} | @@ -226,6 +230,52 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, _ -> {State, ok} end; +apply(#{index := Idx} = Meta, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + %% as we read the message from disk it is already + %% an inmemory message + msg = ?INDEX_MSG(OldIdx, ?MSG(_Header, _RawMsg) = Msg)}, + #?MODULE{consumers = Cons0, + messages = Messages, + ra_indexes = Indexes0} = State00) -> + case Cons0 of + #{ConsumerId := #consumer{checked_out = Checked0} = Con0} + when is_map_key(MsgId, Checked0) -> + %% construct an index message with the current raft index + %% and update delivery count before adding it to the message queue + ?INDEX_MSG(_, ?MSG(Header, _)) = IdxMsg0 = + update_msg_header(delivery_count, fun incr/1, 1, ?INDEX_MSG(Idx, Msg)), + + State0 = add_bytes_return(Header, State00), + {State1, IdxMsg} = + case evaluate_memory_limit(Header, State0) of + true -> + % indexed message with header map + {State0, ?INDEX_MSG(Idx, ?DISK_MSG(Header))}; + false -> + {add_in_memory_counts(Header, State0), IdxMsg0} + end, + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0), + credit = increase_credit(Con0, 1)}, + State2 = update_or_remove_sub( + Meta, + ConsumerId, + Con, + State1#?MODULE{ra_indexes = rabbit_fifo_index:delete(OldIdx, Indexes0), + messages = lqueue:in(IdxMsg, Messages)}), + + %% We have to increment the enqueue counter to ensure release cursors + %% are generated + State3 = incr_enqueue_count(State2), + + {State, Ret, Effs} = checkout(Meta, State0, State3, []), + update_smallest_raft_index(Idx, Ret, + maybe_store_dehydrated_state(Idx, State), + Effs); + _ -> + {State00, ok} + end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, @@ -861,11 +911,48 @@ init_aux(Name) when is_atom(Name) -> capacity = {inactive, Now, 1, 1.0}}. handle_aux(leader, _, garbage_collection, State, Log, MacState) -> - % ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; handle_aux(follower, _, garbage_collection, State, Log, MacState) -> - % ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; +handle_aux(leader, cast, {#return{msg_ids = MsgIds, + consumer_id = ConsumerId}, Corr, Pid}, + Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined}, + consumers = Consumers, + ra_indexes = _Indexes}) -> + case Consumers of + #{ConsumerId := #consumer{checked_out = Checked}} -> + {Log, ToReturn} = + maps:fold( + fun (MsgId, ?INDEX_MSG(Idx, ?DISK_MSG(Header)), {L0, Acc}) -> + %% it is possible this is not found if the consumer + %% crashed and the message got removed + %% TODO: handle when log entry is not found + case ra_log:fetch(Idx, L0) of + {{_, _, {_, _, Cmd, _}}, L} -> + Msg = case Cmd of + #enqueue{msg = M} -> M; + #requeue{msg = ?INDEX_MSG(_, ?MSG(_H, M))} -> + M + end, + IdxMsg = ?INDEX_MSG(Idx, ?MSG(Header, Msg)), + {L, [{MsgId, IdxMsg} | Acc]}; + {undefined, L} -> + {L, Acc} + end; + (MsgId, IdxMsg, {L0, Acc}) -> + {L0, [{MsgId, IdxMsg} | Acc]} + end, {Log0, []}, maps:with(MsgIds, Checked)), + + Appends = make_requeue(ConsumerId, {notify, Corr, Pid}, + lists:sort(ToReturn), []), + {no_reply, Aux0, Log, Appends}; + _ -> + {no_reply, Aux0, Log0} + end; +handle_aux(leader, cast, {#return{} = Ret, Corr, Pid}, + Aux0, Log, #?MODULE{}) -> + %% for returns with a delivery limit set we can just return as before + {no_reply, Aux0, Log, [{append, Ret, {notify, Corr, Pid}}]}; handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) -> {no_reply, Aux0, Log}; handle_aux(_RaState, cast, Cmd, #aux{capacity = Use0} = Aux0, @@ -1580,7 +1667,7 @@ return_one(Meta, MsgId, Msg0, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Msg = update_msg_header(delivery_count, fun (C) -> C + 1 end, 1, Msg0), + Msg = update_msg_header(delivery_count, fun incr/1, 1, Msg0), Header = get_msg_header(Msg), case get_header(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1809,7 +1896,11 @@ delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> + Msgs0 = lists:zipwith(fun + (#enqueue{msg = Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}}; + (#requeue{msg = ?INDEX_MSG(_, ?MSG(_, Msg))}, + {MsgId, Header}) -> {MsgId, {Header, Msg}} end, Log, Data), Msgs = case InMemMsgs of @@ -1839,7 +1930,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, {ConsumerMsg, State0} -> %% there are consumers waiting to be serviced %% process consumer checkout - case maps:get(ConsumerId, Cons0) of + case maps:get(ConsumerId, Cons0, error) of #consumer{credit = 0} -> %% no credit but was still on queue %% can happen when draining @@ -2299,3 +2390,24 @@ smallest_raft_index(#?MODULE{cfg = _Cfg, {undefined, State} end end. + +make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) -> + lists:reverse([{append, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + msg = Msg}, + Notify} + | Acc]); +make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) -> + make_requeue(ConsumerId, Notify, Rem, + [{append, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + msg = Msg}, + noreply} + | Acc]); +make_requeue(_ConsumerId, _Notify, [], []) -> + []. + +incr(I) -> + I + 1. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index c797c9d9bd..95e7003460 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -122,6 +122,7 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list + unused, pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], status = up :: up | suspected_down, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 7784e15e60..e102ad2207 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -828,6 +828,19 @@ next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) -> consumer_id(ConsumerTag) -> {ConsumerTag, self()}. +send_command(Server, Correlation, Command, _Priority, + #state{pending = Pending, + cfg = #cfg{soft_limit = SftLmt}} = State0) + when element(1, Command) == return -> + %% returns are sent to the aux machine for pre-evaluation + {Seq, State} = next_seq(State0), + ok = ra:cast_aux_command(Server, {Command, Seq, self()}), + Tag = case maps:size(Pending) >= SftLmt of + true -> slow; + false -> ok + end, + {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, + slow = Tag == slow}}; send_command(Server, Correlation, Command, Priority, #state{pending = Pending, cfg = #cfg{soft_limit = SftLmt}} = State0) -> @@ -840,6 +853,7 @@ send_command(Server, Correlation, Command, Priority, {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, slow = Tag == slow}}. + resend_command(Node, Correlation, Command, #state{pending = Pending} = State0) -> {Seq, State} = next_seq(State0), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 0131d1538b..fc1d0b5a54 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -826,7 +826,8 @@ deliver(true, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, Delivery#delivery.message, QState0). -deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> +deliver(QSs, #delivery{confirm = Confirm} = Delivery0) -> + Delivery = clean_delivery(Delivery0), lists:foldl( fun({Q, stateless}, {Qs, Actions}) -> QRef = amqqueue:get_pid(Q), @@ -1610,3 +1611,21 @@ notify_decorators(QName, F, A) -> {error, not_found} -> ok end. + +%% remove any data that a quorum queue doesn't need +clean_delivery(#delivery{message = + #basic_message{content = Content0} = Msg} = Delivery) -> + Content = case Content0 of + #content{properties = none} -> + Content0; + #content{protocol = none} -> + Content0; + #content{properties = Props, + protocol = Proto} -> + Content0#content{properties = none, + properties_bin = Proto:encode_properties(Props)} + end, + + Delivery#delivery{message = Msg#basic_message{%%id = undefined, + content = Content}}. + diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1de9d1f5db..09f473cb4f 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -81,11 +81,12 @@ groups() -> quorum_cluster_size_7, node_removal_is_not_quorum_critical ]}, - {clustered_with_partitions, [], [ - reconnect_consumer_and_publish, - reconnect_consumer_and_wait, - reconnect_consumer_and_wait_channel_down - ]} + {clustered_with_partitions, [], + [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down + ]} ]} ]. @@ -1889,7 +1890,11 @@ subscribe_redelivery_count(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + declare(Ch, QQ, + [ + {<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 0} + ])), RaName = ra_name(QQ), publish(Ch, QQ), @@ -1919,6 +1924,7 @@ subscribe_redelivery_count(Config) -> multiple = false, requeue = true}) after 5000 -> + flush(1), exit(basic_deliver_timeout_2) end, diff --git a/erlang_ls.config b/erlang_ls.config index 10b5073efa..5fc0d8b7e3 100644 --- a/erlang_ls.config +++ b/erlang_ls.config @@ -1,25 +1,27 @@ # otp_path: "/path/to/otp/lib/erlang" deps_dirs: - "deps/*" - - "deps/rabbit/apps/*" diagnostics: - disabled: [] + disabled: + - bound_var_in_pattern enabled: - crossref - dialyzer - - elvis + # - elvis - compiler include_dirs: - "deps" - "deps/*/include" -# lenses: -# enabled: -# - ct-run-test -# - show-behaviour-usages -# disabled: [] +lenses: + enabled: + - ct-run-test + - show-behaviour-usages + - suggest-spec + - function-references + disabled: [] # macros: # - name: DEFINED_WITH_VALUE # value: 42 -code_reload: - node: rabbit@localhost +# code_reload: +# node: rabbit@nkarl-a02 plt_path: .rabbitmq_server_release.plt -- cgit v1.2.1