diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 4 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 2 |
3 files changed, 31 insertions, 12 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 37afefef7e..891a6827dc 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -143,7 +143,14 @@ update_config(Conf, State) -> competing end, Cfg = State#?MODULE.cfg, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, + SHICur = case State#?MODULE.cfg of + #cfg{release_cursor_interval = {_, C}} -> + C; + #cfg{release_cursor_interval = C} -> + C + end, + + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, dead_letter_handler = DLH, become_leader_handler = BLH, max_length = MaxLength, @@ -1580,12 +1587,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. - %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, + low_msg_num = Low, + next_msg_num = Next, prefix_msgs = {PrefRet0, PrefMsg0}, waiting_consumers = Waiting0} = State) -> %% TODO: optimise this function as far as possible @@ -1600,13 +1608,10 @@ dehydrate_state(#?MODULE{messages = Messages, end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) -> - [Msg | Acc]; - ({_, {_RaftIdx, {Header, _}}}, Acc) -> - [Header | Acc] - end, - lists:reverse(PrefMsg0), - lists:sort(maps:to_list(Messages))), + PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), + %% prefix messages are not populated in normal operation only after + %% recovering from a snapshot + PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], State#?MODULE{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), @@ -1617,9 +1622,21 @@ dehydrate_state(#?MODULE{messages = Messages, end, Consumers), returns = lqueue:new(), prefix_msgs = {lists:reverse(PrefRet), - lists:reverse(PrefMsgs)}, + PrefMsgs}, waiting_consumers = Waiting}. +dehydrate_messages(Low, Next, _Msgs, Acc) + when Next < Low -> + Acc; +dehydrate_messages(Low, Next, Msgs, Acc0) -> + Acc = case maps:get(Next, Msgs) of + {_RaftIdx, {_, 'empty'} = Msg} -> + [Msg | Acc0]; + {_RaftIdx, {Header, _}} -> + [Header | Acc0] + end, + dehydrate_messages(Low, Next - 1, Msgs, Acc). + dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index 0d1d5ed2d1..16e665f9df 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -104,7 +104,9 @@ -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), + release_cursor_interval = + {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} :: + non_neg_integer() | {non_neg_integer(), non_neg_integer()}, dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), max_length :: option(non_neg_integer()), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 0b947a9d1d..e0fced2de3 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -691,7 +691,7 @@ single_active_prop(Conf0, Commands, ValidateOrder) -> validate_msg_order(_, [], S) -> S; validate_msg_order(Cid, [{_, {H, Num}} | Rem], PrevMax) -> - Redelivered = maps:is_key(delivery_count, H), + Redelivered = is_map(H) andalso maps:is_key(delivery_count, H), case undefined of _ when Num == PrevMax + 1 -> %% forwards case |
