diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-02-04 13:05:52 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-02-04 13:05:52 +0000 |
| commit | 7a02669af084489cb89a0778d028cd1f0e0a6558 (patch) | |
| tree | e30c715d9e564ec3e23645766069fdf0f1cc87ef | |
| parent | 2ce7e4b604f2750ee5031d67cee2d20a8f2f5a53 (diff) | |
| download | rabbitmq-server-git-7a02669af084489cb89a0778d028cd1f0e0a6558.tar.gz | |
Initialise a channel's limiter with the current number of unacked msgs
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 12 |
2 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f8e100979d..8f11b4edd4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -489,12 +489,14 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid }) -> + _, State = #ch{ limiter_pid = LimiterPid, + unacked_message_q = UAMQ }) -> NewLimiterPid = case {LimiterPid, PrefetchCount} of {undefined, 0} -> undefined; {undefined, _} -> - LPid = rabbit_limiter:start_link(self()), + LPid = rabbit_limiter:start_link(self(), + queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid; {_, 0} -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64d9..cf91270e9e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/1, shutdown/1]). +-export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -44,7 +44,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/1 :: (pid()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -68,8 +68,8 @@ %% API %%---------------------------------------------------------------------------- -start_link(ChPid) -> - {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), +start_link(ChPid, UnackedMsgCount) -> + {ok, Pid} = gen_server:start_link(?MODULE, [ChPid, UnackedMsgCount], []), Pid. shutdown(undefined) -> @@ -108,8 +108,8 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) %% gen_server callbacks %%---------------------------------------------------------------------------- -init([ChPid]) -> - {ok, #lim{ch_pid = ChPid} }. +init([ChPid, UnackedMsgCount]) -> + {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> |
