summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_limiter.erl38
1 files changed, 18 insertions, 20 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 210b6b7c4e..c0eecbefd7 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -286,12 +286,10 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
{Res, Cr} = case IsEmpty andalso Drain of
- true -> {true, make_credit(#credit{credit = 0,
- drain = false})};
- false -> {false, make_credit(#credit{credit = Credit,
- drain = Drain})}
+ true -> {true, #credit{credit = 0, drain = false}};
+ false -> {false, #credit{credit = Credit, drain = Drain}}
end,
- {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}.
+ {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
set_consumer_prefetch(Lim, _CTag, true, _Credit) ->
Lim;
@@ -305,8 +303,7 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
case gb_trees:lookup(CTag, Credits) of
{value, C = #credit{mode = auto,
credit = C0}} ->
- {gb_trees:update(
- CTag, make_credit(C#credit{credit = C0 + Credit}), Credits),
+ {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits),
C0 =:= 0};
_ ->
{Credits, false}
@@ -314,11 +311,12 @@ ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
{Unblocked, Limiter#qstate{credits = Credits1}}.
drained(Limiter = #qstate{credits = Credits}) ->
+ Drain = fun(C) -> C#credit{credit = 0, drain = false} end,
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(
- fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
- {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
- (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ fun (CTag, C = #credit{credit = Crd, drain = true}, {Acc, Creds0}) ->
+ {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)};
+ (_CTag, #credit{credit = _Crd, drain = false}, {Acc, Creds0}) ->
{Acc, Creds0}
end, {[], Credits}, Credits),
{CTagCredits, Limiter#qstate{credits = Credits2}}.
@@ -337,23 +335,23 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
-make_credit(C = #credit{credit = Credit, drain = Drain}) ->
- %% Using up all credit implies no need to send a 'drained' event
- C#credit{drain = Drain andalso Credit > 0}.
-
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
{value, C = #credit{credit = Credit}} ->
- C2 = make_credit(C#credit{credit = Credit - 1}),
- gb_trees:update(CTag, C2, Credits);
+ update_credit(CTag, C#credit{credit = Credit - 1}, Credits);
none ->
Credits
end.
-update_credit(CTag, Credit, Drain, Credits) ->
- C = gb_trees:get(CTag, Credits),
- C2 = make_credit(C#credit{credit = Credit, drain = Drain}),
- gb_trees:update(CTag, C2, Credits).
+enter_credit(CTag, C, Credits) ->
+ gb_trees:enter(CTag, ensure_credit_invariant(C), Credits).
+
+update_credit(CTag, C, Credits) ->
+ gb_trees:update(CTag, ensure_credit_invariant(C), Credits).
+
+ensure_credit_invariant(C = #credit{credit = Credit, drain = Drain}) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ C#credit{drain = Drain andalso Credit > 0}.
%%----------------------------------------------------------------------------
%% gen_server callbacks