diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 |
1 files changed, 23 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dbbc84421e..80372fdb30 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,7 @@ backing_queue, backing_queue_state, active_consumers, - consumer_use_info, + consumer_use, expires, sync_timer_ref, rate_timer_ref, @@ -151,7 +151,7 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, active_consumers = priority_queue:new(), - consumer_use_info = {inactive, now_micros(), 0, 0.0}, + consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -486,10 +486,11 @@ deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers, - consumer_use_info = CUInfo}) -> + consumer_use = CUInfo}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - {false, State#q{consumer_use_info = update_cu(CUInfo, inactive)}}; + {false, + State#q{consumer_use = update_consumer_use(CUInfo, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -540,24 +541,25 @@ deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. -update_cu({inactive, _, _, _} = CUInfo, inactive) -> +update_consumer_use({inactive, _, _, _} = CUInfo, inactive) -> CUInfo; -update_cu({active, _, _} = CUInfo, active) -> +update_consumer_use({active, _, _} = CUInfo, active) -> CUInfo; -update_cu({active, Since, Avg}, inactive) -> +update_consumer_use({active, Since, Avg}, inactive) -> Now = now_micros(), {inactive, Now, Now - Since, Avg}; -update_cu({inactive, Since, Active, Avg}, active) -> +update_consumer_use({inactive, Since, Active, Avg}, active) -> Now = now_micros(), - Inactive = Now - Since, + {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. + +consumer_use_avg(Active, Inactive, Avg) -> Time = Inactive + Active, Ratio = Active / Time, Weight = erlang:min(1, Time / 1000000), - Avg1 = case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end, - {active, Now, Avg1}. + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. confirm_messages([], State) -> State; @@ -736,7 +738,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State = #q{consumer_use_info = CUInfo}, C = #cr{limiter = Limiter}) -> +unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -748,7 +750,8 @@ unblock(State = #q{consumer_use_info = CUInfo}, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - State1 = State#q{consumer_use_info = update_cu(CUInfo, active)}, + State1 = State#q{consumer_use = + update_consumer_use(CUInfo, active)}, AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), State2 = State1#q{active_consumers = AC1}, [notify_decorators( @@ -1061,25 +1064,10 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); -i(consumer_utilisation, #q{consumer_use_info = {active, Since, Avg}}) -> - Now = now_micros(), - case Now - Since > 1000000 of - true -> active; - false -> case Avg of - undefined -> noavg; - _ -> trunc(Avg * 100) - end - end; -i(consumer_utilisation, #q{consumer_use_info = {inactive, Since, _, Avg}}) -> - Now = now_micros(), - case Now - Since > 1000000 of - true -> inactive; - false -> case Avg of - undefined -> noavg; - _ -> trunc(Avg * 100) - end - end; - +i(consumer_utilisation, #q{consumer_use = {active, Since, Avg}}) -> + consumer_use_avg(now_micros() - Since, 0, Avg); +i(consumer_utilisation, #q{consumer_use = {inactive, Since, Active, Avg}}) -> + consumer_use_avg(Active, now_micros() - Since, Avg); i(memory, _) -> {memory, M} = process_info(self(), memory), M; |
