summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_limiter.erl11
2 files changed, 13 insertions, 2 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 710097477a..e7678cdf68 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -162,13 +162,15 @@ handle_message(Other, State) ->
%%---------------------------------------------------------------------------
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid),
exit(Reason).
return_ok(State, true, _Msg) -> {noreply, State};
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index a9ec9e10f3..6ffa8c23b1 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -35,7 +35,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--export([start_link/1]).
+-export([start_link/1, shutdown/1]).
-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -43,6 +43,7 @@
-ifdef(use_specs).
-spec(start_link/1 :: (pid()) -> pid()).
+-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(limit/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(can_send/2 :: (pid(), pid()) -> bool()).
-spec(ack/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -66,6 +67,11 @@ start_link(ChPid) ->
{ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
Pid.
+shutdown(undefined) ->
+ ok;
+shutdown(LimiterPid) ->
+ gen_server:cast(LimiterPid, shutdown).
+
limit(LimiterPid, PrefetchCount) ->
gen_server:cast(LimiterPid, {limit, PrefetchCount}).
@@ -102,6 +108,9 @@ handle_call({can_send, _QPid}, _From, State = #lim{in_use = InUse}) ->
false -> {reply, true, State#lim{in_use = InUse + 1}}
end.
+handle_cast(shutdown, State) ->
+ {stop, normal, State};
+
handle_cast({limit, PrefetchCount}, State) ->
{noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};