summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-09 10:00:51 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commit4642bf0d7bd168300fd443ea2beb059b86862e8b (patch)
treeb7e749b8213bd8cbd31517cb7a526bcbfbffb5ee
parentec76fef8a84f4dee56e422564cac5634ed89976f (diff)
downloadrabbitmq-server-git-4642bf0d7bd168300fd443ea2beb059b86862e8b.tar.gz
Cancel tick timer if there are no pending acks
If there are no pending acks we should allow the channel to enter hibernate without being woken up by the tick timer. [#164212469]
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 37 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b705dbcbdc..6aa64a3336 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -66,8 +66,9 @@
-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
- prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
+ prioritise_call/4, prioritise_cast/3, prioritise_info/3,
+ format_message_queue/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -836,8 +837,8 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
- QName = rabbit_quorum_queue:queue_name(QS),
- [] /= rabbit_amqqueue:lookup(QName)
+ QName = rabbit_quorum_queue:queue_name(QS),
+ [] /= rabbit_amqqueue:lookup([QName])
end, QueueStates0),
Now = os:system_time(millisecond),
@@ -886,26 +887,32 @@ handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
State = State1#ch{queue_states = QueueStates,
queue_consumers = maps:remove(QRef, QCons),
unacked_message_q = Rem},
- noreply(init_tick_timer(State))
+ noreply(
+ init_tick_timer(reset_tick_timer(State)))
end;
_ ->
noreply(
init_tick_timer(
- State0#ch{queue_states = QueueStates1}))
+ reset_tick_timer(State0#ch{queue_states = QueueStates1})))
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{source = Source}}).
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State0) ->
ok = clear_permission_cache(),
+ State = maybe_cancel_tick_timer(State0),
rabbit_event:if_enabled(
State, #ch.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
os:system_time(milli_seconds)}])
- end),
+ end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
+handle_post_hibernate(State0) ->
+ State = init_tick_timer(State0),
+ {noreply, State}.
+
terminate(_Reason,
State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
{_Res, _State1} = notify_queues(State),
@@ -2690,9 +2697,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_tick_timer(State) ->
+init_tick_timer(State = #ch{tick_timer = undefined}) ->
{ok, Interval} = application:get_env(rabbit, channel_tick_interval),
- State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};
+init_tick_timer(State) ->
+ State.
+
+reset_tick_timer(State) ->
+ State#ch{tick_timer = undefined}.
+
+maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
+ State;
+maybe_cancel_tick_timer(#ch{tick_timer = TRef,
+ unacked_message_q = UMQ} = State) ->
+ case ?QUEUE:len(UMQ) of
+ 0 ->
+ %% we can only cancel the tick timer if the unacked messages
+ %% queue is empty.
+ _ = erlang:cancel_timer(TRef),
+ State#ch{tick_timer = undefined};
+ _ ->
+ %% let the timer continue
+ State
+ end.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module