summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2011-02-09 16:27:42 +0000
committerMichael Bridgen <mikeb@rabbitmq.com>2011-02-09 16:27:42 +0000
commit18fd3422bbd6ce30e7975b7306afd4bd501358e4 (patch)
treee1596ff1d03ca4344635fc7ace4d0b7ffe6be189
parentd5bf80b0ec626dd143dd3ce1fea40b2740ad58e6 (diff)
downloadrabbitmq-server-git-18fd3422bbd6ce30e7975b7306afd4bd501358e4.tar.gz
Use a base for basic.credit, to account for messages that have been delivered but not received by the client
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_limiter.erl58
2 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f76026d275..5b264cc6d4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1030,7 +1030,9 @@ handle_method(#'channel.flow'{active = false}, _,
{noreply, State1#ch{blocking = dict:from_list(Queues)}}
end;
-handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit,
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ count = Count,
drain = Drain}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -1052,7 +1054,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit,
Other -> Other
end,
LimiterPid2 =
- case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Drain) of
+ case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Count, Drain) of
ok -> limit_queues(LimiterPid1, State),
LimiterPid1;
stopped -> unlimit_queues(State)
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4a05050f3d..a9c0406fa1 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -23,7 +23,7 @@
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, set_credit/4, is_blocked/1]).
+-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -54,7 +54,7 @@
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
--record(credit, {credit = 0, drain = false}).
+-record(credit, {count = 0, credit = 0, drain = false}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -111,10 +111,11 @@ unblock(undefined) ->
unblock(LimiterPid) ->
gen_server2:call(LimiterPid, unblock, infinity).
-set_credit(undefined, _, _, _) ->
+set_credit(undefined, _, _, _, _) ->
ok;
-set_credit(LimiterPid, CTag, Credit, Drain) ->
- gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Drain}, infinity).
+set_credit(LimiterPid, CTag, Credit, Count, Drain) ->
+ io:format("Set credit for ~p: credit ~p, count ~p, drain ~p~n", [CTag, Credit, Count, Drain]),
+ gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Count, Drain}, infinity).
is_blocked(undefined) ->
false;
@@ -161,8 +162,8 @@ handle_call(block, _From, State) ->
handle_call(unblock, _From, State) ->
maybe_notify_reply(irrelevant, State, State#lim{blocked = false});
-handle_call({set_credit, CTag, Credit, Drain}, _From, State) ->
- maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Drain, State));
+handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) ->
+ maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Count, Drain, State));
handle_call(is_blocked, _From, State) ->
{reply, blocked(State), State}.
@@ -218,34 +219,47 @@ limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
_ -> false
end orelse (Limit =/= 0 andalso Volume >= Limit).
-decr_credit(CTag, Len, State = #lim{ credits = Credits, ch_pid = ChPid } ) ->
+decr_credit(CTag, Len, State = #lim{ credits = Credits,
+ ch_pid = ChPid } ) ->
case dict:find(CTag, Credits) of
- {ok, #credit{ credit = Credit, drain = Drain }} ->
- NewCredit = case {Credit, Len, Drain} of
- {1, _, _} -> 0; %% Usual reduction to 0
- {_, 1, true} -> send_drained(ChPid, CTag),
- 0; %% Magic reduction to 0
- {_, _, _} -> Credit - 1
- end,
- update_credit(CTag, NewCredit, Drain, State);
+ {ok, #credit{ credit = Credit, count = Count, drain = Drain }} ->
+ {NewCredit, NewCount} =
+ case {Credit, Len, Drain} of
+ {1, _, _} -> {0, Count + 1}; %% Usual reduction to 0
+ {_, 1, true} ->
+ NewCount0 = Count + (Credit - 1),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _, _} -> {Credit - 1, Count + 1}
+ end,
+ update_credit(CTag, NewCredit, NewCount, Drain, State);
error ->
State
end.
-send_drained(ChPid, CTag) ->
+send_drained(ChPid, CTag, Count) ->
rabbit_channel:send_command(ChPid,
#'basic.credit_state'{consumer_tag = CTag,
credit = 0,
+ count = Count,
available = 0,
drain = true}).
-update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) ->
+update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) ->
State#lim{credits = dict:erase(CTag, Credits)};
-update_credit(CTag, Credit, Drain, State = #lim{credits = Credits}) ->
- State#lim{credits = dict:store(CTag,
- #credit{credit = Credit, drain = Drain},
- Credits)}.
+%% Edge case: if the queue has nothing in it, and drain is set, we want to
+%% send a credit.
+update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) ->
+ New = case dict:find(CTag, Credits) of
+ #credit{ count = OldCount,
+ credit = OldCredit } = Old ->
+ Old#credit{ credit = erlang:max(
+ 0, OldCount + OldCredit - Count),
+ count = Count, drain = Drain };
+ _ -> #credit{ count = Count, credit = Credit, drain = Drain }
+ end,
+ State#lim{credits = dict:store(CTag, New, Credits)}.
blocked(#lim{blocked = Blocked}) -> Blocked.