summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-06 19:18:33 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-06 19:18:33 +0000
commit801e917644cd577db8b5ab5e612a5483e223795e (patch)
tree1755275deb5e5d98d1c623dba6d56b93c56155a9
parent45b1c46a06eb92ac6bff8ca536a30706c7d345ee (diff)
downloadrabbitmq-server-git-801e917644cd577db8b5ab5e612a5483e223795e.tar.gz
move consumer_use calculations into rabbit_queue_consumers
-rw-r--r--src/rabbit_amqqueue_process.erl45
-rw-r--r--src/rabbit_queue_consumers.erl55
2 files changed, 51 insertions, 49 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6c877cc50f..5c5b79fe61 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,7 +38,6 @@
backing_queue,
backing_queue_state,
consumers,
- consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -135,7 +134,6 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
consumers = rabbit_queue_consumers:new(),
- consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -416,33 +414,7 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
State2 = State1#q{consumers = Consumers1},
[notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) ||
{_ChPid, CTag} <- Blocked],
- case Active of
- true -> {true, State2};
- false -> {false, update_consumer_use(State2, inactive)}
- end.
-
-update_consumer_use(State = #q{consumer_use = CUInfo}, Use) ->
- State#q{consumer_use = update_consumer_use1(CUInfo, Use)}.
-
-update_consumer_use1({inactive, _, _, _} = CUInfo, inactive) ->
- CUInfo;
-update_consumer_use1({active, _, _} = CUInfo, active) ->
- CUInfo;
-update_consumer_use1({active, Since, Avg}, inactive) ->
- Now = now_micros(),
- {inactive, Now, Now - Since, Avg};
-update_consumer_use1({inactive, Since, Active, Avg}, active) ->
- Now = now_micros(),
- {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
-
-consumer_use_avg(Active, Inactive, Avg) ->
- Time = Inactive + Active,
- Ratio = Active / Time,
- Weight = erlang:min(1, Time / 1000000),
- case Avg of
- undefined -> Ratio;
- _ -> Ratio * Weight + Avg * (1 - Weight)
- end.
+ {Active, State2}.
confirm_messages([], State) ->
State;
@@ -597,10 +569,10 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
unchanged ->
State;
{unblocked, UnblockedCTags, Consumers1} ->
+ State1 = State#q{consumers = Consumers1},
[notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State) || CTag <- UnblockedCTags],
- run_message_queue(
- update_consumer_use(State#q{consumers = Consumers1}, active))
+ State1) || CTag <- UnblockedCTags],
+ run_message_queue(State1)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -885,15 +857,10 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
rabbit_queue_consumers:count();
-i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
+i(consumer_utilisation, #q{consumers = Consumers}) ->
case rabbit_queue_consumers:count() of
0 -> '';
- _ -> case ConsumerUse of
- {active, Since, Avg} ->
- consumer_use_avg(now_micros() - Since, 0, Avg);
- {inactive, Since, Active, Avg} ->
- consumer_use_avg(Active, now_micros() - Since, Avg)
- end
+ _ -> rabbit_queue_consumers:utilisation(Consumers)
end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 40daec321e..ea0ab6da5f 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -20,13 +20,14 @@
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
+ utilisation/1]).
%%----------------------------------------------------------------------------
-define(UNSENT_MESSAGE_LIMIT, 200).
--record(state, {consumers}).
+-record(state, {consumers, use}).
-record(consumer, {tag, ack_required, args}).
@@ -47,7 +48,12 @@
-ifdef(use_specs).
--type state() :: #state{consumers ::priority_queue:q()}.
+-type time_micros() :: non_neg_integer().
+-type ratio() :: float().
+-type state() :: #state{consumers ::priority_queue:q(),
+ use :: {'inactive',
+ time_micros(), time_micros(), ratio()} |
+ {'active', time_micros(), ratio()}}.
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
@@ -83,12 +89,14 @@
-spec activate_limit_fun() -> cr_fun().
-spec credit_fun(boolean(), non_neg_integer(), boolean(),
rabbit_types:ctag()) -> cr_fun().
+-spec utilisation(state()) -> ratio().
-endif.
%%----------------------------------------------------------------------------
-new() -> #state{consumers = priority_queue:new()}.
+new() -> #state{consumers = priority_queue:new(),
+ use = {inactive, now_micros(), 0, 0.0}}.
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
@@ -178,11 +186,11 @@ deliver(FetchFun, Stop, QName, S, State) ->
deliver(_FetchFun, true, _QName, Blocked, S, State) ->
{true, Blocked, S, State};
-deliver( FetchFun, false, QName, Blocked, S, State = #state{
- consumers = Consumers}) ->
+deliver( FetchFun, false, QName, Blocked, S,
+ State = #state{consumers = Consumers, use = Use}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, Blocked, S, State};
+ {false, Blocked, S, State#state{use = update_use(Use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, Blocked1, S1, Consumers1} =
deliver_to_consumer(FetchFun, QEntry, Priority, QName,
@@ -269,7 +277,7 @@ possibly_unblock(Update, ChPid, State) ->
end.
unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
- State = #state{consumers = Consumers}) ->
+ State = #state{consumers = Consumers, use = Use}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -283,8 +291,8 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
tags(Unblocked),
- State#state{consumers =
- priority_queue:join(Consumers, UnblockedQ)}}
+ State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
+ use = update_use(Use, active)}}
end.
resume_fun() ->
@@ -312,6 +320,11 @@ credit_fun(IsEmpty, Credit, Drain, CTag) ->
end
end.
+utilisation(#state{use = {active, Since, Avg}}) ->
+ use_avg(now_micros() - Since, 0, Avg);
+utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
+ use_avg(Active, now_micros() - Since, Avg).
+
%%----------------------------------------------------------------------------
lookup_ch(ChPid) ->
@@ -389,3 +402,25 @@ remove_consumers(ChPid, Queue) ->
priority_queue:filter(fun ({CP, _Consumer}) when CP =:= ChPid -> false;
(_) -> true
end, Queue).
+
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
+now_micros() -> timer:now_diff(now(), {0,0,0}).