diff options
| author | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-09 16:27:42 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-09 16:27:42 +0000 |
| commit | 18fd3422bbd6ce30e7975b7306afd4bd501358e4 (patch) | |
| tree | e1596ff1d03ca4344635fc7ace4d0b7ffe6be189 /src | |
| parent | d5bf80b0ec626dd143dd3ce1fea40b2740ad58e6 (diff) | |
| download | rabbitmq-server-git-18fd3422bbd6ce30e7975b7306afd4bd501358e4.tar.gz | |
Use a base for basic.credit, to account for messages that have been delivered but not received by the client
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 58 |
2 files changed, 40 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f76026d275..5b264cc6d4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1030,7 +1030,9 @@ handle_method(#'channel.flow'{active = false}, _, {noreply, State1#ch{blocking = dict:from_list(Queues)}} end; -handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + count = Count, drain = Drain}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> @@ -1052,7 +1054,7 @@ handle_method(#'basic.credit'{consumer_tag = CTag, credit = Credit, Other -> Other end, LimiterPid2 = - case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Drain) of + case rabbit_limiter:set_credit(LimiterPid1, CTag, Credit, Count, Drain) of ok -> limit_queues(LimiterPid1, State), LimiterPid1; stopped -> unlimit_queues(State) diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4a05050f3d..a9c0406fa1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -23,7 +23,7 @@ handle_info/2, prioritise_call/3]). -export([start_link/2]). -export([limit/2, can_send/5, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, set_credit/4, is_blocked/1]). +-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]). %%---------------------------------------------------------------------------- @@ -54,7 +54,7 @@ queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). --record(credit, {credit = 0, drain = false}). +-record(credit, {count = 0, credit = 0, drain = false}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -111,10 +111,11 @@ unblock(undefined) -> unblock(LimiterPid) -> gen_server2:call(LimiterPid, unblock, infinity). -set_credit(undefined, _, _, _) -> +set_credit(undefined, _, _, _, _) -> ok; -set_credit(LimiterPid, CTag, Credit, Drain) -> - gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Drain}, infinity). +set_credit(LimiterPid, CTag, Credit, Count, Drain) -> + io:format("Set credit for ~p: credit ~p, count ~p, drain ~p~n", [CTag, Credit, Count, Drain]), + gen_server2:call(LimiterPid, {set_credit, CTag, Credit, Count, Drain}, infinity). is_blocked(undefined) -> false; @@ -161,8 +162,8 @@ handle_call(block, _From, State) -> handle_call(unblock, _From, State) -> maybe_notify_reply(irrelevant, State, State#lim{blocked = false}); -handle_call({set_credit, CTag, Credit, Drain}, _From, State) -> - maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Drain, State)); +handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) -> + maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Count, Drain, State)); handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}. @@ -218,34 +219,47 @@ limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, _ -> false end orelse (Limit =/= 0 andalso Volume >= Limit). -decr_credit(CTag, Len, State = #lim{ credits = Credits, ch_pid = ChPid } ) -> +decr_credit(CTag, Len, State = #lim{ credits = Credits, + ch_pid = ChPid } ) -> case dict:find(CTag, Credits) of - {ok, #credit{ credit = Credit, drain = Drain }} -> - NewCredit = case {Credit, Len, Drain} of - {1, _, _} -> 0; %% Usual reduction to 0 - {_, 1, true} -> send_drained(ChPid, CTag), - 0; %% Magic reduction to 0 - {_, _, _} -> Credit - 1 - end, - update_credit(CTag, NewCredit, Drain, State); + {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, Count + 1}; %% Usual reduction to 0 + {_, 1, true} -> + NewCount0 = Count + (Credit - 1), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, Count + 1} + end, + update_credit(CTag, NewCredit, NewCount, Drain, State); error -> State end. -send_drained(ChPid, CTag) -> +send_drained(ChPid, CTag, Count) -> rabbit_channel:send_command(ChPid, #'basic.credit_state'{consumer_tag = CTag, credit = 0, + count = Count, available = 0, drain = true}). -update_credit(CTag, -1, _Drain, State = #lim{credits = Credits}) -> +update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> State#lim{credits = dict:erase(CTag, Credits)}; -update_credit(CTag, Credit, Drain, State = #lim{credits = Credits}) -> - State#lim{credits = dict:store(CTag, - #credit{credit = Credit, drain = Drain}, - Credits)}. +%% Edge case: if the queue has nothing in it, and drain is set, we want to +%% send a credit. +update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> + New = case dict:find(CTag, Credits) of + #credit{ count = OldCount, + credit = OldCredit } = Old -> + Old#credit{ credit = erlang:max( + 0, OldCount + OldCredit - Count), + count = Count, drain = Drain }; + _ -> #credit{ count = Count, credit = Credit, drain = Drain } + end, + State#lim{credits = dict:store(CTag, New, Credits)}. blocked(#lim{blocked = Blocked}) -> Blocked. |
