diff options
| -rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 6 |
2 files changed, 8 insertions, 11 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dae21389ef..9ffb9112dd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -145,8 +145,8 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). -send_drained(Pid, ConsumerTag, Count) -> - gen_server2:cast(Pid, {send_drained, ConsumerTag, Count}). +send_drained(Pid, ConsumerTag, CreditDrained) -> + gen_server2:cast(Pid, {send_drained, ConsumerTag, CreditDrained}). flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -330,14 +330,11 @@ handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, ConsumerTag, Count}, +handle_cast({send_drained, ConsumerTag, CreditDrained}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( - WriterPid, #'basic.credit_state'{consumer_tag = ConsumerTag, - credit = 0, - count = Count, - available = 0, - drain = true}), + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}), noreply(State); handle_cast(force_event_refresh, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b97d10732e..232be83c77 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -186,14 +186,14 @@ decr_credit(CTag, Len, ChPid, Cred, Credits) -> maybe_drain(0, true, CTag, ChPid, Credit, Count) -> %% Drain, so advance til credit = 0 NewCount = serial_add(Count, Credit - 2), - send_drained(ChPid, CTag, NewCount), + send_drained(ChPid, CTag, Credit), {0, NewCount}; %% Magic reduction to 0 maybe_drain(_, _, _, _, Credit, Count) -> {Credit, Count}. -send_drained(ChPid, CTag, Count) -> - rabbit_channel:send_drained(ChPid, CTag, Count). +send_drained(ChPid, CTag, CreditDrained) -> + rabbit_channel:send_drained(ChPid, CTag, CreditDrained). update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> Count = case dict:find(CTag, Credits) of |
