diff options
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 11 |
2 files changed, 13 insertions, 2 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 710097477a..e7678cdf68 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -162,13 +162,15 @@ handle_message(Other, State) -> %%--------------------------------------------------------------------------- -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid), exit(Reason). return_ok(State, true, _Msg) -> {noreply, State}; diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index a9ec9e10f3..6ffa8c23b1 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -35,7 +35,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([start_link/1]). +-export([start_link/1, shutdown/1]). -export([limit/2, can_send/2, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -43,6 +43,7 @@ -ifdef(use_specs). -spec(start_link/1 :: (pid()) -> pid()). +-spec(shutdown/1 :: (pid()) -> 'ok'). -spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(can_send/2 :: (pid(), pid()) -> bool()). -spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -66,6 +67,11 @@ start_link(ChPid) -> {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []), Pid. +shutdown(undefined) -> + ok; +shutdown(LimiterPid) -> + gen_server:cast(LimiterPid, shutdown). + limit(LimiterPid, PrefetchCount) -> gen_server:cast(LimiterPid, {limit, PrefetchCount}). @@ -102,6 +108,9 @@ handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) -> false -> {reply, true, State#lim{in_use = InUse + 1}} end. +handle_cast(shutdown, State) -> + {stop, normal, State}; + handle_cast({limit, PrefetchCount}, State) -> {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; |
