summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-11-12 15:59:51 +0000
committerKarl Nilsson <kjnilsson@gmail.com>2021-11-26 16:12:37 +0000
commitb831dbc88bffab8707140362e84f2e6b23fc3b0e (patch)
tree2466c7814fa78bf035af0308cc19b964d9fc15ef
parente836217959c4ec8e3b7b0983644df269385c53e0 (diff)
downloadrabbitmq-server-git-safer-requeue.tar.gz
QQ: better handle repeated requeues.safer-requeue
Repeated re-queues are a problem for quorums queues. A consumer that repeatedly re-queues all messages may eventually cause the quorum queue to run out of messages as the log cannot be truncated if messages are not consumed in a fifo-ish order. This change addresses this as follows: All return commands are not send directly to the log but instead are sent to the aux machine which evaluates how the return should be processed. After this decision it will use the new "append" effect to add it to the log as before. If the queue has a delivery_limit configured the return command will be appended as before and the message will be returned to the front of the queue as before. This is safe as eventually the message will be dropped or dead-letter when it reaches it's delivery limit. If the queue has _not_ configured a delivery_limit the return will be turned into a new command that includes the original message in full and will be returne to the back of the queue. This ensure that messages in the queue will be cycled in fifo-ish order and thus snapshots will be taken.
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl124
-rw-r--r--deps/rabbit/src/rabbit_fifo.hrl1
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl14
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl21
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl18
-rw-r--r--erlang_ls.config22
6 files changed, 177 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index fe5adb029c..c7cbb55aa6 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -1,4 +1,4 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
+%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
@@ -71,6 +71,9 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(requeue, {consumer_id :: consumer_id(),
+ msg_id :: msg_id(),
+ msg :: indexed_msg()}).
-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
@@ -92,6 +95,7 @@
-opaque protocol() ::
#enqueue{} |
+ #requeue{} |
#register_enqueuer{} |
#checkout{} |
#settle{} |
@@ -226,6 +230,52 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
_ ->
{State, ok}
end;
+apply(#{index := Idx} = Meta,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ %% as we read the message from disk it is already
+ %% an inmemory message
+ msg = ?INDEX_MSG(OldIdx, ?MSG(_Header, _RawMsg) = Msg)},
+ #?MODULE{consumers = Cons0,
+ messages = Messages,
+ ra_indexes = Indexes0} = State00) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{checked_out = Checked0} = Con0}
+ when is_map_key(MsgId, Checked0) ->
+ %% construct an index message with the current raft index
+ %% and update delivery count before adding it to the message queue
+ ?INDEX_MSG(_, ?MSG(Header, _)) = IdxMsg0 =
+ update_msg_header(delivery_count, fun incr/1, 1, ?INDEX_MSG(Idx, Msg)),
+
+ State0 = add_bytes_return(Header, State00),
+ {State1, IdxMsg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ % indexed message with header map
+ {State0, ?INDEX_MSG(Idx, ?DISK_MSG(Header))};
+ false ->
+ {add_in_memory_counts(Header, State0), IdxMsg0}
+ end,
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
+ credit = increase_credit(Con0, 1)},
+ State2 = update_or_remove_sub(
+ Meta,
+ ConsumerId,
+ Con,
+ State1#?MODULE{ra_indexes = rabbit_fifo_index:delete(OldIdx, Indexes0),
+ messages = lqueue:in(IdxMsg, Messages)}),
+
+ %% We have to increment the enqueue counter to ensure release cursors
+ %% are generated
+ State3 = incr_enqueue_count(State2),
+
+ {State, Ret, Effs} = checkout(Meta, State0, State3, []),
+ update_smallest_raft_index(Idx, Ret,
+ maybe_store_dehydrated_state(Idx, State),
+ Effs);
+ _ ->
+ {State00, ok}
+ end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
@@ -861,11 +911,48 @@ init_aux(Name) when is_atom(Name) ->
capacity = {inactive, Now, 1, 1.0}}.
handle_aux(leader, _, garbage_collection, State, Log, MacState) ->
- % ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
- % ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
+handle_aux(leader, cast, {#return{msg_ids = MsgIds,
+ consumer_id = ConsumerId}, Corr, Pid},
+ Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined},
+ consumers = Consumers,
+ ra_indexes = _Indexes}) ->
+ case Consumers of
+ #{ConsumerId := #consumer{checked_out = Checked}} ->
+ {Log, ToReturn} =
+ maps:fold(
+ fun (MsgId, ?INDEX_MSG(Idx, ?DISK_MSG(Header)), {L0, Acc}) ->
+ %% it is possible this is not found if the consumer
+ %% crashed and the message got removed
+ %% TODO: handle when log entry is not found
+ case ra_log:fetch(Idx, L0) of
+ {{_, _, {_, _, Cmd, _}}, L} ->
+ Msg = case Cmd of
+ #enqueue{msg = M} -> M;
+ #requeue{msg = ?INDEX_MSG(_, ?MSG(_H, M))} ->
+ M
+ end,
+ IdxMsg = ?INDEX_MSG(Idx, ?MSG(Header, Msg)),
+ {L, [{MsgId, IdxMsg} | Acc]};
+ {undefined, L} ->
+ {L, Acc}
+ end;
+ (MsgId, IdxMsg, {L0, Acc}) ->
+ {L0, [{MsgId, IdxMsg} | Acc]}
+ end, {Log0, []}, maps:with(MsgIds, Checked)),
+
+ Appends = make_requeue(ConsumerId, {notify, Corr, Pid},
+ lists:sort(ToReturn), []),
+ {no_reply, Aux0, Log, Appends};
+ _ ->
+ {no_reply, Aux0, Log0}
+ end;
+handle_aux(leader, cast, {#return{} = Ret, Corr, Pid},
+ Aux0, Log, #?MODULE{}) ->
+ %% for returns with a delivery limit set we can just return as before
+ {no_reply, Aux0, Log, [{append, Ret, {notify, Corr, Pid}}]};
handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
{no_reply, Aux0, Log};
handle_aux(_RaState, cast, Cmd, #aux{capacity = Use0} = Aux0,
@@ -1580,7 +1667,7 @@ return_one(Meta, MsgId, Msg0,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Msg = update_msg_header(delivery_count, fun (C) -> C + 1 end, 1, Msg0),
+ Msg = update_msg_header(delivery_count, fun incr/1, 1, Msg0),
Header = get_msg_header(Msg),
case get_header(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1809,7 +1896,11 @@ delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
{log, RaftIdxs,
fun(Log) ->
- Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ Msgs0 = lists:zipwith(fun
+ (#enqueue{msg = Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}};
+ (#requeue{msg = ?INDEX_MSG(_, ?MSG(_, Msg))},
+ {MsgId, Header}) ->
{MsgId, {Header, Msg}}
end, Log, Data),
Msgs = case InMemMsgs of
@@ -1839,7 +1930,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
{ConsumerMsg, State0} ->
%% there are consumers waiting to be serviced
%% process consumer checkout
- case maps:get(ConsumerId, Cons0) of
+ case maps:get(ConsumerId, Cons0, error) of
#consumer{credit = 0} ->
%% no credit but was still on queue
%% can happen when draining
@@ -2299,3 +2390,24 @@ smallest_raft_index(#?MODULE{cfg = _Cfg,
{undefined, State}
end
end.
+
+make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) ->
+ lists:reverse([{append,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ msg = Msg},
+ Notify}
+ | Acc]);
+make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) ->
+ make_requeue(ConsumerId, Notify, Rem,
+ [{append,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ msg = Msg},
+ noreply}
+ | Acc]);
+make_requeue(_ConsumerId, _Notify, [], []) ->
+ [].
+
+incr(I) ->
+ I + 1.
diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl
index c797c9d9bd..95e7003460 100644
--- a/deps/rabbit/src/rabbit_fifo.hrl
+++ b/deps/rabbit/src/rabbit_fifo.hrl
@@ -122,6 +122,7 @@
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
+ unused,
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
status = up :: up |
suspected_down,
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 7784e15e60..e102ad2207 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -828,6 +828,19 @@ next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) ->
consumer_id(ConsumerTag) ->
{ConsumerTag, self()}.
+send_command(Server, Correlation, Command, _Priority,
+ #state{pending = Pending,
+ cfg = #cfg{soft_limit = SftLmt}} = State0)
+ when element(1, Command) == return ->
+ %% returns are sent to the aux machine for pre-evaluation
+ {Seq, State} = next_seq(State0),
+ ok = ra:cast_aux_command(Server, {Command, Seq, self()}),
+ Tag = case maps:size(Pending) >= SftLmt of
+ true -> slow;
+ false -> ok
+ end,
+ {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
+ slow = Tag == slow}};
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
@@ -840,6 +853,7 @@ send_command(Server, Correlation, Command, Priority,
{Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
slow = Tag == slow}}.
+
resend_command(Node, Correlation, Command,
#state{pending = Pending} = State0) ->
{Seq, State} = next_seq(State0),
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0131d1538b..fc1d0b5a54 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -826,7 +826,8 @@ deliver(true, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
Delivery#delivery.message, QState0).
-deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
+deliver(QSs, #delivery{confirm = Confirm} = Delivery0) ->
+ Delivery = clean_delivery(Delivery0),
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
QRef = amqqueue:get_pid(Q),
@@ -1610,3 +1611,21 @@ notify_decorators(QName, F, A) ->
{error, not_found} ->
ok
end.
+
+%% remove any data that a quorum queue doesn't need
+clean_delivery(#delivery{message =
+ #basic_message{content = Content0} = Msg} = Delivery) ->
+ Content = case Content0 of
+ #content{properties = none} ->
+ Content0;
+ #content{protocol = none} ->
+ Content0;
+ #content{properties = Props,
+ protocol = Proto} ->
+ Content0#content{properties = none,
+ properties_bin = Proto:encode_properties(Props)}
+ end,
+
+ Delivery#delivery{message = Msg#basic_message{%%id = undefined,
+ content = Content}}.
+
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 1de9d1f5db..09f473cb4f 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -81,11 +81,12 @@ groups() ->
quorum_cluster_size_7,
node_removal_is_not_quorum_critical
]},
- {clustered_with_partitions, [], [
- reconnect_consumer_and_publish,
- reconnect_consumer_and_wait,
- reconnect_consumer_and_wait_channel_down
- ]}
+ {clustered_with_partitions, [],
+ [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
+ ]}
]}
].
@@ -1889,7 +1890,11 @@ subscribe_redelivery_count(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ declare(Ch, QQ,
+ [
+ {<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 0}
+ ])),
RaName = ra_name(QQ),
publish(Ch, QQ),
@@ -1919,6 +1924,7 @@ subscribe_redelivery_count(Config) ->
multiple = false,
requeue = true})
after 5000 ->
+ flush(1),
exit(basic_deliver_timeout_2)
end,
diff --git a/erlang_ls.config b/erlang_ls.config
index 10b5073efa..5fc0d8b7e3 100644
--- a/erlang_ls.config
+++ b/erlang_ls.config
@@ -1,25 +1,27 @@
# otp_path: "/path/to/otp/lib/erlang"
deps_dirs:
- "deps/*"
- - "deps/rabbit/apps/*"
diagnostics:
- disabled: []
+ disabled:
+ - bound_var_in_pattern
enabled:
- crossref
- dialyzer
- - elvis
+ # - elvis
- compiler
include_dirs:
- "deps"
- "deps/*/include"
-# lenses:
-# enabled:
-# - ct-run-test
-# - show-behaviour-usages
-# disabled: []
+lenses:
+ enabled:
+ - ct-run-test
+ - show-behaviour-usages
+ - suggest-spec
+ - function-references
+ disabled: []
# macros:
# - name: DEFINED_WITH_VALUE
# value: 42
-code_reload:
- node: rabbit@localhost
+# code_reload:
+# node: rabbit@nkarl-a02
plt_path: .rabbitmq_server_release.plt