summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-09 10:00:51 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commit4642bf0d7bd168300fd443ea2beb059b86862e8b (patch)
treeb7e749b8213bd8cbd31517cb7a526bcbfbffb5ee /src
parentec76fef8a84f4dee56e422564cac5634ed89976f (diff)
downloadrabbitmq-server-git-4642bf0d7bd168300fd443ea2beb059b86862e8b.tar.gz
Cancel tick timer if there are no pending acks
If there are no pending acks we should allow the channel to enter hibernate without being woken up by the tick timer. [#164212469]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b705dbcbdc..6aa64a3336 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -66,8 +66,9 @@
-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
- prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
+ prioritise_call/4, prioritise_cast/3, prioritise_info/3,
+ format_message_queue/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -836,8 +837,8 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
- QName = rabbit_quorum_queue:queue_name(QS),
- [] /= rabbit_amqqueue:lookup(QName)
+ QName = rabbit_quorum_queue:queue_name(QS),
+ [] /= rabbit_amqqueue:lookup([QName])
end, QueueStates0),
Now = os:system_time(millisecond),
@@ -886,26 +887,32 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
State = State1#ch{queue_states = QueueStates,
queue_consumers = maps:remove(QRef, QCons),
unacked_message_q = Rem},
- noreply(init_tick_timer(State))
+ noreply(
+ init_tick_timer(reset_tick_timer(State)))
end;
_ ->
noreply(
init_tick_timer(
- State0#ch{queue_states = QueueStates1}))
+ reset_tick_timer(State0#ch{queue_states = QueueStates1})))
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{source = Source}}).
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State0) ->
ok = clear_permission_cache(),
+ State = maybe_cancel_tick_timer(State0),
rabbit_event:if_enabled(
State, #ch.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
os:system_time(milli_seconds)}])
- end),
+ end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
+handle_post_hibernate(State0) ->
+ State = init_tick_timer(State0),
+ {noreply, State}.
+
terminate(_Reason,
State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
{_Res, _State1} = notify_queues(State),
@@ -2690,9 +2697,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_tick_timer(State) ->
+init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
- State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};
+init_tick_timer(State) ->
+ State.
+
+reset_tick_timer(State) ->
+ State#ch{tick_timer = undefined}.
+
+maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
+ State;
+maybe_cancel_tick_timer(#ch{tick_timer = TRef,
+ unacked_message_q = UMQ} = State) ->
+ case ?QUEUE:len(UMQ) of
+ 0 ->
+ %% we can only cancel the tick timer if the unacked messages
+ %% queue is empty.
+ _ = erlang:cancel_timer(TRef),
+ State#ch{tick_timer = undefined};
+ _ ->
+ %% let the timer continue
+ State
+ end.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module