diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_limiter.erl | 45 |
1 files changed, 23 insertions, 22 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e32f072e0f..e2c4fdbe93 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -145,7 +145,8 @@ is_blocked(Limiter) -> inform(Limiter = #token{q_state = Credits}, ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) -> - {Unblock, Credits2} = update_credit(CTag, Credit, Count, Drain, Credits), + {Unblock, Credits2} = update_credit( + CTag, Len, ChPid, Credit, Count, Drain, Credits), case Reply of true -> rabbit_channel:send_command( ChPid, #'basic.credit_ok'{available = Len}); @@ -179,16 +180,19 @@ record_send_q(CTag, Len, ChPid, Credits) -> decr_credit(CTag, Len, ChPid, Cred, Credits) -> #credit{credit = Credit, count = Count, drain = Drain} = Cred, - {NewCredit, NewCount} = - case {Len, Drain} of - {1, true} -> %% Drain, so advance til credit = 0 - NewCount0 = serial_add(Count, (Credit - 1)), - send_drained(ChPid, CTag, NewCount0), - {0, NewCount0}; %% Magic reduction to 0 - {_, _} -> {Credit - 1, serial_add(Count, 1)} - end, + {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid, + Credit - 1, serial_add(Count, 1)), write_credit(CTag, NewCredit, NewCount, Drain, Credits). +maybe_drain(0, true, CTag, ChPid, Credit, Count) -> + %% Drain, so advance til credit = 0 + NewCount = serial_add(Count, Credit - 2), + send_drained(ChPid, CTag, NewCount), + {0, NewCount}; %% Magic reduction to 0 + +maybe_drain(_, _, _, _, Credit, Count) -> + {Credit, Count}. + send_drained(ChPid, CTag, Count) -> rabbit_channel:send_command(ChPid, #'basic.credit_state'{consumer_tag = CTag, @@ -197,19 +201,16 @@ send_drained(ChPid, CTag, Count) -> available = 0, drain = true}). -%% Update the credit state. -%% TODO Edge case: if the queue has nothing in it, and drain is set, -%% we want to send a basic.credit back. -update_credit(CTag, Credit, Count0, Drain, Credits) -> - Count = - case dict:find(CTag, Credits) of - %% Use our count if we can, more accurate - {ok, #credit{ count = LocalCount }} -> LocalCount; - %% But if this is new, take it from the adapter - _ -> Count0 - end, - NewCredits = write_credit(CTag, Credit, Count, Drain, Credits), - case Credit > 0 of +update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> + Count = case dict:find(CTag, Credits) of + %% Use our count if we can, more accurate + {ok, #credit{ count = LocalCount }} -> LocalCount; + %% But if this is new, take it from the adapter + _ -> Count0 + end, + {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count), + NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits), + case NewCredit > 0 of true -> {[CTag], NewCredits}; false -> {[], NewCredits} end. |
