-rw-r--r--  deps/rabbit/src/rabbit_fifo.erl          | 124
-rw-r--r--  deps/rabbit/src/rabbit_fifo.hrl          |   1
-rw-r--r--  deps/rabbit/src/rabbit_fifo_client.erl   |  14
-rw-r--r--  deps/rabbit/src/rabbit_quorum_queue.erl  |  21
-rw-r--r--  deps/rabbit/test/quorum_queue_SUITE.erl  |  18
-rw-r--r--  erlang_ls.config                         |  22
6 files changed, 177 insertions, 23 deletions
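
The patch below routes consumer returns through the Ra aux machine: the leader rereads each returned message from the Ra log and re-appends it to the queue as a new #requeue{} command, and the helper make_requeue/4 builds those append effects so that only the last append in a batch carries the {notify, Corr, Pid} reply mode. A minimal, self-contained sketch of that helper follows; the module name, the make_requeue/3 wrapper and the untyped record fields are illustrative additions, not part of the patch itself.

%% Illustrative module; the real #requeue{} record in rabbit_fifo.erl
%% carries typed fields (consumer_id(), msg_id(), indexed_msg()).
-module(requeue_sketch).
-export([make_requeue/3]).

-record(requeue, {consumer_id, msg_id, msg}).

%% Convenience wrapper around the accumulator form used in the patch.
make_requeue(ConsumerId, Notify, Msgs) ->
    make_requeue(ConsumerId, Notify, Msgs, []).

%% Build one {append, #requeue{}, ReplyMode} effect per message.
%% Only the final message keeps the Notify reply mode, so the caller
%% receives a single applied-notification for the whole batch; all
%% earlier appends are emitted with noreply.
make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) ->
    lists:reverse([{append,
                    #requeue{consumer_id = ConsumerId,
                             msg_id = MsgId,
                             msg = Msg},
                    Notify} | Acc]);
make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) ->
    make_requeue(ConsumerId, Notify, Rem,
                 [{append,
                   #requeue{consumer_id = ConsumerId,
                            msg_id = MsgId,
                            msg = Msg},
                   noreply} | Acc]);
make_requeue(_ConsumerId, _Notify, [], []) ->
    [].

For example, requeue_sketch:make_requeue({<<"ctag">>, self()}, {notify, 1, self()}, [{0, m0}, {1, m1}]) yields two append effects, with only the second one carrying the notify tuple.
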
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index fe5adb029c..c7cbb55aa6 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -1,4 +1,4 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
+%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public
 %% License, v. 2.0. If a copy of the MPL was not distributed with this
 %% file, You can obtain one at https://mozilla.org/MPL/2.0/.
 %%
@@ -71,6 +71,9 @@
 -record(enqueue, {pid :: option(pid()),
                   seq :: option(msg_seqno()),
                   msg :: raw_msg()}).
+-record(requeue, {consumer_id :: consumer_id(),
+                  msg_id :: msg_id(),
+                  msg :: indexed_msg()}).
 -record(register_enqueuer, {pid :: pid()}).
 -record(checkout, {consumer_id :: consumer_id(),
                    spec :: checkout_spec(),
@@ -92,6 +95,7 @@
 
 -opaque protocol() ::
     #enqueue{} |
+    #requeue{} |
     #register_enqueuer{} |
     #checkout{} |
     #settle{} |
@@ -226,6 +230,52 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
         _ ->
             {State, ok}
     end;
+apply(#{index := Idx} = Meta,
+      #requeue{consumer_id = ConsumerId,
+               msg_id = MsgId,
+               %% as we read the message from disk it is already
+               %% an inmemory message
+               msg = ?INDEX_MSG(OldIdx, ?MSG(_Header, _RawMsg) = Msg)},
+      #?MODULE{consumers = Cons0,
+               messages = Messages,
+               ra_indexes = Indexes0} = State00) ->
+    case Cons0 of
+        #{ConsumerId := #consumer{checked_out = Checked0} = Con0}
+          when is_map_key(MsgId, Checked0) ->
+            %% construct an index message with the current raft index
+            %% and update delivery count before adding it to the message queue
+            ?INDEX_MSG(_, ?MSG(Header, _)) = IdxMsg0 =
+                update_msg_header(delivery_count, fun incr/1, 1, ?INDEX_MSG(Idx, Msg)),
+
+            State0 = add_bytes_return(Header, State00),
+            {State1, IdxMsg} =
+                case evaluate_memory_limit(Header, State0) of
+                    true ->
+                        % indexed message with header map
+                        {State0, ?INDEX_MSG(Idx, ?DISK_MSG(Header))};
+                    false ->
+                        {add_in_memory_counts(Header, State0), IdxMsg0}
+                end,
+            Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
+                                credit = increase_credit(Con0, 1)},
+            State2 = update_or_remove_sub(
+                       Meta,
+                       ConsumerId,
+                       Con,
+                       State1#?MODULE{ra_indexes = rabbit_fifo_index:delete(OldIdx, Indexes0),
+                                      messages = lqueue:in(IdxMsg, Messages)}),
+
+            %% We have to increment the enqueue counter to ensure release cursors
+            %% are generated
+            State3 = incr_enqueue_count(State2),
+
+            {State, Ret, Effs} = checkout(Meta, State0, State3, []),
+            update_smallest_raft_index(Idx, Ret,
+                                       maybe_store_dehydrated_state(Idx, State),
+                                       Effs);
+        _ ->
+            {State00, ok}
+    end;
 apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
                     drain = Drain, consumer_id = ConsumerId},
       #?MODULE{consumers = Cons0,
@@ -861,11 +911,48 @@ init_aux(Name) when is_atom(Name) ->
          capacity = {inactive, Now, 1, 1.0}}.
 
 handle_aux(leader, _, garbage_collection, State, Log, MacState) ->
-    % ra_log_wal:force_roll_over(ra_log_wal),
     {no_reply, force_eval_gc(Log, MacState, State), Log};
 handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
-    % ra_log_wal:force_roll_over(ra_log_wal),
     {no_reply, force_eval_gc(Log, MacState, State), Log};
+handle_aux(leader, cast, {#return{msg_ids = MsgIds,
+                                  consumer_id = ConsumerId}, Corr, Pid},
+           Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined},
+                                consumers = Consumers,
+                                ra_indexes = _Indexes}) ->
+    case Consumers of
+        #{ConsumerId := #consumer{checked_out = Checked}} ->
+            {Log, ToReturn} =
+                maps:fold(
+                  fun (MsgId, ?INDEX_MSG(Idx, ?DISK_MSG(Header)), {L0, Acc}) ->
+                          %% it is possible this is not found if the consumer
+                          %% crashed and the message got removed
+                          %% TODO: handle when log entry is not found
+                          case ra_log:fetch(Idx, L0) of
+                              {{_, _, {_, _, Cmd, _}}, L} ->
+                                  Msg = case Cmd of
+                                            #enqueue{msg = M} -> M;
+                                            #requeue{msg = ?INDEX_MSG(_, ?MSG(_H, M))} ->
+                                                M
+                                        end,
+                                  IdxMsg = ?INDEX_MSG(Idx, ?MSG(Header, Msg)),
+                                  {L, [{MsgId, IdxMsg} | Acc]};
+                              {undefined, L} ->
+                                  {L, Acc}
+                          end;
+                      (MsgId, IdxMsg, {L0, Acc}) ->
+                          {L0, [{MsgId, IdxMsg} | Acc]}
+                  end, {Log0, []}, maps:with(MsgIds, Checked)),
+
+            Appends = make_requeue(ConsumerId, {notify, Corr, Pid},
+                                   lists:sort(ToReturn), []),
+            {no_reply, Aux0, Log, Appends};
+        _ ->
+            {no_reply, Aux0, Log0}
+    end;
+handle_aux(leader, cast, {#return{} = Ret, Corr, Pid},
+           Aux0, Log, #?MODULE{}) ->
+    %% for returns with a delivery limit set we can just return as before
+    {no_reply, Aux0, Log, [{append, Ret, {notify, Corr, Pid}}]};
 handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
     {no_reply, Aux0, Log};
 handle_aux(_RaState, cast, Cmd, #aux{capacity = Use0} = Aux0,
@@ -1580,7 +1667,7 @@ return_one(Meta, MsgId, Msg0,
                     cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
            Effects0, ConsumerId) ->
     #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
-    Msg = update_msg_header(delivery_count, fun (C) -> C + 1 end, 1, Msg0),
+    Msg = update_msg_header(delivery_count, fun incr/1, 1, Msg0),
     Header = get_msg_header(Msg),
     case get_header(delivery_count, Header) of
         DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1809,7 +1896,11 @@ delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) ->
     {RaftIdxs, Data} = lists:unzip(IdxMsgs),
     {log, RaftIdxs,
      fun(Log) ->
-             Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+             Msgs0 = lists:zipwith(fun
+                                     (#enqueue{msg = Msg}, {MsgId, Header}) ->
+                                         {MsgId, {Header, Msg}};
+                                     (#requeue{msg = ?INDEX_MSG(_, ?MSG(_, Msg))},
+                                      {MsgId, Header}) ->
                                          {MsgId, {Header, Msg}}
                                    end, Log, Data),
              Msgs = case InMemMsgs of
@@ -1839,7 +1930,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
         {ConsumerMsg, State0} ->
             %% there are consumers waiting to be serviced
             %% process consumer checkout
-            case maps:get(ConsumerId, Cons0) of
+            case maps:get(ConsumerId, Cons0, error) of
                 #consumer{credit = 0} ->
                     %% no credit but was still on queue
                     %% can happen when draining
@@ -2299,3 +2390,24 @@ smallest_raft_index(#?MODULE{cfg = _Cfg,
             {undefined, State}
     end
 end.
+
+make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) ->
+    lists:reverse([{append,
+                    #requeue{consumer_id = ConsumerId,
+                             msg_id = MsgId,
+                             msg = Msg},
+                    Notify}
+                   | Acc]);
+make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) ->
+    make_requeue(ConsumerId, Notify, Rem,
+                 [{append,
+                   #requeue{consumer_id = ConsumerId,
+                            msg_id = MsgId,
+                            msg = Msg},
+                   noreply}
+                  | Acc]);
+make_requeue(_ConsumerId, _Notify, [], []) ->
+    [].
+
+incr(I) ->
+    I + 1.
diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl
index c797c9d9bd..95e7003460 100644
--- a/deps/rabbit/src/rabbit_fifo.hrl
+++ b/deps/rabbit/src/rabbit_fifo.hrl
@@ -122,6 +122,7 @@
 -record(enqueuer,
         {next_seqno = 1 :: msg_seqno(),
          % out of order enqueues - sorted list
+         unused,
          pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
          status = up :: up | suspected_down,
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 7784e15e60..e102ad2207 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -828,6 +828,19 @@ next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) ->
 consumer_id(ConsumerTag) ->
     {ConsumerTag, self()}.
 
+send_command(Server, Correlation, Command, _Priority,
+             #state{pending = Pending,
+                    cfg = #cfg{soft_limit = SftLmt}} = State0)
+  when element(1, Command) == return ->
+    %% returns are sent to the aux machine for pre-evaluation
+    {Seq, State} = next_seq(State0),
+    ok = ra:cast_aux_command(Server, {Command, Seq, self()}),
+    Tag = case maps:size(Pending) >= SftLmt of
+              true -> slow;
+              false -> ok
+          end,
+    {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
+                      slow = Tag == slow}};
 send_command(Server, Correlation, Command, Priority,
              #state{pending = Pending,
                     cfg = #cfg{soft_limit = SftLmt}} = State0) ->
@@ -840,6 +853,7 @@ send_command(Server, Correlation, Command, Priority,
     {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
                       slow = Tag == slow}}.
 
+
 resend_command(Node, Correlation, Command,
                #state{pending = Pending} = State0) ->
     {Seq, State} = next_seq(State0),
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0131d1538b..fc1d0b5a54 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -826,7 +826,8 @@ deliver(true, Delivery, QState0) ->
     rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
                                Delivery#delivery.message, QState0).
 
-deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
+deliver(QSs, #delivery{confirm = Confirm} = Delivery0) ->
+    Delivery = clean_delivery(Delivery0),
     lists:foldl(
       fun({Q, stateless}, {Qs, Actions}) ->
               QRef = amqqueue:get_pid(Q),
@@ -1610,3 +1611,21 @@ notify_decorators(QName, F, A) ->
         {error, not_found} ->
             ok
     end.
+
+%% remove any data that a quorum queue doesn't need
+clean_delivery(#delivery{message =
+                         #basic_message{content = Content0} = Msg} = Delivery) ->
+    Content = case Content0 of
+                  #content{properties = none} ->
+                      Content0;
+                  #content{protocol = none} ->
+                      Content0;
+                  #content{properties = Props,
+                           protocol = Proto} ->
+                      Content0#content{properties = none,
+                                       properties_bin = Proto:encode_properties(Props)}
+              end,
+
+    Delivery#delivery{message = Msg#basic_message{%%id = undefined,
+                                                  content = Content}}.
+
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 1de9d1f5db..09f473cb4f 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -81,11 +81,12 @@ groups() ->
                                quorum_cluster_size_7,
                                node_removal_is_not_quorum_critical
                               ]},
-      {clustered_with_partitions, [], [
-          reconnect_consumer_and_publish,
-          reconnect_consumer_and_wait,
-          reconnect_consumer_and_wait_channel_down
-          ]}
+      {clustered_with_partitions, [],
+       [
+        reconnect_consumer_and_publish,
+        reconnect_consumer_and_wait,
+        reconnect_consumer_and_wait_channel_down
+       ]}
      ]}
     ].
@@ -1889,7 +1890,11 @@ subscribe_redelivery_count(Config) ->
     Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
     QQ = ?config(queue_name, Config),
     ?assertEqual({'queue.declare_ok', QQ, 0, 0},
-                 declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+                 declare(Ch, QQ,
+                         [
+                          {<<"x-queue-type">>, longstr, <<"quorum">>},
+                          {<<"x-max-in-memory-length">>, long, 0}
+                         ])),
     RaName = ra_name(QQ),
     publish(Ch, QQ),
@@ -1919,6 +1924,7 @@ subscribe_redelivery_count(Config) ->
                                         multiple     = false,
                                         requeue      = true})
     after 5000 ->
+              flush(1),
               exit(basic_deliver_timeout_2)
     end,
diff --git a/erlang_ls.config b/erlang_ls.config
index 10b5073efa..5fc0d8b7e3 100644
--- a/erlang_ls.config
+++ b/erlang_ls.config
@@ -1,25 +1,27 @@
 # otp_path: "/path/to/otp/lib/erlang"
 deps_dirs:
   - "deps/*"
-  - "deps/rabbit/apps/*"
 diagnostics:
-  disabled: []
+  disabled:
+    - bound_var_in_pattern
   enabled:
     - crossref
     - dialyzer
-    - elvis
+    # - elvis
     - compiler
 include_dirs:
   - "deps"
   - "deps/*/include"
-# lenses:
-#   enabled:
-#     - ct-run-test
-#     - show-behaviour-usages
-#   disabled: []
+lenses:
+  enabled:
+    - ct-run-test
+    - show-behaviour-usages
+    - suggest-spec
+    - function-references
+  disabled: []
 # macros:
 #   - name: DEFINED_WITH_VALUE
 #     value: 42
-code_reload:
-  node: rabbit@localhost
+# code_reload:
+#   node: rabbit@nkarl-a02
 plt_path: .rabbitmq_server_release.plt