diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-02-02 17:36:48 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-02-02 17:36:48 +0000 |
| commit | 08ffa97ff979eb28302bba5eb2b1ab00fa690f07 (patch) | |
| tree | d12152aa90cde9a668e35074080af1d4349a26a2 /src | |
| parent | a8f202f13a00cd8819bd4a0bdc6a69420ad67389 (diff) | |
| download | rabbitmq-server-git-08ffa97ff979eb28302bba5eb2b1ab00fa690f07.tar.gz | |
Send a credit-state when magical draining happens.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 |
2 files changed, 15 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a06241017c..c4b9daf73f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -462,7 +462,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. -%% TODO port this +%% TODO port this(?) queue_blocked(QPid, State = #ch{blocking = Blocking}) -> case dict:find(QPid, Blocking) of error -> State; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index efe6023b9e..4a05050f3d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,6 +15,7 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). @@ -217,18 +218,27 @@ limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, _ -> false end orelse (Limit =/= 0 andalso Volume >= Limit). -decr_credit(CTag, Len, State = #lim{ credits = Credits } ) -> +decr_credit(CTag, Len, State = #lim{ credits = Credits, ch_pid = ChPid } ) -> case dict:find(CTag, Credits) of {ok, #credit{ credit = Credit, drain = Drain }} -> - NewCredit = case {Len, Drain} of - {1, true} -> 0; - {_, _} -> Credit - 1 + NewCredit = case {Credit, Len, Drain} of + {1, _, _} -> 0; %% Usual reduction to 0 + {_, 1, true} -> send_drained(ChPid, CTag), + 0; %% Magic reduction to 0 + {_, _, _} -> Credit - 1 end, update_credit(CTag, NewCredit, Drain, State); error -> State end. +send_drained(ChPid, CTag) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + available = 0, + drain = true}). + update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) -> State#lim{credits = dict:erase(CTag, Credits)}; |
