summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b705dbcbdc..6aa64a3336 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -66,8 +66,9 @@
-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
- prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
+ prioritise_call/4, prioritise_cast/3, prioritise_info/3,
+ format_message_queue/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -836,8 +837,8 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
- QName = rabbit_quorum_queue:queue_name(QS),
- [] /= rabbit_amqqueue:lookup(QName)
+ QName = rabbit_quorum_queue:queue_name(QS),
+ [] /= rabbit_amqqueue:lookup([QName])
end, QueueStates0),
Now = os:system_time(millisecond),
@@ -886,26 +887,32 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
State = State1#ch{queue_states = QueueStates,
queue_consumers = maps:remove(QRef, QCons),
unacked_message_q = Rem},
- noreply(init_tick_timer(State))
+ noreply(
+ init_tick_timer(reset_tick_timer(State)))
end;
_ ->
noreply(
init_tick_timer(
- State0#ch{queue_states = QueueStates1}))
+ reset_tick_timer(State0#ch{queue_states = QueueStates1})))
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{source = Source}}).
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State0) ->
ok = clear_permission_cache(),
+ State = maybe_cancel_tick_timer(State0),
rabbit_event:if_enabled(
State, #ch.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
os:system_time(milli_seconds)}])
- end),
+ end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
+handle_post_hibernate(State0) ->
+ State = init_tick_timer(State0),
+ {noreply, State}.
+
terminate(_Reason,
State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
{_Res, _State1} = notify_queues(State),
@@ -2690,9 +2697,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_tick_timer(State) ->
+init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
- State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};
+init_tick_timer(State) ->
+ State.
+
+reset_tick_timer(State) ->
+ State#ch{tick_timer = undefined}.
+
+maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
+ State;
+maybe_cancel_tick_timer(#ch{tick_timer = TRef,
+ unacked_message_q = UMQ} = State) ->
+ case ?QUEUE:len(UMQ) of
+ 0 ->
+ %% we can only cancel the tick timer if the unacked messages
+ %% queue is empty.
+ _ = erlang:cancel_timer(TRef),
+ State#ch{tick_timer = undefined};
+ _ ->
+ %% let the timer continue
+ State
+ end.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module