summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl73
1 files changed, 43 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 69ae6a7e8a..48aa62ef4c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -62,7 +62,8 @@
-record(cb_info, {inactive,
active,
inactive_dur,
- active_dur}).
+ active_dur,
+ avg}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -497,15 +498,7 @@ deliver_msgs_to_consumers(DeliverFun, false,
consumer_bound_info = CBInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- #cb_info{active = WhenActive,
- inactive = WhenInactive} = CBInfo,
- Now = erlang:now(),
- CBInfo1 = case timer:now_diff(WhenInactive, WhenActive) > 0 of
- true -> CBInfo;
- false -> CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive),
- inactive = Now}
- end,
- {false, State#q{consumer_bound_info = CBInfo1}};
+ {false, State#q{consumer_bound_info = update_cb(CBInfo, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -556,6 +549,34 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
+update_cb(#cb_info{active = WhenActive,
+ inactive = WhenInactive} = CBInfo, inactive) ->
+ case timer:now_diff(WhenInactive, WhenActive) > 0 of
+ true -> CBInfo;
+ false -> Now = erlang:now(),
+ CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive),
+ inactive = Now}
+ end;
+update_cb(#cb_info{inactive = WhenInactive,
+ active = WhenActive,
+ active_dur = Active,
+ avg = Avg} = CBInfo, active) ->
+ case timer:now_diff(WhenActive, WhenInactive) > 0 of
+ true -> CBInfo;
+ false -> Now = erlang:now(),
+ Inactive = timer:now_diff(Now, WhenInactive),
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ Avg1 = case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end,
+ CBInfo#cb_info{inactive_dur = Inactive,
+ active = Now,
+ avg = Avg1}
+ end.
+
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -733,7 +754,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State, C = #cr{limiter = Limiter}) ->
+unblock(State = #q{consumer_bound_info = CBInfo}, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -745,17 +766,8 @@ unblock(State, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- #q{consumer_bound_info = CBInfo} = State,
- #cb_info{inactive = WhenInactive,
- active = WhenActive} = CBInfo,
- State1 = case timer:now_diff(WhenActive, WhenInactive) > 0 of
- true -> State;
- false -> Now = erlang:now(),
- CBInfo1 = CBInfo#cb_info{inactive_dur = timer:now_diff(Now, WhenInactive),
- active = Now},
- State#q{consumer_bound_info = CBInfo1}
- end,
- AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
+ State1 = State#q{consumer_bound_info = update_cb(CBInfo, active)},
+ AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
State2 = State1#q{active_consumers = AC1},
[notify_decorators(
consumer_unblocked, [{consumer_tag, CTag}], State2) ||
@@ -1068,18 +1080,19 @@ i(messages, State) ->
i(consumers, _) ->
consumer_count();
i(consumer_utilisation,
- #q{consumer_bound_info = #cb_info{active_dur = ActiveDur,
- active = WhenActive,
- inactive_dur = InactiveDur,
- inactive = WhenInactive}}) ->
+ #q{consumer_bound_info = #cb_info{active = WhenActive,
+ inactive = WhenInactive,
+ avg = Avg}}) ->
Now = erlang:now(),
case timer:now_diff(Now, WhenInactive) < 5000000 of
false -> case timer:now_diff(WhenInactive, WhenActive) > 0 of
- true -> 0;
- false -> 100
+ true -> inactive;
+ false -> active
end;
- true -> Ratio = ActiveDur / (InactiveDur + ActiveDur),
- trunc(Ratio * 100)
+ true -> case Avg of
+ undefined -> noavg;
+ _ -> trunc(Avg * 100)
+ end
end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),