diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 69ae6a7e8a..48aa62ef4c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -62,7 +62,8 @@ -record(cb_info, {inactive, active, inactive_dur, - active_dur}). + active_dur, + avg}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -497,15 +498,7 @@ deliver_msgs_to_consumers(DeliverFun, false, consumer_bound_info = CBInfo}) -> case priority_queue:out_p(ActiveConsumers) of {empty, _} -> - #cb_info{active = WhenActive, - inactive = WhenInactive} = CBInfo, - Now = erlang:now(), - CBInfo1 = case timer:now_diff(WhenInactive, WhenActive) > 0 of - true -> CBInfo; - false -> CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive), - inactive = Now} - end, - {false, State#q{consumer_bound_info = CBInfo1}}; + {false, State#q{consumer_bound_info = update_cb(CBInfo, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, Priority, @@ -556,6 +549,34 @@ deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), {Result, is_empty(State1), State1}. +update_cb(#cb_info{active = WhenActive, + inactive = WhenInactive} = CBInfo, inactive) -> + case timer:now_diff(WhenInactive, WhenActive) > 0 of + true -> CBInfo; + false -> Now = erlang:now(), + CBInfo#cb_info{active_dur = timer:now_diff(Now, WhenActive), + inactive = Now} + end; +update_cb(#cb_info{inactive = WhenInactive, + active = WhenActive, + active_dur = Active, + avg = Avg} = CBInfo, active) -> + case timer:now_diff(WhenActive, WhenInactive) > 0 of + true -> CBInfo; + false -> Now = erlang:now(), + Inactive = timer:now_diff(Now, WhenInactive), + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + Avg1 = case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end, + CBInfo#cb_info{inactive_dur = Inactive, + active = Now, + avg = Avg1} + end. + confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -733,7 +754,7 @@ possibly_unblock(State, ChPid, Update) -> end end. -unblock(State, C = #cr{limiter = Limiter}) -> +unblock(State = #q{consumer_bound_info = CBInfo}, C = #cr{limiter = Limiter}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -745,17 +766,8 @@ unblock(State, C = #cr{limiter = Limiter}) -> BlockedQ = priority_queue:from_list(Blocked), UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), - #q{consumer_bound_info = CBInfo} = State, - #cb_info{inactive = WhenInactive, - active = WhenActive} = CBInfo, - State1 = case timer:now_diff(WhenActive, WhenInactive) > 0 of - true -> State; - false -> Now = erlang:now(), - CBInfo1 = CBInfo#cb_info{inactive_dur = timer:now_diff(Now, WhenInactive), - active = Now}, - State#q{consumer_bound_info = CBInfo1} - end, - AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), + State1 = State#q{consumer_bound_info = update_cb(CBInfo, active)}, + AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ), State2 = State1#q{active_consumers = AC1}, [notify_decorators( consumer_unblocked, [{consumer_tag, CTag}], State2) || @@ -1068,18 +1080,19 @@ i(messages, State) -> i(consumers, _) -> consumer_count(); i(consumer_utilisation, - #q{consumer_bound_info = #cb_info{active_dur = ActiveDur, - active = WhenActive, - inactive_dur = InactiveDur, - inactive = WhenInactive}}) -> + #q{consumer_bound_info = #cb_info{active = WhenActive, + inactive = WhenInactive, + avg = Avg}}) -> Now = erlang:now(), case timer:now_diff(Now, WhenInactive) < 5000000 of false -> case timer:now_diff(WhenInactive, WhenActive) > 0 of - true -> 0; - false -> 100 + true -> inactive; + false -> active end; - true -> Ratio = ActiveDur / (InactiveDur + ActiveDur), - trunc(Ratio * 100) + true -> case Avg of + undefined -> noavg; + _ -> trunc(Avg * 100) + end end; i(memory, _) -> {memory, M} = process_info(self(), memory), |
