diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-09 10:00:51 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | 4642bf0d7bd168300fd443ea2beb059b86862e8b (patch) | |
| tree | b7e749b8213bd8cbd31517cb7a526bcbfbffb5ee | |
| parent | ec76fef8a84f4dee56e422564cac5634ed89976f (diff) | |
| download | rabbitmq-server-git-4642bf0d7bd168300fd443ea2beb059b86862e8b.tar.gz | |
Cancel tick timer if there are no pending acks
If there are no pending acks we should allow the channel to enter
hibernate without being woken up by the tick timer.
[#164212469]
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b705dbcbdc..6aa64a3336 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -66,8 +66,9 @@ -export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/4, - prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, + prioritise_call/4, prioritise_cast/3, prioritise_info/3, + format_message_queue/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -836,8 +837,8 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, unacked_message_q = UAMQ}) -> QueueStates1 = maps:filter(fun(_, QS) -> - QName = rabbit_quorum_queue:queue_name(QS), - [] /= rabbit_amqqueue:lookup(QName) + QName = rabbit_quorum_queue:queue_name(QS), + [] /= rabbit_amqqueue:lookup([QName]) end, QueueStates0), Now = os:system_time(millisecond), @@ -886,26 +887,32 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, State = State1#ch{queue_states = QueueStates, queue_consumers = maps:remove(QRef, QCons), unacked_message_q = Rem}, - noreply(init_tick_timer(State)) + noreply( + init_tick_timer(reset_tick_timer(State))) end; _ -> noreply( init_tick_timer( - State0#ch{queue_states = QueueStates1})) + reset_tick_timer(State0#ch{queue_states = QueueStates1}))) end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{source = Source}}). -handle_pre_hibernate(State) -> +handle_pre_hibernate(State0) -> ok = clear_permission_cache(), + State = maybe_cancel_tick_timer(State0), rabbit_event:if_enabled( State, #ch.stats_timer, fun () -> emit_stats(State, [{idle_since, os:system_time(milli_seconds)}]) - end), + end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. +handle_post_hibernate(State0) -> + State = init_tick_timer(State0), + {noreply, State}. + terminate(_Reason, State = #ch{cfg = #conf{user = #user{username = Username}}}) -> {_Res, _State1} = notify_queues(State), @@ -2690,9 +2697,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_tick_timer(State) -> +init_tick_timer(State = #ch{tick_timer = undefined}) -> {ok, Interval} = application:get_env(rabbit, channel_tick_interval), - State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}. + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; +init_tick_timer(State) -> + State. + +reset_tick_timer(State) -> + State#ch{tick_timer = undefined}. + +maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> + State; +maybe_cancel_tick_timer(#ch{tick_timer = TRef, + unacked_message_q = UMQ} = State) -> + case ?QUEUE:len(UMQ) of + 0 -> + %% we can only cancel the tick timer if the unacked messages + %% queue is empty. + _ = erlang:cancel_timer(TRef), + State#ch{tick_timer = undefined}; + _ -> + %% let the timer continue + State + end. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module |
