diff options
| -rw-r--r-- | src/rabbit_channel.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 17 |
2 files changed, 29 insertions, 24 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a56955cb96..9b6c87d8cb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -214,6 +214,8 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; +handle_info({'EXIT', _OldLimiterPid, normal}, State) -> + {noreply, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -542,23 +544,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{ limiter_pid = LimiterPid, unacked_message_q = UAMQ }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; + LimiterPid1 = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> + LPid = rabbit_limiter:start_link(self(), + queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid; + {_, _} -> + LimiterPid + end, + LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of + ok -> LimiterPid1; + stopped -> ok = limit_queues(undefined, State), + undefined + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183fc9..d998499d76 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -47,7 +47,7 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -77,13 +77,12 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -130,14 +129,18 @@ handle_call({can_send, QPid, AckRequired}, _From, end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}), + case PrefetchCount == 0 of + true -> {stop, normal, stopped, State1}; + false -> {reply, ok, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count |
