diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 14:14:00 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 14:14:00 +0000 |
| commit | 3564ac37dbe67a3ad24788e2ef057e8cb67ade90 (patch) | |
| tree | 2584a0ff320ac777e13afa8b602f65414154f1bf /src | |
| parent | 969764b187eff634eab8aeba759b59b392262906 (diff) | |
| download | rabbitmq-server-git-3564ac37dbe67a3ad24788e2ef057e8cb67ade90.tar.gz | |
We can't have drain=true and mode=auto so let's unify to a single mode: manual/drain/auto.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_limiter.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 28 |
2 files changed, 30 insertions, 26 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2b13590749..fa1a676878 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -128,7 +128,7 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/6, ack_from_queue/3, + is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3, drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -147,7 +147,7 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). --type(credit_mode() :: 'manual' | 'auto'). +-type(credit_mode() :: 'manual' | 'drain' | 'auto'). -spec(start_link/1 :: (rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error()). @@ -173,7 +173,7 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/6 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), credit_mode(), boolean()) -> {boolean(), qstate()}). -spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) -> {boolean(), qstate()}). @@ -194,7 +194,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false, mode}). +-record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- %% API @@ -283,11 +283,11 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Drain, Mode, IsEmpty) -> +credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> {Res, Cr} = - case IsEmpty andalso Drain of - true -> {true, #credit{credit = 0, drain = false, mode = Mode}}; - false -> {false, #credit{credit = Crd, drain = Drain, mode = Mode}} + case IsEmpty andalso Mode =:= drain of + true -> {true, #credit{credit = 0, mode = manual}}; + false -> {false, #credit{credit = Crd, mode = Mode}} end, {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. @@ -304,12 +304,12 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> {Unblocked, Limiter#qstate{credits = Credits1}}. drained(Limiter = #qstate{credits = Credits}) -> - Drain = fun(C) -> C#credit{credit = 0, drain = false} end, + Drain = fun(C) -> C#credit{credit = 0, mode = manual} end, {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( - fun (CTag, C = #credit{credit = Crd, drain = true}, {Acc, Creds0}) -> + fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) -> {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)}; - (_CTag, #credit{credit = _Crd, drain = false}, {Acc, Creds0}) -> + (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) -> {Acc, Creds0} end, {[], Credits}, Credits), {CTagCredits, Limiter#qstate{credits = Credits2}}. @@ -342,9 +342,11 @@ enter_credit(CTag, C, Credits) -> update_credit(CTag, C, Credits) -> gb_trees:update(CTag, ensure_credit_invariant(C), Credits). -ensure_credit_invariant(C = #credit{credit = Credit, drain = Drain}) -> +ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) -> %% Using up all credit implies no need to send a 'drained' event - C#credit{drain = Drain andalso Credit > 0}. + C#credit{mode = manual}; +ensure_credit_invariant(C) -> + C. %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca500d4800..6024889c71 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -120,7 +120,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), @@ -131,14 +131,13 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( case parse_credit_args(Args) of - none -> C1; - {0, _Drain, auto} -> C1; - {Credit, _Drain, auto} when NoAck -> C1; - {Credit, Drain, Mode} -> credit_and_drain( - C1, ConsumerTag, Credit, - Drain, Mode, IsEmpty) + none -> C1; + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) end), - Consumer = #consumer{tag = ConsumerTag, + Consumer = #consumer{tag = CTag, ack_required = not NoAck, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. @@ -147,11 +146,11 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, C}, {bool, D}} -> {C, D, manual}; + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; _ -> none end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of - {_, Prefetch} -> {Prefetch, false, auto}; + {_, Prefetch} -> {Prefetch, auto}; _ -> none end end. @@ -330,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, manual, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -340,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -405,8 +407,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, Mode, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, Mode, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; |
