summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-05 20:51:26 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-05 20:51:26 +0000
commit373c989397d96b9655f70c44aac6b056e520fe94 (patch)
tree9549e3b8fb3459dd946c665afddd8605b12419e2 /src
parentb7cfc6fa743cf201aaf6f6050b6fc016e8443fc1 (diff)
parent57b490004da5a5bad7d41aa443f081b20d570bad (diff)
downloadrabbitmq-server-git-373c989397d96b9655f70c44aac6b056e520fe94.tar.gz
merge bug22310 into default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_limiter.erl12
2 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3aa9adfee6..b9940eb9fd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -540,12 +540,14 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid }) ->
+ _, State = #ch{ limiter_pid = LimiterPid,
+ unacked_message_q = UAMQ }) ->
NewLimiterPid = case {LimiterPid, PrefetchCount} of
{undefined, 0} ->
undefined;
{undefined, _} ->
- LPid = rabbit_limiter:start_link(self()),
+ LPid = rabbit_limiter:start_link(self(),
+ queue:len(UAMQ)),
ok = limit_queues(LPid, State),
LPid;
{_, 0} ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6bd803a27b..83df15ce20 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/1, shutdown/1]).
+-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
-export([get_limit/1]).
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/1 :: (pid()) -> pid()).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
@@ -70,8 +70,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link(ChPid) ->
- {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []),
+start_link(ChPid, UnackedMsgCount) ->
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []),
Pid.
shutdown(undefined) ->
@@ -117,8 +117,8 @@ get_limit(Pid) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([ChPid]) ->
- {ok, #lim{ch_pid = ChPid} }.
+init([ChPid, UnackedMsgCount]) ->
+ {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->