summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_limiter.erl45
1 files changed, 23 insertions, 22 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index e32f072e0f..e2c4fdbe93 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -145,7 +145,8 @@ is_blocked(Limiter) ->
inform(Limiter = #token{q_state = Credits},
ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) ->
- {Unblock, Credits2} = update_credit(CTag, Credit, Count, Drain, Credits),
+ {Unblock, Credits2} = update_credit(
+ CTag, Len, ChPid, Credit, Count, Drain, Credits),
case Reply of
true -> rabbit_channel:send_command(
ChPid, #'basic.credit_ok'{available = Len});
@@ -179,16 +180,19 @@ record_send_q(CTag, Len, ChPid, Credits) ->
decr_credit(CTag, Len, ChPid, Cred, Credits) ->
#credit{credit = Credit, count = Count, drain = Drain} = Cred,
- {NewCredit, NewCount} =
- case {Len, Drain} of
- {1, true} -> %% Drain, so advance til credit = 0
- NewCount0 = serial_add(Count, (Credit - 1)),
- send_drained(ChPid, CTag, NewCount0),
- {0, NewCount0}; %% Magic reduction to 0
- {_, _} -> {Credit - 1, serial_add(Count, 1)}
- end,
+ {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid,
+ Credit - 1, serial_add(Count, 1)),
write_credit(CTag, NewCredit, NewCount, Drain, Credits).
+maybe_drain(0, true, CTag, ChPid, Credit, Count) ->
+ %% Drain, so advance til credit = 0
+ NewCount = serial_add(Count, Credit - 2),
+ send_drained(ChPid, CTag, NewCount),
+ {0, NewCount}; %% Magic reduction to 0
+
+maybe_drain(_, _, _, _, Credit, Count) ->
+ {Credit, Count}.
+
send_drained(ChPid, CTag, Count) ->
rabbit_channel:send_command(ChPid,
#'basic.credit_state'{consumer_tag = CTag,
@@ -197,19 +201,16 @@ send_drained(ChPid, CTag, Count) ->
available = 0,
drain = true}).
-%% Update the credit state.
-%% TODO Edge case: if the queue has nothing in it, and drain is set,
-%% we want to send a basic.credit back.
-update_credit(CTag, Credit, Count0, Drain, Credits) ->
- Count =
- case dict:find(CTag, Credits) of
- %% Use our count if we can, more accurate
- {ok, #credit{ count = LocalCount }} -> LocalCount;
- %% But if this is new, take it from the adapter
- _ -> Count0
- end,
- NewCredits = write_credit(CTag, Credit, Count, Drain, Credits),
- case Credit > 0 of
+update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
+ Count = case dict:find(CTag, Credits) of
+ %% Use our count if we can, more accurate
+ {ok, #credit{ count = LocalCount }} -> LocalCount;
+ %% But if this is new, take it from the adapter
+ _ -> Count0
+ end,
+ {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count),
+ NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits),
+ case NewCredit > 0 of
true -> {[CTag], NewCredits};
false -> {[], NewCredits}
end.