diff options
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 40 |
2 files changed, 17 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 9ffb9112dd..7f6dc3c867 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1115,13 +1115,12 @@ handle_method(#'channel.flow'{active = false}, _, handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, - count = Count, drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( Q, self(), - {basic_credit, CTag, Credit, Count, Drain, true}), + {basic_credit, CTag, Credit, Drain, true}), {noreply, State}; error -> precondition_failed("unknown consumer tag '~s'", [CTag]) end; @@ -1199,7 +1198,7 @@ maybe_set_initial_credit(Arguments, CTag, Q) -> {{long, Credit}, {boolean, Drain}} -> ok = rabbit_amqqueue:inform_limiter( Q, self(), - {basic_credit, CTag, Credit, 0, Drain, + {basic_credit, CTag, Credit, Drain, false}); _ -> ok diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 232be83c77..ae9c79180b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -75,7 +75,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {count = 0, credit = 0, drain = false}). +-record(credit, {credit = 0, drain = false}). %%---------------------------------------------------------------------------- %% API @@ -144,9 +144,9 @@ is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). inform(Limiter = #token{q_state = Credits}, - ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) -> + ChPid, Len, {basic_credit, CTag, Credit, Drain, Reply}) -> {Unblock, Credits2} = update_credit( - CTag, Len, ChPid, Credit, Count, Drain, Credits), + CTag, Len, ChPid, Credit, Drain, Credits), case Reply of true -> rabbit_channel:send_credit_reply(ChPid, Len); false -> ok @@ -178,40 +178,30 @@ record_send_q(CTag, Len, ChPid, Credits) -> end. decr_credit(CTag, Len, ChPid, Cred, Credits) -> - #credit{credit = Credit, count = Count, drain = Drain} = Cred, - {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid, - Credit - 1, serial_add(Count, 1)), - write_credit(CTag, NewCredit, NewCount, Drain, Credits). - -maybe_drain(0, true, CTag, ChPid, Credit, Count) -> - %% Drain, so advance til credit = 0 - NewCount = serial_add(Count, Credit - 2), + #credit{credit = Credit, drain = Drain} = Cred, + NewCredit = maybe_drain(Len - 1, Drain, CTag, ChPid, Credit - 1), + write_credit(CTag, NewCredit, Drain, Credits). + +maybe_drain(0, true, CTag, ChPid, Credit) -> send_drained(ChPid, CTag, Credit), - {0, NewCount}; %% Magic reduction to 0 + 0; %% Magic reduction to 0 -maybe_drain(_, _, _, _, Credit, Count) -> - {Credit, Count}. +maybe_drain(_, _, _, _, Credit) -> + Credit. send_drained(ChPid, CTag, CreditDrained) -> rabbit_channel:send_drained(ChPid, CTag, CreditDrained). -update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> - Count = case dict:find(CTag, Credits) of - %% Use our count if we can, more accurate - {ok, #credit{ count = LocalCount }} -> LocalCount; - %% But if this is new, take it from the adapter - _ -> Count0 - end, - {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count), - NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits), +update_credit(CTag, Len, ChPid, Credit, Drain, Credits) -> + NewCredit = maybe_drain(Len, Drain, CTag, ChPid, Credit), + NewCredits = write_credit(CTag, NewCredit, Drain, Credits), case NewCredit > 0 of true -> {[CTag], NewCredits}; false -> {[], NewCredits} end. -write_credit(CTag, Credit, Count, Drain, Credits) -> +write_credit(CTag, Credit, Drain, Credits) -> dict:store(CTag, #credit{credit = Credit, - count = Count, drain = Drain}, Credits). %%---------------------------------------------------------------------------- |
