diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-10-17 10:13:10 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-10-17 10:13:10 +0100 |
| commit | d48cff40509e6cd19bb75aad82a88a2a0b95453e (patch) | |
| tree | c63abef5830235658ed6cb0d69b1ea607d6d85a5 /src | |
| parent | 851244f0fb6d4e264b660da340447914f459265a (diff) | |
| download | rabbitmq-server-git-d48cff40509e6cd19bb75aad82a88a2a0b95453e.tar.gz | |
rabbit_fifo: Optimise peak memory use
When taking a snapshot point
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 4 |
2 files changed, 30 insertions, 11 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 37afefef7e..891a6827dc 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -143,7 +143,14 @@ update_config(Conf, State) -> competing end, Cfg = State#?MODULE.cfg, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, + SHICur = case State#?MODULE.cfg of + #cfg{release_cursor_interval = {_, C}} -> + C; + #cfg{release_cursor_interval = C} -> + C + end, + + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, dead_letter_handler = DLH, become_leader_handler = BLH, max_length = MaxLength, @@ -1580,12 +1587,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. - %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, + low_msg_num = Low, + next_msg_num = Next, prefix_msgs = {PrefRet0, PrefMsg0}, waiting_consumers = Waiting0} = State) -> %% TODO: optimise this function as far as possible @@ -1600,13 +1608,10 @@ dehydrate_state(#?MODULE{messages = Messages, end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) -> - [Msg | Acc]; - ({_, {_RaftIdx, {Header, _}}}, Acc) -> - [Header | Acc] - end, - lists:reverse(PrefMsg0), - lists:sort(maps:to_list(Messages))), + PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), + %% prefix messages are not populated in normal operation only after + %% recovering from a snapshot + PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], State#?MODULE{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), @@ -1617,9 +1622,21 @@ dehydrate_state(#?MODULE{messages = Messages, end, Consumers), returns = lqueue:new(), prefix_msgs = {lists:reverse(PrefRet), - lists:reverse(PrefMsgs)}, + PrefMsgs}, waiting_consumers = Waiting}. +dehydrate_messages(Low, Next, _Msgs, Acc) + when Next < Low -> + Acc; +dehydrate_messages(Low, Next, Msgs, Acc0) -> + Acc = case maps:get(Next, Msgs) of + {_RaftIdx, {_, 'empty'} = Msg} -> + [Msg | Acc0]; + {_RaftIdx, {Header, _}} -> + [Header | Acc0] + end, + dehydrate_messages(Low, Next - 1, Msgs, Acc). + dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 0d1d5ed2d1..16e665f9df 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -104,7 +104,9 @@ -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), + release_cursor_interval = + {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} :: + non_neg_integer() | {non_neg_integer(), non_neg_integer()}, dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), max_length :: option(non_neg_integer()), |
