diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-25 10:32:49 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-25 10:32:49 +0000 |
| commit | be93004b3028c4b41e05b01b330a206aaa3e30ea (patch) | |
| tree | ad3e8b859e6afd97f344b1ed32760736f2e4d2ed /src | |
| parent | 99ee5fb79f060d80833e4d26f4cd0896457f74b2 (diff) | |
| download | rabbitmq-server-git-be93004b3028c4b41e05b01b330a206aaa3e30ea.tar.gz | |
Restructure rabbit_fifo state
To keep all static and rarely changed data in a nested sub-structure.
This will reduce gc pressure somewhat.
[#163513253]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 125 |
1 files changed, 68 insertions, 57 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 39dbd8f3f1..26842d5c49 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -197,10 +197,21 @@ status = up :: up | suspected_down }). --record(state, +-record(cfg, {name :: atom(), - queue_resource :: rabbit_types:r('queue'), + resource :: rabbit_types:r('queue'), release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), + dead_letter_handler :: maybe(applied_mfa()), + become_leader_handler :: maybe(applied_mfa()), + max_length :: maybe(non_neg_integer()), + max_bytes :: maybe(non_neg_integer()), + %% whether single active consumer is on or not for this queue + consumer_strategy = default :: default | single_active, + delivery_limit :: maybe(non_neg_integer()) + }). + +-record(state, + {cfg :: #cfg{}, % unassigned messages messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map @@ -232,8 +243,6 @@ % consumers that require further service are queued here % needs to be part of snapshot service_queue = queue:new() :: queue:queue(consumer_id()), - dead_letter_handler :: maybe(applied_mfa()), - become_leader_handler :: maybe(applied_mfa()), %% This is a special field that is only used for snapshots %% It represents the queued messages at the time the %% dehydrated snapshot state was cached. @@ -247,14 +256,9 @@ PrefixMsgs :: [msg_header()]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), - max_length :: maybe(non_neg_integer()), - max_bytes :: maybe(non_neg_integer()), - %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}], - delivery_limit :: maybe(non_neg_integer()) + waiting_consumers = [] :: [{consumer_id(), consumer()}] }). -opaque state() :: #state{}. @@ -287,8 +291,8 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_config(Conf, #state{name = Name, - queue_resource = Resource}). + update_config(Conf, #state{cfg = #cfg{name = Name, + resource = Resource}}). update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), @@ -303,13 +307,14 @@ update_config(Conf, State) -> false -> default end, - State#state{dead_letter_handler = DLH, - become_leader_handler = BLH, - release_cursor_interval = SHI, - max_length = MaxLength, - max_bytes = MaxBytes, - consumer_strategy = ConsumerStrategy, - delivery_limit = DeliveryLimit}. + Cfg = State#state.cfg, + State#state{cfg = Cfg#cfg{release_cursor_interval = SHI, + dead_letter_handler = DLH, + become_leader_handler = BLH, + max_length = MaxLength, + max_bytes = MaxBytes, + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}}. zero(_) -> 0. @@ -562,25 +567,25 @@ apply(_, {nodedown, _Node}, State) -> apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#state{consumer_strategy = default}) -> +consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = default}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) end; -consumer_active_flag_update_function(#state{consumer_strategy = single_active}) -> +consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = single_active}}) -> fun(_, _, _, _, _, Effects) -> Effects end. handle_waiting_consumer_down(_Pid, - #state{consumer_strategy = default} = State) -> + #state{cfg = #cfg{consumer_strategy = default}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State) -> {[], State}; handle_waiting_consumer_down(Pid, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> % get cancel effects for down waiting consumers Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end, @@ -595,16 +600,16 @@ handle_waiting_consumer_down(Pid, State = State0#state{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #state{consumer_strategy = default}, +update_waiting_consumer_status(_Node, #state{cfg = #cfg{consumer_strategy = default}}, _Status) -> []; update_waiting_consumer_status(_Node, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []}, _Status) -> []; update_waiting_consumer_status(Node, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers}, Status) -> [begin @@ -621,9 +626,10 @@ update_waiting_consumer_status(Node, state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, waiting_consumers = WaitingConsumers, - name = Name, - prefix_msgs = {[], []}, - become_leader_handler = BLH}) -> + cfg = #cfg{name = Name, + become_leader_handler = BLH}, + prefix_msgs = {[], []} + }) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)] @@ -657,8 +663,8 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). -tick(_Ts, #state{name = Name, - queue_resource = QName, +tick(_Ts, #state{cfg = #cfg{name = Name, + resource = QName}, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> Metrics = {Name, @@ -745,7 +751,7 @@ query_consumer_count(#state{consumers = Consumers, query_consumers(#state{consumers = Consumers, waiting_consumers = WaitingConsumers, - consumer_strategy = ConsumerStrategy } = State) -> + cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> @@ -803,7 +809,7 @@ query_consumers(#state{consumers = Consumers, end, #{}, WaitingConsumers), maps:merge(FromConsumers, FromWaitingConsumers). -query_single_active_consumer(#state{consumer_strategy = single_active, +query_single_active_consumer(#state{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> case maps:size(Consumers) of 0 -> @@ -874,18 +880,18 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects, Reason) -> + #state{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> %% general case, single active consumer off cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = []} = State, Effects, Reason) -> %% single active consumer on, no consumers are waiting cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumers = Cons0, - consumer_strategy = single_active, + cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting @@ -909,8 +915,8 @@ cancel_consumer(ConsumerId, service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), + NewActiveConsumer, true, + single_active, Effects2), {State, Effects}; error -> % The cancelled consumer is not the active one @@ -923,7 +929,7 @@ cancel_consumer(ConsumerId, {State0#state{waiting_consumers = WaitingConsumers}, Effects} end. -consumer_update_active_effects(#state{queue_resource = QName }, +consumer_update_active_effects(#state{cfg = #cfg{resource = QName}}, ConsumerId, #consumer{meta = Meta}, Active, ActivityStatus, Effects) -> @@ -1015,8 +1021,9 @@ append_to_master_index(RaftIdx, Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), State#state{ra_indexes = Indexes}. + incr_enqueue_count(#state{enqueue_count = C, - release_cursor_interval = C} = State0) -> + cfg = #cfg{release_cursor_interval = C}} = State0) -> % this will trigger a dehydrated version of the state to be stored % at this raft index for potential future snapshot generation State0#state{enqueue_count = 0}; @@ -1151,17 +1158,19 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, - #state{dead_letter_handler = undefined}, + #state{cfg = #cfg{dead_letter_handler = undefined}}, Effects) -> Effects; dead_letter_effects(Reason, Discarded, - #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> + #state{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}}, + Effects) -> DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> [{Reason, Msg} | Acc] end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. -cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> +cancel_consumer_effects(ConsumerId, + #state{cfg = #cfg{resource = QName}}, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. @@ -1205,7 +1214,7 @@ find_next_cursor(Smallest, Cursors0, Potential) -> return_one(0, {'$prefix_msg', Header0}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, Effects0, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, @@ -1225,7 +1234,7 @@ return_one(0, {'$prefix_msg', Header0}, end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, + cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, @@ -1243,7 +1252,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, _ -> %% this should not affect the release cursor in any way {add_bytes_return(RawMsg, - State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} + State0#state{returns = + lqueue:in({MsgNum, Msg}, Returns)}), + Effects0} end. return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> @@ -1286,8 +1297,8 @@ checkout0({Activity, State0}, Effects0, Acc) -> {State0, ok, lists:reverse(Effects1)}. evaluate_limit(_OldIndexes, Result, - #state{max_length = undefined, - max_bytes = undefined} = State, + #state{cfg = #cfg{max_length = undefined, + max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; evaluate_limit(OldIndexes, Result, @@ -1453,17 +1464,17 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #state{consumer_strategy = default} = State0) -> + #state{cfg = #cfg{consumer_strategy = default}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, #state{consumers = Cons0, - consumer_strategy = single_active} = State0) + cfg = #cfg{consumer_strategy = single_active}} = State0) when map_size(Cons0) == 0 -> %% single active consumer on, no one is consuming yet update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, - #state{consumer_strategy = single_active, + #state{cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = WaitingConsumers0} = State0) -> %% single active consumer on and one active consumer already %% adding the new consumer to the waiting list @@ -1544,11 +1555,11 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> normalize(#state{release_cursors = Cursors} = State) -> State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. -is_over_limit(#state{max_length = undefined, - max_bytes = undefined}) -> +is_over_limit(#state{cfg = #cfg{max_length = undefined, + max_bytes = undefined}}) -> false; -is_over_limit(#state{max_length = MaxLength, - max_bytes = MaxBytes, +is_over_limit(#state{cfg = #cfg{max_length = MaxLength, + max_bytes = MaxBytes}, msg_bytes_enqueue = BytesEnq} = State) -> messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes). @@ -2348,7 +2359,7 @@ single_active_consumer_test() -> atom_to_binary(?FUNCTION_NAME, utf8)), release_cursor_interval => 0, single_active_consumer_on => true}), - ?assertEqual(single_active, State0#state.consumer_strategy), + ?assertEqual(single_active, State0#state.cfg#cfg.consumer_strategy), ?assertEqual(0, map_size(State0#state.consumers)), % adding some consumers |
