summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl124
-rw-r--r--deps/rabbit/src/rabbit_fifo.hrl1
-rw-r--r--deps/rabbit/src/rabbit_fifo_client.erl14
-rw-r--r--deps/rabbit/src/rabbit_quorum_queue.erl21
-rw-r--r--deps/rabbit/test/quorum_queue_SUITE.erl18
-rw-r--r--erlang_ls.config22
6 files changed, 177 insertions, 23 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index fe5adb029c..c7cbb55aa6 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -1,4 +1,4 @@
-%% This Source Code Form is subject to the terms of the Mozilla Public
+%% This Source Code Form is subject tconsumer_ido the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
@@ -71,6 +71,9 @@
-record(enqueue, {pid :: option(pid()),
seq :: option(msg_seqno()),
msg :: raw_msg()}).
+-record(requeue, {consumer_id :: consumer_id(),
+ msg_id :: msg_id(),
+ msg :: indexed_msg()}).
-record(register_enqueuer, {pid :: pid()}).
-record(checkout, {consumer_id :: consumer_id(),
spec :: checkout_spec(),
@@ -92,6 +95,7 @@
-opaque protocol() ::
#enqueue{} |
+ #requeue{} |
#register_enqueuer{} |
#checkout{} |
#settle{} |
@@ -226,6 +230,52 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
_ ->
{State, ok}
end;
+apply(#{index := Idx} = Meta,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ %% as we read the message from disk it is already
+ %% an inmemory message
+ msg = ?INDEX_MSG(OldIdx, ?MSG(_Header, _RawMsg) = Msg)},
+ #?MODULE{consumers = Cons0,
+ messages = Messages,
+ ra_indexes = Indexes0} = State00) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{checked_out = Checked0} = Con0}
+ when is_map_key(MsgId, Checked0) ->
+ %% construct an index message with the current raft index
+ %% and update delivery count before adding it to the message queue
+ ?INDEX_MSG(_, ?MSG(Header, _)) = IdxMsg0 =
+ update_msg_header(delivery_count, fun incr/1, 1, ?INDEX_MSG(Idx, Msg)),
+
+ State0 = add_bytes_return(Header, State00),
+ {State1, IdxMsg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ % indexed message with header map
+ {State0, ?INDEX_MSG(Idx, ?DISK_MSG(Header))};
+ false ->
+ {add_in_memory_counts(Header, State0), IdxMsg0}
+ end,
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked0),
+ credit = increase_credit(Con0, 1)},
+ State2 = update_or_remove_sub(
+ Meta,
+ ConsumerId,
+ Con,
+ State1#?MODULE{ra_indexes = rabbit_fifo_index:delete(OldIdx, Indexes0),
+ messages = lqueue:in(IdxMsg, Messages)}),
+
+ %% We have to increment the enqueue counter to ensure release cursors
+ %% are generated
+ State3 = incr_enqueue_count(State2),
+
+ {State, Ret, Effs} = checkout(Meta, State0, State3, []),
+ update_smallest_raft_index(Idx, Ret,
+ maybe_store_dehydrated_state(Idx, State),
+ Effs);
+ _ ->
+ {State00, ok}
+ end;
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
@@ -861,11 +911,48 @@ init_aux(Name) when is_atom(Name) ->
capacity = {inactive, Now, 1, 1.0}}.
handle_aux(leader, _, garbage_collection, State, Log, MacState) ->
- % ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
- % ra_log_wal:force_roll_over(ra_log_wal),
{no_reply, force_eval_gc(Log, MacState, State), Log};
+handle_aux(leader, cast, {#return{msg_ids = MsgIds,
+ consumer_id = ConsumerId}, Corr, Pid},
+ Aux0, Log0, #?MODULE{cfg = #cfg{delivery_limit = undefined},
+ consumers = Consumers,
+ ra_indexes = _Indexes}) ->
+ case Consumers of
+ #{ConsumerId := #consumer{checked_out = Checked}} ->
+ {Log, ToReturn} =
+ maps:fold(
+ fun (MsgId, ?INDEX_MSG(Idx, ?DISK_MSG(Header)), {L0, Acc}) ->
+ %% it is possible this is not found if the consumer
+ %% crashed and the message got removed
+ %% TODO: handle when log entry is not found
+ case ra_log:fetch(Idx, L0) of
+ {{_, _, {_, _, Cmd, _}}, L} ->
+ Msg = case Cmd of
+ #enqueue{msg = M} -> M;
+ #requeue{msg = ?INDEX_MSG(_, ?MSG(_H, M))} ->
+ M
+ end,
+ IdxMsg = ?INDEX_MSG(Idx, ?MSG(Header, Msg)),
+ {L, [{MsgId, IdxMsg} | Acc]};
+ {undefined, L} ->
+ {L, Acc}
+ end;
+ (MsgId, IdxMsg, {L0, Acc}) ->
+ {L0, [{MsgId, IdxMsg} | Acc]}
+ end, {Log0, []}, maps:with(MsgIds, Checked)),
+
+ Appends = make_requeue(ConsumerId, {notify, Corr, Pid},
+ lists:sort(ToReturn), []),
+ {no_reply, Aux0, Log, Appends};
+ _ ->
+ {no_reply, Aux0, Log0}
+ end;
+handle_aux(leader, cast, {#return{} = Ret, Corr, Pid},
+ Aux0, Log, #?MODULE{}) ->
+ %% for returns with a delivery limit set we can just return as before
+ {no_reply, Aux0, Log, [{append, Ret, {notify, Corr, Pid}}]};
handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
{no_reply, Aux0, Log};
handle_aux(_RaState, cast, Cmd, #aux{capacity = Use0} = Aux0,
@@ -1580,7 +1667,7 @@ return_one(Meta, MsgId, Msg0,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Msg = update_msg_header(delivery_count, fun (C) -> C + 1 end, 1, Msg0),
+ Msg = update_msg_header(delivery_count, fun incr/1, 1, Msg0),
Header = get_msg_header(Msg),
case get_header(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1809,7 +1896,11 @@ delivery_effect({CTag, CPid}, IdxMsgs, InMemMsgs) ->
{RaftIdxs, Data} = lists:unzip(IdxMsgs),
{log, RaftIdxs,
fun(Log) ->
- Msgs0 = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ Msgs0 = lists:zipwith(fun
+ (#enqueue{msg = Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}};
+ (#requeue{msg = ?INDEX_MSG(_, ?MSG(_, Msg))},
+ {MsgId, Header}) ->
{MsgId, {Header, Msg}}
end, Log, Data),
Msgs = case InMemMsgs of
@@ -1839,7 +1930,7 @@ checkout_one(Meta, #?MODULE{service_queue = SQ0,
{ConsumerMsg, State0} ->
%% there are consumers waiting to be serviced
%% process consumer checkout
- case maps:get(ConsumerId, Cons0) of
+ case maps:get(ConsumerId, Cons0, error) of
#consumer{credit = 0} ->
%% no credit but was still on queue
%% can happen when draining
@@ -2299,3 +2390,24 @@ smallest_raft_index(#?MODULE{cfg = _Cfg,
{undefined, State}
end
end.
+
+make_requeue(ConsumerId, Notify, [{MsgId, Msg}], Acc) ->
+ lists:reverse([{append,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ msg = Msg},
+ Notify}
+ | Acc]);
+make_requeue(ConsumerId, Notify, [{MsgId, Msg} | Rem], Acc) ->
+ make_requeue(ConsumerId, Notify, Rem,
+ [{append,
+ #requeue{consumer_id = ConsumerId,
+ msg_id = MsgId,
+ msg = Msg},
+ noreply}
+ | Acc]);
+make_requeue(_ConsumerId, _Notify, [], []) ->
+ [].
+
+incr(I) ->
+ I + 1.
diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl
index c797c9d9bd..95e7003460 100644
--- a/deps/rabbit/src/rabbit_fifo.hrl
+++ b/deps/rabbit/src/rabbit_fifo.hrl
@@ -122,6 +122,7 @@
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
+ unused,
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
status = up :: up |
suspected_down,
diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl
index 7784e15e60..e102ad2207 100644
--- a/deps/rabbit/src/rabbit_fifo_client.erl
+++ b/deps/rabbit/src/rabbit_fifo_client.erl
@@ -828,6 +828,19 @@ next_enqueue_seq(#state{next_enqueue_seq = Seq} = State) ->
consumer_id(ConsumerTag) ->
{ConsumerTag, self()}.
+send_command(Server, Correlation, Command, _Priority,
+ #state{pending = Pending,
+ cfg = #cfg{soft_limit = SftLmt}} = State0)
+ when element(1, Command) == return ->
+ %% returns are sent to the aux machine for pre-evaluation
+ {Seq, State} = next_seq(State0),
+ ok = ra:cast_aux_command(Server, {Command, Seq, self()}),
+ Tag = case maps:size(Pending) >= SftLmt of
+ true -> slow;
+ false -> ok
+ end,
+ {Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
+ slow = Tag == slow}};
send_command(Server, Correlation, Command, Priority,
#state{pending = Pending,
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
@@ -840,6 +853,7 @@ send_command(Server, Correlation, Command, Priority,
{Tag, State#state{pending = Pending#{Seq => {Correlation, Command}},
slow = Tag == slow}}.
+
resend_command(Node, Correlation, Command,
#state{pending = Pending} = State0) ->
{Seq, State} = next_seq(State0),
diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl
index 0131d1538b..fc1d0b5a54 100644
--- a/deps/rabbit/src/rabbit_quorum_queue.erl
+++ b/deps/rabbit/src/rabbit_quorum_queue.erl
@@ -826,7 +826,8 @@ deliver(true, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
Delivery#delivery.message, QState0).
-deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
+deliver(QSs, #delivery{confirm = Confirm} = Delivery0) ->
+ Delivery = clean_delivery(Delivery0),
lists:foldl(
fun({Q, stateless}, {Qs, Actions}) ->
QRef = amqqueue:get_pid(Q),
@@ -1610,3 +1611,21 @@ notify_decorators(QName, F, A) ->
{error, not_found} ->
ok
end.
+
+%% remove any data that a quorum queue doesn't need
+clean_delivery(#delivery{message =
+ #basic_message{content = Content0} = Msg} = Delivery) ->
+ Content = case Content0 of
+ #content{properties = none} ->
+ Content0;
+ #content{protocol = none} ->
+ Content0;
+ #content{properties = Props,
+ protocol = Proto} ->
+ Content0#content{properties = none,
+ properties_bin = Proto:encode_properties(Props)}
+ end,
+
+ Delivery#delivery{message = Msg#basic_message{%%id = undefined,
+ content = Content}}.
+
diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl
index 1de9d1f5db..09f473cb4f 100644
--- a/deps/rabbit/test/quorum_queue_SUITE.erl
+++ b/deps/rabbit/test/quorum_queue_SUITE.erl
@@ -81,11 +81,12 @@ groups() ->
quorum_cluster_size_7,
node_removal_is_not_quorum_critical
]},
- {clustered_with_partitions, [], [
- reconnect_consumer_and_publish,
- reconnect_consumer_and_wait,
- reconnect_consumer_and_wait_channel_down
- ]}
+ {clustered_with_partitions, [],
+ [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
+ ]}
]}
].
@@ -1889,7 +1890,11 @@ subscribe_redelivery_count(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ declare(Ch, QQ,
+ [
+ {<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 0}
+ ])),
RaName = ra_name(QQ),
publish(Ch, QQ),
@@ -1919,6 +1924,7 @@ subscribe_redelivery_count(Config) ->
multiple = false,
requeue = true})
after 5000 ->
+ flush(1),
exit(basic_deliver_timeout_2)
end,
diff --git a/erlang_ls.config b/erlang_ls.config
index 10b5073efa..5fc0d8b7e3 100644
--- a/erlang_ls.config
+++ b/erlang_ls.config
@@ -1,25 +1,27 @@
# otp_path: "/path/to/otp/lib/erlang"
deps_dirs:
- "deps/*"
- - "deps/rabbit/apps/*"
diagnostics:
- disabled: []
+ disabled:
+ - bound_var_in_pattern
enabled:
- crossref
- dialyzer
- - elvis
+ # - elvis
- compiler
include_dirs:
- "deps"
- "deps/*/include"
-# lenses:
-# enabled:
-# - ct-run-test
-# - show-behaviour-usages
-# disabled: []
+lenses:
+ enabled:
+ - ct-run-test
+ - show-behaviour-usages
+ - suggest-spec
+ - function-references
+ disabled: []
# macros:
# - name: DEFINED_WITH_VALUE
# value: 42
-code_reload:
- node: rabbit@localhost
+# code_reload:
+# node: rabbit@nkarl-a02
plt_path: .rabbitmq_server_release.plt