summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl58
1 files changed, 23 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dbbc84421e..80372fdb30 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,7 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
- consumer_use_info,
+ consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -151,7 +151,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = priority_queue:new(),
- consumer_use_info = {inactive, now_micros(), 0, 0.0},
+ consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -486,10 +486,11 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers,
- consumer_use_info = CUInfo}) ->
+ consumer_use = CUInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State#q{consumer_use_info = update_cu(CUInfo, inactive)}};
+ {false,
+ State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -540,24 +541,25 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
-update_cu({inactive, _, _, _} = CUInfo, inactive) ->
+update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
-update_cu({active, _, _} = CUInfo, active) ->
+update_consumer_use({active, _, _} = CUInfo, active) ->
CUInfo;
-update_cu({active, Since, Avg}, inactive) ->
+update_consumer_use({active, Since, Avg}, inactive) ->
Now = now_micros(),
{inactive, Now, Now - Since, Avg};
-update_cu({inactive, Since, Active, Avg}, active) ->
+update_consumer_use({inactive, Since, Active, Avg}, active) ->
Now = now_micros(),
- Inactive = Now - Since,
+ {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
+
+consumer_use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
Ratio = Active / Time,
Weight = erlang:min(1, Time / 1000000),
- Avg1 = case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end,
- {active, Now, Avg1}.
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
confirm_messages([], State) ->
State;
@@ -736,7 +738,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State = #q{consumer_use_info = CUInfo}, C = #cr{limiter = Limiter}) ->
+unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -748,7 +750,8 @@ unblock(State = #q{consumer_use_info = CUInfo}, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- State1 = State#q{consumer_use_info = update_cu(CUInfo, active)},
+ State1 = State#q{consumer_use =
+ update_consumer_use(CUInfo, active)},
AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
State2 = State1#q{active_consumers = AC1},
[notify_decorators(
@@ -1061,25 +1064,10 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(consumer_utilisation, #q{consumer_use_info = {active, Since, Avg}}) ->
- Now = now_micros(),
- case Now - Since > 1000000 of
- true -> active;
- false -> case Avg of
- undefined -> noavg;
- _ -> trunc(Avg * 100)
- end
- end;
-i(consumer_utilisation, #q{consumer_use_info = {inactive, Since, _, Avg}}) ->
- Now = now_micros(),
- case Now - Since > 1000000 of
- true -> inactive;
- false -> case Avg of
- undefined -> noavg;
- _ -> trunc(Avg * 100)
- end
- end;
-
+i(consumer_utilisation, #q{consumer_use = {active, Since, Avg}}) ->
+ consumer_use_avg(now_micros() - Since, 0, Avg);
+i(consumer_utilisation, #q{consumer_use = {inactive, Since, Active, Avg}}) ->
+ consumer_use_avg(Active, now_micros() - Since, Avg);
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;