summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl36
-rw-r--r--src/rabbit_limiter.erl17
2 files changed, 29 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a56955cb96..9b6c87d8cb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -214,6 +214,8 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
+handle_info({'EXIT', _OldLimiterPid, normal}, State) ->
+ {noreply, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -542,23 +544,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
_, State = #ch{ limiter_pid = LimiterPid,
unacked_message_q = UAMQ }) ->
- NewLimiterPid = case {LimiterPid, PrefetchCount} of
- {undefined, 0} ->
- undefined;
- {undefined, _} ->
- LPid = rabbit_limiter:start_link(self(),
- queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid;
- {_, 0} ->
- ok = rabbit_limiter:shutdown(LimiterPid),
- ok = limit_queues(undefined, State),
- undefined;
- {_, _} ->
- LimiterPid
- end,
- ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
+ LimiterPid1 = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self(),
+ queue:len(UAMQ)),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, _} ->
+ LimiterPid
+ end,
+ LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
+ ok -> LimiterPid1;
+ stopped -> ok = limit_queues(undefined, State),
+ undefined
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c9f8183fc9..d998499d76 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -47,7 +47,7 @@
-spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
--spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -77,13 +77,12 @@ start_link(ChPid, UnackedMsgCount) ->
shutdown(undefined) ->
ok;
shutdown(LimiterPid) ->
- unlink(LimiterPid),
gen_server2:cast(LimiterPid, shutdown).
limit(undefined, 0) ->
ok;
limit(LimiterPid, PrefetchCount) ->
- gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+ gen_server2:call(LimiterPid, {limit, PrefetchCount}).
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
@@ -130,14 +129,18 @@ handle_call({can_send, QPid, AckRequired}, _From,
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
- {reply, PrefetchCount, State}.
+ {reply, PrefetchCount, State};
+
+handle_call({limit, PrefetchCount}, _From, State) ->
+ State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}),
+ case PrefetchCount == 0 of
+ true -> {stop, normal, stopped, State1};
+ false -> {reply, ok, State1}
+ end.
handle_cast(shutdown, State) ->
{stop, normal, State};
-handle_cast({limit, PrefetchCount}, State) ->
- {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
-
handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count