summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-25 10:32:49 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-25 10:32:49 +0000
commitbe93004b3028c4b41e05b01b330a206aaa3e30ea (patch)
treead3e8b859e6afd97f344b1ed32760736f2e4d2ed /src
parent99ee5fb79f060d80833e4d26f4cd0896457f74b2 (diff)
downloadrabbitmq-server-git-be93004b3028c4b41e05b01b330a206aaa3e30ea.tar.gz
Restructure rabbit_fifo state
To keep all static and rarely changed data in a nested sub-structure. This will reduce gc pressure somewhat. [#163513253]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl125
1 files changed, 68 insertions, 57 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 39dbd8f3f1..26842d5c49 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -197,10 +197,21 @@
status = up :: up | suspected_down
}).
--record(state,
+-record(cfg,
{name :: atom(),
- queue_resource :: rabbit_types:r('queue'),
+ resource :: rabbit_types:r('queue'),
release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
+ dead_letter_handler :: maybe(applied_mfa()),
+ become_leader_handler :: maybe(applied_mfa()),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
+ %% whether single active consumer is on or not for this queue
+ consumer_strategy = default :: default | single_active,
+ delivery_limit :: maybe(non_neg_integer())
+ }).
+
+-record(state,
+ {cfg :: #cfg{},
% unassigned messages
messages = #{} :: #{msg_in_id() => indexed_msg()},
% defines the lowest message in id available in the messages map
@@ -232,8 +243,6 @@
% consumers that require further service are queued here
% needs to be part of snapshot
service_queue = queue:new() :: queue:queue(consumer_id()),
- dead_letter_handler :: maybe(applied_mfa()),
- become_leader_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
%% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
@@ -247,14 +256,9 @@
PrefixMsgs :: [msg_header()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
- max_length :: maybe(non_neg_integer()),
- max_bytes :: maybe(non_neg_integer()),
- %% whether single active consumer is on or not for this queue
- consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}],
- delivery_limit :: maybe(non_neg_integer())
+ waiting_consumers = [] :: [{consumer_id(), consumer()}]
}).
-opaque state() :: #state{}.
@@ -287,8 +291,8 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ update_config(Conf, #state{cfg = #cfg{name = Name,
+ resource = Resource}}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
@@ -303,13 +307,14 @@ update_config(Conf, State) ->
false ->
default
end,
- State#state{dead_letter_handler = DLH,
- become_leader_handler = BLH,
- release_cursor_interval = SHI,
- max_length = MaxLength,
- max_bytes = MaxBytes,
- consumer_strategy = ConsumerStrategy,
- delivery_limit = DeliveryLimit}.
+ Cfg = State#state.cfg,
+ State#state{cfg = Cfg#cfg{release_cursor_interval = SHI,
+ dead_letter_handler = DLH,
+ become_leader_handler = BLH,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}}.
zero(_) ->
0.
@@ -562,25 +567,25 @@ apply(_, {nodedown, _Node}, State) ->
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
-consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
+consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = default}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
end;
-consumer_active_flag_update_function(#state{consumer_strategy = single_active}) ->
+consumer_active_flag_update_function(#state{cfg = #cfg{consumer_strategy = single_active}}) ->
fun(_, _, _, _, _, Effects) ->
Effects
end.
handle_waiting_consumer_down(_Pid,
- #state{consumer_strategy = default} = State) ->
+ #state{cfg = #cfg{consumer_strategy = default}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State) ->
{[], State};
handle_waiting_consumer_down(Pid,
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
% get cancel effects for down waiting consumers
Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
@@ -595,16 +600,16 @@ handle_waiting_consumer_down(Pid,
State = State0#state{waiting_consumers = StillUp},
{Effects, State}.
-update_waiting_consumer_status(_Node, #state{consumer_strategy = default},
+update_waiting_consumer_status(_Node, #state{cfg = #cfg{consumer_strategy = default}},
_Status) ->
[];
update_waiting_consumer_status(_Node,
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []},
_Status) ->
[];
update_waiting_consumer_status(Node,
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers},
Status) ->
[begin
@@ -621,9 +626,10 @@ update_waiting_consumer_status(Node,
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
- name = Name,
- prefix_msgs = {[], []},
- become_leader_handler = BLH}) ->
+ cfg = #cfg{name = Name,
+ become_leader_handler = BLH},
+ prefix_msgs = {[], []}
+ }) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
++ [P || {_, P} <- maps:keys(Cons)]
@@ -657,8 +663,8 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
-tick(_Ts, #state{name = Name,
- queue_resource = QName,
+tick(_Ts, #state{cfg = #cfg{name = Name,
+ resource = QName},
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
@@ -745,7 +751,7 @@ query_consumer_count(#state{consumers = Consumers,
query_consumers(#state{consumers = Consumers,
waiting_consumers = WaitingConsumers,
- consumer_strategy = ConsumerStrategy } = State) ->
+ cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
case ConsumerStrategy of
default ->
@@ -803,7 +809,7 @@ query_consumers(#state{consumers = Consumers,
end, #{}, WaitingConsumers),
maps:merge(FromConsumers, FromWaitingConsumers).
-query_single_active_consumer(#state{consumer_strategy = single_active,
+query_single_active_consumer(#state{cfg = #cfg{consumer_strategy = single_active},
consumers = Consumers}) ->
case maps:size(Consumers) of
0 ->
@@ -874,18 +880,18 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #state{consumer_strategy = default} = State, Effects, Reason) ->
+ #state{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
%% general case, single active consumer off
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = []} = State,
Effects, Reason) ->
%% single active consumer on, no consumers are waiting
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#state{consumers = Cons0,
- consumer_strategy = single_active,
+ cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0,
Effects0, Reason) ->
%% single active consumer on, consumers are waiting
@@ -909,8 +915,8 @@ cancel_consumer(ConsumerId,
service_queue = ServiceQueue1,
waiting_consumers = RemainingWaitingConsumers},
Effects = consumer_update_active_effects(State, NewActiveConsumerId,
- NewActiveConsumer, true,
- single_active, Effects2),
+ NewActiveConsumer, true,
+ single_active, Effects2),
{State, Effects};
error ->
% The cancelled consumer is not the active one
@@ -923,7 +929,7 @@ cancel_consumer(ConsumerId,
{State0#state{waiting_consumers = WaitingConsumers}, Effects}
end.
-consumer_update_active_effects(#state{queue_resource = QName },
+consumer_update_active_effects(#state{cfg = #cfg{resource = QName}},
ConsumerId, #consumer{meta = Meta},
Active, ActivityStatus,
Effects) ->
@@ -1015,8 +1021,9 @@ append_to_master_index(RaftIdx,
Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
State#state{ra_indexes = Indexes}.
+
incr_enqueue_count(#state{enqueue_count = C,
- release_cursor_interval = C} = State0) ->
+ cfg = #cfg{release_cursor_interval = C}} = State0) ->
% this will trigger a dehydrated version of the state to be stored
% at this raft index for potential future snapshot generation
State0#state{enqueue_count = 0};
@@ -1151,17 +1158,19 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
- #state{dead_letter_handler = undefined},
+ #state{cfg = #cfg{dead_letter_handler = undefined}},
Effects) ->
Effects;
dead_letter_effects(Reason, Discarded,
- #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
+ #state{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ Effects) ->
DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
[{Reason, Msg} | Acc]
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
-cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
+cancel_consumer_effects(ConsumerId,
+ #state{cfg = #cfg{resource = QName}}, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
@@ -1205,7 +1214,7 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
return_one(0, {'$prefix_msg', Header0},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0, Effects0,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, Effects0,
ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
@@ -1225,7 +1234,7 @@ return_one(0, {'$prefix_msg', Header0},
end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
@@ -1243,7 +1252,9 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
_ ->
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
- State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
+ State0#state{returns =
+ lqueue:in({MsgNum, Msg}, Returns)}),
+ Effects0}
end.
return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
@@ -1286,8 +1297,8 @@ checkout0({Activity, State0}, Effects0, Acc) ->
{State0, ok, lists:reverse(Effects1)}.
evaluate_limit(_OldIndexes, Result,
- #state{max_length = undefined,
- max_bytes = undefined} = State,
+ #state{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
evaluate_limit(OldIndexes, Result,
@@ -1453,17 +1464,17 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #state{consumer_strategy = default} = State0) ->
+ #state{cfg = #cfg{consumer_strategy = default}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
#state{consumers = Cons0,
- consumer_strategy = single_active} = State0)
+ cfg = #cfg{consumer_strategy = single_active}} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
+ #state{cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
@@ -1544,11 +1555,11 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
normalize(#state{release_cursors = Cursors} = State) ->
State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
-is_over_limit(#state{max_length = undefined,
- max_bytes = undefined}) ->
+is_over_limit(#state{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
false;
-is_over_limit(#state{max_length = MaxLength,
- max_bytes = MaxBytes,
+is_over_limit(#state{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
msg_bytes_enqueue = BytesEnq} = State) ->
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
@@ -2348,7 +2359,7 @@ single_active_consumer_test() ->
atom_to_binary(?FUNCTION_NAME, utf8)),
release_cursor_interval => 0,
single_active_consumer_on => true}),
- ?assertEqual(single_active, State0#state.consumer_strategy),
+ ?assertEqual(single_active, State0#state.cfg#cfg.consumer_strategy),
?assertEqual(0, map_size(State0#state.consumers)),
% adding some consumers