summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_limiter.erl40
2 files changed, 17 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9ffb9112dd..7f6dc3c867 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1115,13 +1115,12 @@ handle_method(#'channel.flow'{active = false}, _,
handle_method(#'basic.credit'{consumer_tag = CTag,
credit = Credit,
- count = Count,
drain = Drain}, _,
State = #ch{consumer_mapping = Consumers}) ->
case dict:find(CTag, Consumers) of
{ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
Q, self(),
- {basic_credit, CTag, Credit, Count, Drain, true}),
+ {basic_credit, CTag, Credit, Drain, true}),
{noreply, State};
error -> precondition_failed("unknown consumer tag '~s'", [CTag])
end;
@@ -1199,7 +1198,7 @@ maybe_set_initial_credit(Arguments, CTag, Q) ->
{{long, Credit}, {boolean, Drain}} ->
ok = rabbit_amqqueue:inform_limiter(
Q, self(),
- {basic_credit, CTag, Credit, 0, Drain,
+ {basic_credit, CTag, Credit, Drain,
false});
_ ->
ok
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 232be83c77..ae9c79180b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -75,7 +75,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {count = 0, credit = 0, drain = false}).
+-record(credit, {credit = 0, drain = false}).
%%----------------------------------------------------------------------------
%% API
@@ -144,9 +144,9 @@ is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
inform(Limiter = #token{q_state = Credits},
- ChPid, Len, {basic_credit, CTag, Credit, Count, Drain, Reply}) ->
+ ChPid, Len, {basic_credit, CTag, Credit, Drain, Reply}) ->
{Unblock, Credits2} = update_credit(
- CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ CTag, Len, ChPid, Credit, Drain, Credits),
case Reply of
true -> rabbit_channel:send_credit_reply(ChPid, Len);
false -> ok
@@ -178,40 +178,30 @@ record_send_q(CTag, Len, ChPid, Credits) ->
end.
decr_credit(CTag, Len, ChPid, Cred, Credits) ->
- #credit{credit = Credit, count = Count, drain = Drain} = Cred,
- {NewCredit, NewCount} = maybe_drain(Len - 1, Drain, CTag, ChPid,
- Credit - 1, serial_add(Count, 1)),
- write_credit(CTag, NewCredit, NewCount, Drain, Credits).
-
-maybe_drain(0, true, CTag, ChPid, Credit, Count) ->
- %% Drain, so advance til credit = 0
- NewCount = serial_add(Count, Credit - 2),
+ #credit{credit = Credit, drain = Drain} = Cred,
+ NewCredit = maybe_drain(Len - 1, Drain, CTag, ChPid, Credit - 1),
+ write_credit(CTag, NewCredit, Drain, Credits).
+
+maybe_drain(0, true, CTag, ChPid, Credit) ->
send_drained(ChPid, CTag, Credit),
- {0, NewCount}; %% Magic reduction to 0
+ 0; %% Magic reduction to 0
-maybe_drain(_, _, _, _, Credit, Count) ->
- {Credit, Count}.
+maybe_drain(_, _, _, _, Credit) ->
+ Credit.
send_drained(ChPid, CTag, CreditDrained) ->
rabbit_channel:send_drained(ChPid, CTag, CreditDrained).
-update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
- Count = case dict:find(CTag, Credits) of
- %% Use our count if we can, more accurate
- {ok, #credit{ count = LocalCount }} -> LocalCount;
- %% But if this is new, take it from the adapter
- _ -> Count0
- end,
- {NewCredit, NewCount} = maybe_drain(Len, Drain, CTag, ChPid, Credit, Count),
- NewCredits = write_credit(CTag, NewCredit, NewCount, Drain, Credits),
+update_credit(CTag, Len, ChPid, Credit, Drain, Credits) ->
+ NewCredit = maybe_drain(Len, Drain, CTag, ChPid, Credit),
+ NewCredits = write_credit(CTag, NewCredit, Drain, Credits),
case NewCredit > 0 of
true -> {[CTag], NewCredits};
false -> {[], NewCredits}
end.
-write_credit(CTag, Credit, Count, Drain, Credits) ->
+write_credit(CTag, Credit, Drain, Credits) ->
dict:store(CTag, #credit{credit = Credit,
- count = Count,
drain = Drain}, Credits).
%%----------------------------------------------------------------------------