diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-12 15:59:51 +0000 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-11-26 16:12:37 +0000 |
commit | b831dbc88bffab8707140362e84f2e6b23fc3b0e (patch) | |
tree | 2466c7814fa78bf035af0308cc19b964d9fc15ef | |
parent | e836217959c4ec8e3b7b0983644df269385c53e0 (diff) | |
download | rabbitmq-server-git-safer-requeue.tar.gz |
QQ: better handle repeated requeues.safer-requeue
Repeated re-queues are a problem for quorums queues. A consumer
that repeatedly re-queues all messages may eventually cause the
quorum queue to run out of messages as the log cannot be truncated
if messages are not consumed in a fifo-ish order.
This change addresses this as follows:
All return commands are not send directly to the log but instead
are sent to the aux machine which evaluates how the return should be
processed. After this decision it will use the new "append" effect
to add it to the log as before.
If the queue has a delivery_limit configured the return command
will be appended as before and the message will be returned
to the front of the queue as before. This is safe as eventually
the message will be dropped or dead-letter when it reaches
it's delivery limit.
If the queue has _not_ configured a delivery_limit the return
will be turned into a new command that includes the original
message in full and will be returne to the back of the queue.
This ensure that messages in the queue will be cycled in fifo-ish
order and thus snapshots will be taken.
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.erl | 124 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo.hrl | 1 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_fifo_client.erl | 14 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_quorum_queue.erl | 21 | ||||
-rw-r--r-- | deps/rabbit/test/quorum_queue_SUITE.erl | 18 | ||||
-rw-r--r-- | erlang_ls.config | 22 |
6 files changed, 177 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index fe5adb029c..c7cbb55aa6 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1,4 +1,4 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public +%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% @@ -71,6 +71,9 @@ -record(enqueue, {pid :: option(pid()), seq :: option(msg_seqno()), msg :: raw_msg()}). +-record(requeue, {consumer_id :: consumer_id(), + msg_id :: msg_id(), + msg :: indexed_msg()}). -record(register_enqueuer, {pid :: pid()}). -record(checkout, {consumer_id :: consumer_id(), spec :: checkout_spec(), @@ -92,6 +95,7 @@ -opaque protocol() :: #enqueue{} | + #requeue{} | #register_enqueuer{} | #checkout{} | #settle{} | @@ -226,6 +230,52 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, _ -> {State, ok} end; +apply(#{index := Idx} = Meta, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + %% as we read the message from disk it is already + %% an inmemory message + msg = ?INDEX_MSG(OldIdx, ?MSG(_Header, _RawMsg) = Msg)}, + #?MODULE{consumers = Cons0, + messages = Messages, + ra_indexes = Indexes0} = State00) -> + case Cons0 of + #{ConsumerId := #consumer{checked_out = Checked0} = Con0} + when is_map_key(MsgId, Checked0) -> + %% construct an index message with the current raft index + %% and update delivery count before adding it to the message queue + ?INDEX_MSG(_, ?MSG(Header, _)) = IdxMsg0 = + update_msg_header(delivery_count, fun incr/1, 1, ?INDEX_MSG(Idx, Msg)), + + State0 = add_bytes_return(Header, State00), + {State1, IdxMsg} = + case evaluate_memory_limit(Header, State0) of + true -> + % indexed message with header map + {State0, ?INDEX_MSG(Idx, ?DISK_MSG(Header))}; + false -> + {add_in_memory_counts(Header, State0), IdxMsg0} + end, + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0), + credit = increase_credit(Con0, 1)}, + State2 = update_or_remove_sub( + Meta, + ConsumerId, + Con, + State1#?MODULE{ra_indexes = rabbit_fifo_index:delete(OldIdx, Indexes0), + messages = lqueue:in(IdxMsg, Messages)}), + + %% We have to increment the enqueue counter to ensure release cursors + %% are generated + State3 = incr_enqueue_count(State2), + + {State, Ret, Effs} = checkout(Meta, State0, State3, []), + update_smallest_raft_index(Idx, Ret, + maybe_store_dehydrated_state(Idx, State), + Effs); + _ -> + {State00, ok} + end; apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, @@ -861,11 +911,48 @@ init_aux(Name) when is_atom(Name) -> capacity = {inactive, Now, 1, 1.0}}. handle_aux(leader, _, garbage_collection, State, Log, MacState) -> - % ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; handle_aux(follower, _, garbage_collection, State, Log, MacState) -> - % ra_log_wal:force_roll_over(ra_log_wal), {no_reply, force_eval_gc(Log, MacState, State), Log}; +handle_aux(leader, cast, {#return{msg_ids = MsgIds, + consumer_id = ConsumerId}, Corr, Pid}, + Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined}, + consumers = Consumers, + ra_indexes = _Indexes}) -> + case Consumers of + #{ConsumerId := #consumer{checked_out = Checked}} -> + {Log, ToReturn} = + maps:fold( + fun (MsgId, ?INDEX_MSG(Idx, ?DISK_MSG(Header)), {L0, Acc}) -> + %% it is possible this is not found if the consumer + %% crashed and the message got removed + %% TODO: handle when log entry is not found + case ra_log:fetch(Idx, L0) of + {{_, _, {_, _, Cmd, _}}, L} -> + Msg = case Cmd of + #enqueue{msg = M} -> M; + #requeue{msg = ?INDEX_MSG(_, ?MSG(_H, M))} -> + M + end, + IdxMsg = ?INDEX_MSG(Idx, ?MSG(Header, Msg)), + {L, [{MsgId, IdxMsg} | Acc]}; + {undefined, L} -> + {L, Acc} + end; + (MsgId, IdxMsg, {L0, Acc}) -> + {L0, [{MsgId, IdxMsg} | Acc]} + end, {Log0, []}, maps:with(MsgIds, Checked)), + + Appends = make_requeue(ConsumerId, {notify, Corr, Pid}, + lists:sort(ToReturn), []), + {no_reply, Aux0, Log, Appends}; + _ -> + {no_reply, Aux0, Log0} + end; +handle_aux(leader, cast, {#return{} = Ret, Corr, Pid}, + Aux0, Log, #?MODULE{}) -> + %% for returns with a delivery limit set we can just return as before + {no_reply, Aux0, Log, [{append, Ret, {notify, Corr, Pid}}]}; handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) -> {no_reply, Aux0, Log}; handle_aux(_RaState, cast, Cmd, #aux{capacity = Use0} = Aux0, @@ -1580,7 +1667,7 @@ return_one(Meta, MsgId, Msg0, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId) -> #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), - Msg = update_msg_header(delivery_count, fun (C) -> C + 1 end, 1, Msg0), + Msg = update_msg_header(delivery_count, fun incr/1, 1, Msg0), Header = get_msg_header(Msg), case get_header(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> @@ -1809,7 +1896,11 @@ delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) -> {RaftIdxs, Data} = lists:unzip(IdxMsgs), {log, RaftIdxs, fun(Log) -> - Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) -> + Msgs0 = lists:zipwith(fun + (#enqueue{msg = Msg}, {MsgId, Header}) -> + {MsgId, {Header, Msg}}; + (#requeue{msg = ?INDEX_MSG(_, ?MSG(_, Msg))}, + {MsgId, Header}) -> {MsgId, {Header, Msg}} end, Log, Data), Msgs = case InMemMsgs of @@ -1839,7 +1930,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0, {ConsumerMsg, State0} -> %% there are consumers waiting to be serviced %% process consumer checkout - case maps:get(ConsumerId, Cons0) of + case maps:get(ConsumerId, Cons0, error) of #consumer{credit = 0} -> %% no credit but was still on queue %% can happen when draining @@ -2299,3 +2390,24 @@ smallest_raft_index(#?MODULE{cfg = _Cfg, {undefined, State} end end. + +make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) -> + lists:reverse([{append, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + msg = Msg}, + Notify} + | Acc]); +make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) -> + make_requeue(ConsumerId, Notify, Rem, + [{append, + #requeue{consumer_id = ConsumerId, + msg_id = MsgId, + msg = Msg}, + noreply} + | Acc]); +make_requeue(_ConsumerId, _Notify, [], []) -> + []. + +incr(I) -> + I + 1. diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index c797c9d9bd..95e7003460 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -122,6 +122,7 @@ -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list + unused, pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}], status = up :: up | suspected_down, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 7784e15e60..e102ad2207 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -828,6 +828,19 @@ next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) -> consumer_id(ConsumerTag) -> {ConsumerTag, self()}. +send_command(Server, Correlation, Command, _Priority, + #state{pending = Pending, + cfg = #cfg{soft_limit = SftLmt}} = State0) + when element(1, Command) == return -> + %% returns are sent to the aux machine for pre-evaluation + {Seq, State} = next_seq(State0), + ok = ra:cast_aux_command(Server, {Command, Seq, self()}), + Tag = case maps:size(Pending) >= SftLmt of + true -> slow; + false -> ok + end, + {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, + slow = Tag == slow}}; send_command(Server, Correlation, Command, Priority, #state{pending = Pending, cfg = #cfg{soft_limit = SftLmt}} = State0) -> @@ -840,6 +853,7 @@ send_command(Server, Correlation, Command, Priority, {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}}, slow = Tag == slow}}. + resend_command(Node, Correlation, Command, #state{pending = Pending} = State0) -> {Seq, State} = next_seq(State0), diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 0131d1538b..fc1d0b5a54 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -826,7 +826,8 @@ deliver(true, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, Delivery#delivery.message, QState0). -deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> +deliver(QSs, #delivery{confirm = Confirm} = Delivery0) -> + Delivery = clean_delivery(Delivery0), lists:foldl( fun({Q, stateless}, {Qs, Actions}) -> QRef = amqqueue:get_pid(Q), @@ -1610,3 +1611,21 @@ notify_decorators(QName, F, A) -> {error, not_found} -> ok end. + +%% remove any data that a quorum queue doesn't need +clean_delivery(#delivery{message = + #basic_message{content = Content0} = Msg} = Delivery) -> + Content = case Content0 of + #content{properties = none} -> + Content0; + #content{protocol = none} -> + Content0; + #content{properties = Props, + protocol = Proto} -> + Content0#content{properties = none, + properties_bin = Proto:encode_properties(Props)} + end, + + Delivery#delivery{message = Msg#basic_message{%%id = undefined, + content = Content}}. + diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1de9d1f5db..09f473cb4f 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -81,11 +81,12 @@ groups() -> quorum_cluster_size_7, node_removal_is_not_quorum_critical ]}, - {clustered_with_partitions, [], [ - reconnect_consumer_and_publish, - reconnect_consumer_and_wait, - reconnect_consumer_and_wait_channel_down - ]} + {clustered_with_partitions, [], + [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down + ]} ]} ]. @@ -1889,7 +1890,11 @@ subscribe_redelivery_count(Config) -> Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + declare(Ch, QQ, + [ + {<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 0} + ])), RaName = ra_name(QQ), publish(Ch, QQ), @@ -1919,6 +1924,7 @@ subscribe_redelivery_count(Config) -> multiple = false, requeue = true}) after 5000 -> + flush(1), exit(basic_deliver_timeout_2) end, diff --git a/erlang_ls.config b/erlang_ls.config index 10b5073efa..5fc0d8b7e3 100644 --- a/erlang_ls.config +++ b/erlang_ls.config @@ -1,25 +1,27 @@ # otp_path: "/path/to/otp/lib/erlang" deps_dirs: - "deps/*" - - "deps/rabbit/apps/*" diagnostics: - disabled: [] + disabled: + - bound_var_in_pattern enabled: - crossref - dialyzer - - elvis + # - elvis - compiler include_dirs: - "deps" - "deps/*/include" -# lenses: -# enabled: -# - ct-run-test -# - show-behaviour-usages -# disabled: [] +lenses: + enabled: + - ct-run-test + - show-behaviour-usages + - suggest-spec + - function-references + disabled: [] # macros: # - name: DEFINED_WITH_VALUE # value: 42 -code_reload: - node: rabbit@localhost +# code_reload: +# node: rabbit@nkarl-a02 plt_path: .rabbitmq_server_release.plt |