diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 55 |
2 files changed, 51 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6c877cc50f..5c5b79fe61 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,7 +38,6 @@ backing_queue, backing_queue_state, consumers, - consumer_use, expires, sync_timer_ref, rate_timer_ref, @@ -135,7 +134,6 @@ init_state(Q) -> exclusive_consumer = none, has_had_consumers = false, consumers = rabbit_queue_consumers:new(), - consumer_use = {inactive, now_micros(), 0, 0.0}, senders = pmon:new(delegate), msg_id_to_channel = gb_trees:empty(), status = running, @@ -416,33 +414,7 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> State2 = State1#q{consumers = Consumers1}, [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) || {_ChPid, CTag} <- Blocked], - case Active of - true -> {true, State2}; - false -> {false, update_consumer_use(State2, inactive)} - end. - -update_consumer_use(State = #q{consumer_use = CUInfo}, Use) -> - State#q{consumer_use = update_consumer_use1(CUInfo, Use)}. - -update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) -> - CUInfo; -update_consumer_use1({active, _, _} = CUInfo, active) -> - CUInfo; -update_consumer_use1({active, Since, Avg}, inactive) -> - Now = now_micros(), - {inactive, Now, Now - Since, Avg}; -update_consumer_use1({inactive, Since, Active, Avg}, active) -> - Now = now_micros(), - {active, Now, consumer_use_avg(Active, Now - Since, Avg)}. - -consumer_use_avg(Active, Inactive, Avg) -> - Time = Inactive + Active, - Ratio = Active / Time, - Weight = erlang:min(1, Time / 1000000), - case Avg of - undefined -> Ratio; - _ -> Ratio * Weight + Avg * (1 - Weight) - end. + {Active, State2}. confirm_messages([], State) -> State; @@ -597,10 +569,10 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> unchanged -> State; {unblocked, UnblockedCTags, Consumers1} -> + State1 = State#q{consumers = Consumers1}, [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], - State) || CTag <- UnblockedCTags], - run_message_queue( - update_consumer_use(State#q{consumers = Consumers1}, active)) + State1) || CTag <- UnblockedCTags], + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -885,15 +857,10 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> rabbit_queue_consumers:count(); -i(consumer_utilisation, #q{consumer_use = ConsumerUse}) -> +i(consumer_utilisation, #q{consumers = Consumers}) -> case rabbit_queue_consumers:count() of 0 -> ''; - _ -> case ConsumerUse of - {active, Since, Avg} -> - consumer_use_avg(now_micros() - Since, 0, Avg); - {inactive, Since, Active, Avg} -> - consumer_use_avg(Active, now_micros() - Since, Avg) - end + _ -> rabbit_queue_consumers:utilisation(Consumers) end; i(memory, _) -> {memory, M} = process_info(self(), memory), diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 40daec321e..ea0ab6da5f 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -20,13 +20,14 @@ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/5, record_ack/3, subtract_acks/2, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, + utilisation/1]). %%---------------------------------------------------------------------------- -define(UNSENT_MESSAGE_LIMIT, 200). --record(state, {consumers}). +-record(state, {consumers, use}). -record(consumer, {tag, ack_required, args}). @@ -47,7 +48,12 @@ -ifdef(use_specs). --type state() :: #state{consumers ::priority_queue:q()}. +-type time_micros() :: non_neg_integer(). +-type ratio() :: float(). +-type state() :: #state{consumers ::priority_queue:q(), + use :: {'inactive', + time_micros(), time_micros(), ratio()} | + {'active', time_micros(), ratio()}}. -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). @@ -83,12 +89,14 @@ -spec activate_limit_fun() -> cr_fun(). -spec credit_fun(boolean(), non_neg_integer(), boolean(), rabbit_types:ctag()) -> cr_fun(). +-spec utilisation(state()) -> ratio(). -endif. %%---------------------------------------------------------------------------- -new() -> #state{consumers = priority_queue:new()}. +new() -> #state{consumers = priority_queue:new(), + use = {inactive, now_micros(), 0, 0.0}}. max_active_priority(#state{consumers = Consumers}) -> priority_queue:highest(Consumers). @@ -178,11 +186,11 @@ deliver(FetchFun, Stop, QName, S, State) -> deliver(_FetchFun, true, _QName, Blocked, S, State) -> {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, State = #state{ - consumers = Consumers}) -> +deliver( FetchFun, false, QName, Blocked, S, + State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State}; + {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; {{value, QEntry, Priority}, Tail} -> {Stop, Blocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, @@ -269,7 +277,7 @@ possibly_unblock(Update, ChPid, State) -> end. unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers, use = Use}) -> case lists:partition( fun({_P, {_ChPid, #consumer{tag = CTag}}}) -> rabbit_limiter:is_consumer_blocked(Limiter, CTag) @@ -283,8 +291,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, tags(Unblocked), - State#state{consumers = - priority_queue:join(Consumers, UnblockedQ)}} + State#state{consumers = priority_queue:join(Consumers, UnblockedQ), + use = update_use(Use, active)}} end. resume_fun() -> @@ -312,6 +320,11 @@ credit_fun(IsEmpty, Credit, Drain, CTag) -> end end. +utilisation(#state{use = {active, Since, Avg}}) -> + use_avg(now_micros() - Since, 0, Avg); +utilisation(#state{use = {inactive, Since, Active, Avg}}) -> + use_avg(Active, now_micros() - Since, Avg). + %%---------------------------------------------------------------------------- lookup_ch(ChPid) -> @@ -389,3 +402,25 @@ remove_consumers(ChPid, Queue) -> priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false; (_) -> true end, Queue). + +update_use({inactive, _, _, _} = CUInfo, inactive) -> + CUInfo; +update_use({active, _, _} = CUInfo, active) -> + CUInfo; +update_use({active, Since, Avg}, inactive) -> + Now = now_micros(), + {inactive, Now, Now - Since, Avg}; +update_use({inactive, Since, Active, Avg}, active) -> + Now = now_micros(), + {active, Now, use_avg(Active, Now - Since, Avg)}. + +use_avg(Active, Inactive, Avg) -> + Time = Inactive + Active, + Ratio = Active / Time, + Weight = erlang:min(1, Time / 1000000), + case Avg of + undefined -> Ratio; + _ -> Ratio * Weight + Avg * (1 - Weight) + end. + +now_micros() -> timer:now_diff(now(), {0,0,0}). |
