summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-02 17:36:48 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-02 17:36:48 +0000
commit08ffa97ff979eb28302bba5eb2b1ab00fa690f07 (patch)
treed12152aa90cde9a668e35074080af1d4349a26a2
parenta8f202f13a00cd8819bd4a0bdc6a69420ad67389 (diff)
downloadrabbitmq-server-git-08ffa97ff979eb28302bba5eb2b1ab00fa690f07.tar.gz
Send a credit-state when magical draining happens.
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_limiter.erl18
2 files changed, 15 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a06241017c..c4b9daf73f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -462,7 +462,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
-%% TODO port this
+%% TODO port this(?)
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case dict:find(QPid, Blocking) of
error -> State;
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index efe6023b9e..4a05050f3d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -15,6 +15,7 @@
%%
-module(rabbit_limiter).
+-include("rabbit_framing.hrl").
-behaviour(gen_server2).
@@ -217,18 +218,27 @@ limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
_ -> false
end orelse (Limit =/= 0 andalso Volume >= Limit).
-decr_credit(CTag, Len, State = #lim{ credits = Credits } ) ->
+decr_credit(CTag, Len, State = #lim{ credits = Credits, ch_pid = ChPid } ) ->
case dict:find(CTag, Credits) of
{ok, #credit{ credit = Credit, drain = Drain }} ->
- NewCredit = case {Len, Drain} of
- {1, true} -> 0;
- {_, _} -> Credit - 1
+ NewCredit = case {Credit, Len, Drain} of
+ {1, _, _} -> 0; %% Usual reduction to 0
+ {_, 1, true} -> send_drained(ChPid, CTag),
+ 0; %% Magic reduction to 0
+ {_, _, _} -> Credit - 1
end,
update_credit(CTag, NewCredit, Drain, State);
error ->
State
end.
+send_drained(ChPid, CTag) ->
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_state'{consumer_tag = CTag,
+ credit = 0,
+ available = 0,
+ drain = true}).
+
update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) ->
State#lim{credits = dict:erase(CTag, Credits)};