diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b705dbcbdc..6aa64a3336 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -66,8 +66,9 @@ -export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/4, - prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, + prioritise_call/4, prioritise_cast/3, prioritise_info/3, + format_message_queue/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -836,8 +837,8 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, unacked_message_q = UAMQ}) -> QueueStates1 = maps:filter(fun(_, QS) -> - QName = rabbit_quorum_queue:queue_name(QS), - [] /= rabbit_amqqueue:lookup(QName) + QName = rabbit_quorum_queue:queue_name(QS), + [] /= rabbit_amqqueue:lookup([QName]) end, QueueStates0), Now = os:system_time(millisecond), @@ -886,26 +887,32 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, State = State1#ch{queue_states = QueueStates, queue_consumers = maps:remove(QRef, QCons), unacked_message_q = Rem}, - noreply(init_tick_timer(State)) + noreply( + init_tick_timer(reset_tick_timer(State))) end; _ -> noreply( init_tick_timer( - State0#ch{queue_states = QueueStates1})) + reset_tick_timer(State0#ch{queue_states = QueueStates1}))) end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{source = Source}}). -handle_pre_hibernate(State) -> +handle_pre_hibernate(State0) -> ok = clear_permission_cache(), + State = maybe_cancel_tick_timer(State0), rabbit_event:if_enabled( State, #ch.stats_timer, fun () -> emit_stats(State, [{idle_since, os:system_time(milli_seconds)}]) - end), + end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. +handle_post_hibernate(State0) -> + State = init_tick_timer(State0), + {noreply, State}. + terminate(_Reason, State = #ch{cfg = #conf{user = #user{username = Username}}}) -> {_Res, _State1} = notify_queues(State), @@ -2690,9 +2697,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_tick_timer(State) -> +init_tick_timer(State = #ch{tick_timer = undefined}) -> {ok, Interval} = application:get_env(rabbit, channel_tick_interval), - State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}. + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; +init_tick_timer(State) -> + State. + +reset_tick_timer(State) -> + State#ch{tick_timer = undefined}. + +maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> + State; +maybe_cancel_tick_timer(#ch{tick_timer = TRef, + unacked_message_q = UMQ} = State) -> + case ?QUEUE:len(UMQ) of + 0 -> + %% we can only cancel the tick timer if the unacked messages + %% queue is empty. + _ = erlang:cancel_timer(TRef), + State#ch{tick_timer = undefined}; + _ -> + %% let the timer continue + State + end. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module |
