summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 14:14:00 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 14:14:00 +0000
commit3564ac37dbe67a3ad24788e2ef057e8cb67ade90 (patch)
tree2584a0ff320ac777e13afa8b602f65414154f1bf /src
parent969764b187eff634eab8aeba759b59b392262906 (diff)
downloadrabbitmq-server-git-3564ac37dbe67a3ad24788e2ef057e8cb67ade90.tar.gz
We can't have drain=true and mode=auto so let's unify to a single mode: manual/drain/auto.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_limiter.erl28
-rw-r--r--src/rabbit_queue_consumers.erl28
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b13590749..fa1a676878 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -128,7 +128,7 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/6, ack_from_queue/3,
+ is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3,
drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -147,7 +147,7 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--type(credit_mode() :: 'manual' | 'auto').
+-type(credit_mode() :: 'manual' | 'drain' | 'auto').
-spec(start_link/1 :: (rabbit_types:proc_name()) ->
rabbit_types:ok_pid_or_error()).
@@ -173,7 +173,7 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/6 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(),
credit_mode(), boolean()) -> {boolean(), qstate()}).
-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
-> {boolean(), qstate()}).
@@ -194,7 +194,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false, mode}).
+-record(credit, {credit = 0, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -283,11 +283,11 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Drain, Mode, IsEmpty) ->
+credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) ->
{Res, Cr} =
- case IsEmpty andalso Drain of
- true -> {true, #credit{credit = 0, drain = false, mode = Mode}};
- false -> {false, #credit{credit = Crd, drain = Drain, mode = Mode}}
+ case IsEmpty andalso Mode =:= drain of
+ true -> {true, #credit{credit = 0, mode = manual}};
+ false -> {false, #credit{credit = Crd, mode = Mode}}
end,
{Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
@@ -304,12 +304,12 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
{Unblocked, Limiter#qstate{credits = Credits1}}.
drained(Limiter = #qstate{credits = Credits}) ->
- Drain = fun(C) -> C#credit{credit = 0, drain = false} end,
+ Drain = fun(C) -> C#credit{credit = 0, mode = manual} end,
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(
- fun (CTag, C = #credit{credit = Crd, drain = true}, {Acc, Creds0}) ->
+ fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) ->
{[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)};
- (_CTag, #credit{credit = _Crd, drain = false}, {Acc, Creds0}) ->
+ (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) ->
{Acc, Creds0}
end, {[], Credits}, Credits),
{CTagCredits, Limiter#qstate{credits = Credits2}}.
@@ -342,9 +342,11 @@ enter_credit(CTag, C, Credits) ->
update_credit(CTag, C, Credits) ->
gb_trees:update(CTag, ensure_credit_invariant(C), Credits).
-ensure_credit_invariant(C = #credit{credit = Credit, drain = Drain}) ->
+ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) ->
%% Using up all credit implies no need to send a 'drained' event
- C#credit{drain = Drain andalso Credit > 0}.
+ C#credit{mode = manual};
+ensure_credit_invariant(C) ->
+ C.
%%----------------------------------------------------------------------------
%% gen_server callbacks
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca500d4800..6024889c71 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -120,7 +120,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
@@ -131,14 +131,13 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
case parse_credit_args(Args) of
- none -> C1;
- {0, _Drain, auto} -> C1;
- {Credit, _Drain, auto} when NoAck -> C1;
- {Credit, Drain, Mode} -> credit_and_drain(
- C1, ConsumerTag, Credit,
- Drain, Mode, IsEmpty)
+ none -> C1;
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
end),
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
@@ -147,11 +146,11 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, C}, {bool, D}} -> {C, D, manual};
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
_ -> none
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
- {_, Prefetch} -> {Prefetch, false, auto};
+ {_, Prefetch} -> {Prefetch, auto};
_ -> none
end
end.
@@ -330,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty),
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -340,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
end
end.
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(now_micros() - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -405,8 +407,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, Mode, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};