diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 119 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 6 | ||||
| -rw-r--r-- | src/rabbit_fifo_v0.erl | 20 |
3 files changed, 66 insertions, 79 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index fb1794cae4..13c532af47 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -137,16 +137,14 @@ update_config(Conf, State) -> competing end, Cfg = State#?MODULE.cfg, - SHICur = case State#?MODULE.cfg of - #cfg{release_cursor_interval = {_, C}} -> - C; - #cfg{release_cursor_interval = undefined} -> - SHI; - #cfg{release_cursor_interval = C} -> - C - end, + RCI = case State#?MODULE.cfg of + #cfg{release_cursor_interval = undefined} -> + {SHI, SHI}; + #cfg{release_cursor_interval = {_, C}} -> + {SHI, C} + end, - State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur}, + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI, dead_letter_handler = DLH, become_leader_handler = BLH, max_length = MaxLength, @@ -312,17 +310,17 @@ apply(#{index := RaftIdx}, #purge{}, messages = Messages} = State0) -> Total = messages_ready(State0), Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, - [I || {I, _} <- lists:sort(maps:values(Messages))]), + [I || {I, _} <- lists:sort(lqueue:to_list(Messages))]), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = update_smallest_raft_index(RaftIdx, State0#?MODULE{ra_indexes = Indexes, - messages = #{}, + messages = lqueue:new(), returns = lqueue:new(), msg_bytes_enqueue = 0, prefix_msgs = {0, [], 0, []}, - low_msg_num = undefined, + % low_msg_num = undefined, msg_bytes_in_memory = 0, msgs_ready_in_memory = 0}, []), @@ -467,9 +465,13 @@ apply(_, #purge_nodes{nodes = Nodes}, State0) -> {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []); -apply(_Meta, {machine_version, 0, 1}, State) -> +apply(_Meta, {machine_version, 0, 1}, V0State0) -> + V0State = rabbit_fifo_v0:normalize_for_v1(V0State0), %% quick hack to "convert" the state from version one - {setelement(1, State, ?MODULE), ok, []}. + State = setelement(1, V0State, ?MODULE), + V0Msgs = rabbit_fifo_v0:messages_map(V0State), + V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))), + {State#?MODULE{messages = V1Msgs}, ok, []}. purge_node(Node, State, Effects) -> lists:foldl(fun(Pid, {S0, E0}) -> @@ -816,7 +818,7 @@ messages_ready(#?MODULE{messages = M, %% prefix messages will rarely have anything in them during normal %% operations so length/1 is fine here - maps:size(M) + lqueue:len(R) + RCnt + PCnt. + lqueue:len(M) + lqueue:len(R) + RCnt + PCnt. messages_total(#?MODULE{ra_indexes = I, prefix_msgs = {RCnt, _R, PCnt, _P}}) -> @@ -1005,7 +1007,6 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) -> end. enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, - low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> %% the initial header is an integer only - it will get expanded to a map %% when the next required key is added @@ -1020,10 +1021,10 @@ enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages, {RaftIdx, {Header, RawMsg}}} % indexed message with header map end, State = add_bytes_enqueue(Header, State1), - State#?MODULE{messages = Messages#{NextMsgNum => Msg}, + State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages), %% this is probably only done to record it when low_msg_num %% is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), + % low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, @@ -1042,12 +1043,6 @@ incr_enqueue_count(#?MODULE{enqueue_count = C, %% A: Because it needs to be the very last thing we do and we %% first needs to run the checkout logic. State0#?MODULE{enqueue_count = 0}; -incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg} - = State0) - when is_integer(C) -> - %% conversion to new release cursor interval format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, - incr_enqueue_count(State); incr_enqueue_count(#?MODULE{enqueue_count = C} = State) -> State#?MODULE{enqueue_count = C + 1}. @@ -1070,22 +1065,13 @@ maybe_store_dehydrated_state(RaftIdx, min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX) end, - State = convert_prefix_msgs( - State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = - {Base, Interval}}}), + State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = + {Base, Interval}}}, Dehydrated = dehydrate_state(State), Cursor = {release_cursor, RaftIdx, Dehydrated}, Cursors = lqueue:in(Cursor, Cursors0), State#?MODULE{release_cursors = Cursors} end; -maybe_store_dehydrated_state(RaftIdx, - #?MODULE{cfg = - #cfg{release_cursor_interval = C} = Cfg} - = State0) - when is_integer(C) -> - %% convert to new format - State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}}, - maybe_store_dehydrated_state(RaftIdx, State); maybe_store_dehydrated_state(_RaftIdx, State) -> State. @@ -1406,8 +1392,7 @@ evaluate_limit(Result, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(Result, State00, Effects0) -> - State0 = convert_prefix_msgs(State00), +evaluate_limit(Result, State0, Effects0) -> case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), @@ -1463,7 +1448,6 @@ take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) - {{'$prefix_msg', Header}, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}}; take_next_msg(#?MODULE{returns = Returns, - low_msg_num = Low0, messages = Messages0, prefix_msgs = {NumR, R, NumP, P}} = State) -> %% use peek rather than out there as the most likely case is an empty @@ -1473,21 +1457,11 @@ take_next_msg(#?MODULE{returns = Returns, {NextMsg, State#?MODULE{returns = lqueue:drop(Returns)}}; empty when P == [] -> - case Low0 of - undefined -> + case lqueue:out(Messages0) of + {empty, _} -> empty; - _ -> - {Msg, Messages} = maps:take(Low0, Messages0), - case maps:size(Messages) of - 0 -> - {{Low0, Msg}, - State#?MODULE{messages = Messages, - low_msg_num = undefined}}; - _ -> - {{Low0, Msg}, - State#?MODULE{messages = Messages, - low_msg_num = Low0 + 1}} - end + {{value, {_, _} = SeqMsg}, Messages} -> + {SeqMsg, State#?MODULE{messages = Messages }} end; empty -> [Msg | Rem] = P, @@ -1584,7 +1558,7 @@ checkout_one(#?MODULE{service_queue = SQ0, {nochange, InitState} end; empty -> - case maps:size(Messages0) of + case lqueue:len(Messages0) of 0 -> {nochange, InitState}; _ -> {inactive, InitState} end @@ -1675,18 +1649,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, ServiceQueue0 end. -convert_prefix_msgs(#?MODULE{prefix_msgs = {R, P}} = State) -> - State#?MODULE{prefix_msgs = {length(R), R, length(P), P}}; -convert_prefix_msgs(State) -> - State. - %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, - low_msg_num = Low, - next_msg_num = Next, + % low_msg_num = Low, + % next_msg_num = Next, prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0}, waiting_consumers = Waiting0} = State) -> RCnt = lqueue:len(Returns), @@ -1703,34 +1672,34 @@ dehydrate_state(#?MODULE{messages = Messages, [], lqueue:to_list(Returns)), PrefRet = PrefRet0 ++ PrefRet1, - PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []), + PrefMsgsSuff = dehydrate_messages(Messages, []), %% prefix messages are not populated in normal operation only after %% recovering from a snapshot PrefMsgs = PrefMsg0 ++ PrefMsgsSuff, Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], - State#?MODULE{messages = #{}, + State#?MODULE{messages = lqueue:new(), ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), - low_msg_num = undefined, + % low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) end, Consumers), returns = lqueue:new(), prefix_msgs = {PRCnt + RCnt, PrefRet, - PPCnt + maps:size(Messages), PrefMsgs}, + PPCnt + lqueue:len(Messages), PrefMsgs}, waiting_consumers = Waiting}. -dehydrate_messages(Low, Next, _Msgs, Acc) - when Next < Low -> - Acc; -dehydrate_messages(Low, Next, Msgs, Acc0) -> - Acc = case maps:get(Next, Msgs) of - {_RaftIdx, {_, 'empty'} = Msg} -> - [Msg | Acc0]; - {_RaftIdx, {Header, _}} -> - [Header | Acc0] - end, - dehydrate_messages(Low, Next - 1, Msgs, Acc). +%% TODO make body recursive to avoid lists:reverse +dehydrate_messages(Msgs0, Acc0) -> + {OutRes, Msgs} = lqueue:out(Msgs0), + case OutRes of + {value, {_, 'empty'} = Msg} -> + dehydrate_messages(Msgs, [Msg | Acc0]); + {value, {Header, _}} -> + dehydrate_messages(Msgs, [Header | Acc0]); + empty -> + lists:reverse(Acc0) + end. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebbaa9e1eb..ec93f480dd 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -110,9 +110,7 @@ -record(cfg, {name :: atom(), resource :: rabbit_types:r('queue'), - release_cursor_interval :: - undefined | non_neg_integer() | - {non_neg_integer(), non_neg_integer()}, + release_cursor_interval :: option({non_neg_integer(), non_neg_integer()}), dead_letter_handler :: option(applied_mfa()), become_leader_handler :: option(applied_mfa()), max_length :: option(non_neg_integer()), @@ -132,7 +130,7 @@ -record(rabbit_fifo, {cfg :: #cfg{}, % unassigned messages - messages = #{} :: #{msg_in_id() => indexed_msg()}, + messages = lqueue:new() :: lqueue:queue(), % defines the lowest message in id available in the messages map % that isn't a return low_msg_num :: option(msg_in_id()), diff --git a/src/rabbit_fifo_v0.erl b/src/rabbit_fifo_v0.erl index 01330fe54f..52706aec1f 100644 --- a/src/rabbit_fifo_v0.erl +++ b/src/rabbit_fifo_v0.erl @@ -53,6 +53,9 @@ %% misc dehydrate_state/1, normalize/1, + normalize_for_v1/1, + %% getters for coversions + messages_map/1, %% protocol helpers make_enqueue/3, @@ -124,6 +127,7 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> + rabbit_log:info("rabbit_fifo: init v0 ~p", [Conf]), update_config(Conf, #?MODULE{cfg = #cfg{name = Name, resource = Resource}}). @@ -1754,6 +1758,22 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength, messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). +normalize_for_v1(#?MODULE{cfg = Cfg} = State) -> + %% run all v0 conversions so that v1 does not have to have this code + RCI = case Cfg of + #cfg{release_cursor_interval = {_, _} = R} -> + R; + #cfg{release_cursor_interval = undefined} -> + {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY}; + #cfg{release_cursor_interval = C} -> + {?RELEASE_CURSOR_EVERY, C} + end, + convert_prefix_msgs( + State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI}}). + +messages_map(#?MODULE{messages = Messages}) -> + Messages. + -spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol(). make_enqueue(Pid, Seq, Msg) -> #enqueue{pid = Pid, seq = Seq, msg = Msg}. |
