summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_queue_consumers.erl')
-rw-r--r--deps/rabbit/src/rabbit_queue_consumers.erl568
1 files changed, 568 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_queue_consumers.erl b/deps/rabbit/src/rabbit_queue_consumers.erl
new file mode 100644
index 0000000000..4f826f72e8
--- /dev/null
+++ b/deps/rabbit/src/rabbit_queue_consumers.erl
@@ -0,0 +1,568 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_queue_consumers).
+
+-export([new/0, max_active_priority/1, inactive/1, all/1, all/3, count/0,
+ unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
+ send_drained/0, deliver/5, record_ack/3, subtract_acks/3,
+ possibly_unblock/3,
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
+ credit/6, utilisation/1, is_same/3, get_consumer/1, get/3,
+ consumer_tag/1, get_infos/1]).
+
+%%----------------------------------------------------------------------------
+
+-define(QUEUE, lqueue).
+
+-define(UNSENT_MESSAGE_LIMIT, 200).
+
+%% Utilisation average calculations are all in μs.
+-define(USE_AVG_HALF_LIFE, 1000000.0).
+
+-record(state, {consumers, use}).
+
+-record(consumer, {tag, ack_required, prefetch, args, user}).
+
+%% These are held in our process dictionary
+-record(cr, {ch_pid,
+ monitor_ref,
+ acktags,
+ consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked (rate/prefetch limited) for any reason
+ blocked_consumers,
+ %% The limiter itself
+ limiter,
+ %% Internal flow control for queue -> writer
+ unsent_message_count}).
+
+%%----------------------------------------------------------------------------
+
+-type time_micros() :: non_neg_integer().
+-type ratio() :: float().
+-type state() :: #state{consumers ::priority_queue:q(),
+ use :: {'inactive',
+ time_micros(), time_micros(), ratio()} |
+ {'active', time_micros(), ratio()}}.
+-type consumer() :: #consumer{tag::rabbit_types:ctag(), ack_required::boolean(),
+ prefetch::non_neg_integer(), args::rabbit_framing:amqp_table(),
+ user::rabbit_types:username()}.
+-type ch() :: pid().
+-type ack() :: non_neg_integer().
+-type cr_fun() :: fun ((#cr{}) -> #cr{}).
+-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
+
+%%----------------------------------------------------------------------------
+
+-spec new() -> state().
+
+new() -> #state{consumers = priority_queue:new(),
+ use = {active,
+ erlang:monotonic_time(micro_seconds),
+ 1.0}}.
+
+-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
+
+max_active_priority(#state{consumers = Consumers}) ->
+ priority_queue:highest(Consumers).
+
+-spec inactive(state()) -> boolean().
+
+inactive(#state{consumers = Consumers}) ->
+ priority_queue:is_empty(Consumers).
+
+-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), boolean(), atom(),
+ rabbit_framing:amqp_table(), rabbit_types:username()}].
+
+all(State) ->
+ all(State, none, false).
+
+all(#state{consumers = Consumers}, SingleActiveConsumer, SingleActiveConsumerOn) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) end,
+ consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, []), all_ch_record()).
+
+consumers(Consumers, SingleActiveConsumer, SingleActiveConsumerOn, Acc) ->
+ ActiveActivityStatusFun = case SingleActiveConsumerOn of
+ true ->
+ fun({ChPid, Consumer}) ->
+ case SingleActiveConsumer of
+ {ChPid, Consumer} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end;
+ false ->
+ fun(_) -> {true, up} end
+ end,
+ priority_queue:fold(
+ fun ({ChPid, Consumer}, _P, Acc1) ->
+ #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
+ args = Args, user = Username} = Consumer,
+ {Active, ActivityStatus} = ActiveActivityStatusFun({ChPid, Consumer}),
+ [{ChPid, CTag, Ack, Prefetch, Active, ActivityStatus, Args, Username} | Acc1]
+ end, Acc, Consumers).
+
+-spec count() -> non_neg_integer().
+
+count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
+
+-spec unacknowledged_message_count() -> non_neg_integer().
+
+unacknowledged_message_count() ->
+ lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]).
+
+-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_types:username(), state())
+ -> state().
+
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
+ Username, State = #state{consumers = Consumers,
+ use = CUInfo}) ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
+ update_ch_record(
+ case parse_credit_args(Prefetch, Args) of
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
+ end),
+ Consumer = #consumer{tag = CTag,
+ ack_required = not NoAck,
+ prefetch = Prefetch,
+ args = Args,
+ user = Username},
+ State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
+ use = update_use(CUInfo, active)}.
+
+-spec remove(ch(), rabbit_types:ctag(), state()) ->
+ 'not_found' | state().
+
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:deactivate(Limiter);
+ _ -> Limiter
+ end,
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter2,
+ blocked_consumers = Blocked1}),
+ State#state{consumers =
+ remove_consumer(ChPid, CTag, Consumers)}
+ end.
+
+-spec erase_ch(ch(), state()) ->
+ 'not_found' | {[ack()], [rabbit_types:ctag()],
+ state()}.
+
+erase_ch(ChPid, State = #state{consumers = Consumers}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = BlockedQ} ->
+ All = priority_queue:join(Consumers, BlockedQ),
+ ok = erase_ch_record(C),
+ Filtered = priority_queue:filter(chan_pred(ChPid, true), All),
+ {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)],
+ tags(priority_queue:to_list(Filtered)),
+ State#state{consumers = remove_consumers(ChPid, Consumers)}}
+ end.
+
+-spec send_drained() -> 'ok'.
+
+send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
+ ok.
+
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state(), boolean(),
+ none | {ch(), rabbit_types:ctag()} | {ch(), consumer()}) ->
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
+
+deliver(FetchFun, QName, State, SingleActiveConsumerIsOn, ActiveConsumer) ->
+ deliver(FetchFun, QName, false, State, SingleActiveConsumerIsOn, ActiveConsumer).
+
+deliver(_FetchFun, _QName, false, State, true, none) ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+deliver(FetchFun, QName, false, State = #state{consumers = Consumers}, true, SingleActiveConsumer) ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ %% blocked (rate/prefetch limited) consumers are removed from the queue state, but not the exclusive_consumer field,
+ %% so we need to do this check to avoid adding the exclusive consumer to the channel record
+ %% over and over
+ case is_blocked(SingleActiveConsumer) of
+ true ->
+ {undelivered, false,
+ State#state{use = update_use(State#state.use, inactive)}};
+ false ->
+ case deliver_to_consumer(FetchFun, SingleActiveConsumer, QName) of
+ {delivered, R} ->
+ {delivered, false, R, State};
+ undelivered ->
+ {ChPid, Consumer} = SingleActiveConsumer,
+ Consumers1 = remove_consumer(ChPid, Consumer#consumer.tag, Consumers),
+ {undelivered, true,
+ State#state{consumers = Consumers1, use = update_use(State#state.use, inactive)}}
+ end
+ end;
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}, false, _SingleActiveConsumer) ->
+ case priority_queue:out_p(Consumers) of
+ {empty, _} ->
+ {undelivered, ConsumersChanged,
+ State#state{use = update_use(State#state.use, inactive)}};
+ {{value, QEntry, Priority}, Tail} ->
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, ConsumersChanged, R,
+ State#state{consumers = priority_queue:in(QEntry, Priority,
+ Tail)}};
+ undelivered ->
+ deliver(FetchFun, QName, true,
+ State#state{consumers = Tail}, false, _SingleActiveConsumer)
+ end
+ end.
+
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
+ C = lookup_ch(ChPid),
+ case is_ch_blocked(C) of
+ true ->
+ block_consumer(C, E),
+ undelivered;
+ false -> case rabbit_limiter:can_send(C#cr.limiter,
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag) of
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ undelivered;
+ {continue, Limiter} ->
+ {delivered, deliver_to_consumer(
+ FetchFun, Consumer,
+ C#cr{limiter = Limiter}, QName)}
+ end
+ end.
+
+deliver_to_consumer(FetchFun,
+ #consumer{tag = CTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> ?QUEUE:in({AckTag, CTag}, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ R.
+
+is_blocked(Consumer = {ChPid, _C}) ->
+ #cr{blocked_consumers = BlockedConsumers} = lookup_ch(ChPid),
+ priority_queue:member(Consumer, BlockedConsumers).
+
+-spec record_ack(ch(), pid(), ack()) -> 'ok'.
+
+record_ack(ChPid, LimiterPid, AckTag) ->
+ C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
+ update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}),
+ ok.
+
+-spec subtract_acks(ch(), [ack()], state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
+
+subtract_acks(ChPid, AckTags, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ not_found;
+ C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ {CTagCounts, AckTags2} = subtract_acks(
+ AckTags, [], maps:new(), ChAckTags),
+ {Unblocked, Lim2} =
+ maps:fold(
+ fun (CTag, Count, {UnblockedN, LimN}) ->
+ {Unblocked1, LimN1} =
+ rabbit_limiter:ack_from_queue(LimN, CTag, Count),
+ {UnblockedN orelse Unblocked1, LimN1}
+ end, {false, Lim}, CTagCounts),
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
+ end.
+
+subtract_acks([], [], CTagCounts, AckQ) ->
+ {CTagCounts, AckQ};
+subtract_acks([], Prefix, CTagCounts, AckQ) ->
+ {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)};
+subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
+ case ?QUEUE:out(AckQ) of
+ {{value, {T, CTag}}, QTail} ->
+ subtract_acks(TL, Prefix,
+ maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail);
+ {{value, V}, QTail} ->
+ subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
+ {empty, _} ->
+ subtract_acks([], Prefix, CTagCounts, AckQ)
+ end.
+
+-spec possibly_unblock(cr_fun(), ch(), state()) ->
+ 'unchanged' | {'unblocked', state()}.
+
+possibly_unblock(Update, ChPid, State) ->
+ case lookup_ch(ChPid) of
+ not_found -> unchanged;
+ C -> C1 = Update(C),
+ case is_ch_blocked(C) andalso not is_ch_blocked(C1) of
+ false -> update_ch_record(C1),
+ unchanged;
+ true -> unblock(C1, State)
+ end
+ end.
+
+unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
+ State = #state{consumers = Consumers, use = Use}) ->
+ case lists:partition(
+ fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
+ rabbit_limiter:is_consumer_blocked(Limiter, CTag)
+ end, priority_queue:to_list(BlockedQ)) of
+ {_, []} ->
+ update_ch_record(C),
+ unchanged;
+ {Blocked, Unblocked} ->
+ BlockedQ1 = priority_queue:from_list(Blocked),
+ UnblockedQ = priority_queue:from_list(Unblocked),
+ update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
+ {unblocked,
+ State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
+ use = update_use(Use, active)}}
+ end.
+
+-spec resume_fun() -> cr_fun().
+
+resume_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end.
+
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+
+notify_sent_fun(Credit) ->
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - Credit}
+ end.
+
+-spec activate_limit_fun() -> cr_fun().
+
+activate_limit_fun() ->
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end.
+
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
+
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ unchanged;
+ #cr{limiter = Limiter} = C ->
+ C1 = #cr{limiter = Limiter1} =
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
+ case is_ch_blocked(C1) orelse
+ (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
+ true -> update_ch_record(C1),
+ unchanged;
+ false -> unblock(C1, State)
+ end
+ end.
+
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
+-spec utilisation(state()) -> ratio().
+
+utilisation(#state{use = {active, Since, Avg}}) ->
+ use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg);
+utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
+ use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+
+is_same(ChPid, ConsumerTag, {ChPid, #consumer{tag = ConsumerTag}}) ->
+ true;
+is_same(_ChPid, _ConsumerTag, _Consumer) ->
+ false.
+
+get_consumer(#state{consumers = Consumers}) ->
+ case priority_queue:out_p(Consumers) of
+ {{value, Consumer, _Priority}, _Tail} -> Consumer;
+ {empty, _} -> undefined
+ end.
+
+-spec get(ch(), rabbit_types:ctag(), state()) -> undefined | consumer().
+
+get(ChPid, ConsumerTag, #state{consumers = Consumers}) ->
+ Consumers1 = priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP == ChPid) and (CT == ConsumerTag)
+ end, Consumers),
+ case priority_queue:out_p(Consumers1) of
+ {empty, _} -> undefined;
+ {{value, Consumer, _Priority}, _Tail} -> Consumer
+ end.
+
+-spec get_infos(consumer()) -> term().
+
+get_infos(Consumer) ->
+ {Consumer#consumer.tag,Consumer#consumer.ack_required,
+ Consumer#consumer.prefetch, Consumer#consumer.args}.
+
+-spec consumer_tag(consumer()) -> rabbit_types:ctag().
+
+consumer_tag(#consumer{tag = CTag}) ->
+ CTag.
+
+
+
+%%----------------------------------------------------------------------------
+
+parse_credit_args(Default, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
+ _ -> {Default, auto}
+ end;
+ undefined -> {Default, auto}
+ end.
+
+lookup_ch(ChPid) ->
+ case get({ch, ChPid}) of
+ undefined -> not_found;
+ C -> C
+ end.
+
+ch_record(ChPid, LimiterPid) ->
+ Key = {ch, ChPid},
+ case get(Key) of
+ undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
+ C = #cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef,
+ acktags = ?QUEUE:new(),
+ consumer_count = 0,
+ blocked_consumers = priority_queue:new(),
+ limiter = Limiter,
+ unsent_message_count = 0},
+ put(Key, C),
+ C;
+ C = #cr{} -> C
+ end.
+
+update_ch_record(C = #cr{consumer_count = ConsumerCount,
+ acktags = ChAckTags,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of
+ {true, 0, 0} -> ok = erase_ch_record(C);
+ _ -> ok = store_ch_record(C)
+ end,
+ C.
+
+store_ch_record(C = #cr{ch_pid = ChPid}) ->
+ put({ch, ChPid}, C),
+ ok.
+
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}),
+ ok.
+
+all_ch_record() -> [C || {{ch, _}, C} <- get()].
+
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = add_consumer(QEntry, Blocked)}).
+
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+ case rabbit_limiter:drained(Limiter) of
+ {[], Limiter} -> C;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ C#cr{limiter = Limiter2}
+ end.
+
+credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
+ {true, Limiter1} -> rabbit_channel:send_drained(ChPid,
+ [{CTag, Credit}]),
+ C#cr{limiter = Limiter1};
+ {false, Limiter1} -> C#cr{limiter = Limiter1}
+ end.
+
+tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
+
+add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
+ Priority = case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_, P} -> P;
+ _ -> 0
+ end,
+ priority_queue:in({ChPid, Consumer}, Priority, Queue).
+
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
+ end, Queue).
+
+remove_consumers(ChPid, Queue) ->
+ priority_queue:filter(chan_pred(ChPid, false), Queue).
+
+chan_pred(ChPid, Want) ->
+ fun ({CP, _Consumer}) when CP =:= ChPid -> Want;
+ (_) -> not Want
+ end.
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+use_avg(0, 0, Avg) ->
+ Avg;
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).