summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-10-17 10:13:10 +0100
committerkjnilsson <knilsson@pivotal.io>2019-10-17 10:13:10 +0100
commitd48cff40509e6cd19bb75aad82a88a2a0b95453e (patch)
treec63abef5830235658ed6cb0d69b1ea607d6d85a5 /src
parent851244f0fb6d4e264b660da340447914f459265a (diff)
downloadrabbitmq-server-git-d48cff40509e6cd19bb75aad82a88a2a0b95453e.tar.gz
rabbit_fifo: Optimise peak memory use
When taking a snapshot point
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl37
-rw-r--r--src/rabbit_fifo.hrl4
2 files changed, 30 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 37afefef7e..891a6827dc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -143,7 +143,14 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
+ SHICur = case State#?MODULE.cfg of
+ #cfg{release_cursor_interval = {_, C}} ->
+ C;
+ #cfg{release_cursor_interval = C} ->
+ C
+ end,
+
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
dead_letter_handler = DLH,
become_leader_handler = BLH,
max_length = MaxLength,
@@ -1580,12 +1587,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
+ low_msg_num = Low,
+ next_msg_num = Next,
prefix_msgs = {PrefRet0, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
%% TODO: optimise this function as far as possible
@@ -1600,13 +1608,10 @@ dehydrate_state(#?MODULE{messages = Messages,
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) ->
- [Msg | Acc];
- ({_, {_RaftIdx, {Header, _}}}, Acc) ->
- [Header | Acc]
- end,
- lists:reverse(PrefMsg0),
- lists:sort(maps:to_list(Messages))),
+ PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ %% prefix messages are not populated in normal operation only after
+ %% recovering from a snapshot
+ PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
State#?MODULE{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
@@ -1617,9 +1622,21 @@ dehydrate_state(#?MODULE{messages = Messages,
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {lists:reverse(PrefRet),
- lists:reverse(PrefMsgs)},
+ PrefMsgs},
waiting_consumers = Waiting}.
+dehydrate_messages(Low, Next, _Msgs, Acc)
+ when Next < Low ->
+ Acc;
+dehydrate_messages(Low, Next, Msgs, Acc0) ->
+ Acc = case maps:get(Next, Msgs) of
+ {_RaftIdx, {_, 'empty'} = Msg} ->
+ [Msg | Acc0];
+ {_RaftIdx, {Header, _}} ->
+ [Header | Acc0]
+ end,
+ dehydrate_messages(Low, Next - 1, Msgs, Acc).
+
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 0d1d5ed2d1..16e665f9df 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -104,7 +104,9 @@
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
+ release_cursor_interval =
+ {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} ::
+ non_neg_integer() | {non_neg_integer(), non_neg_integer()},
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
max_length :: option(non_neg_integer()),