summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-04-09 11:39:22 +0100
committerkjnilsson <knilsson@pivotal.io>2019-04-23 14:36:10 +0100
commit08f7e23527530ec771b38db3f711a43b192ed673 (patch)
treed5eae6110b1312d91beb65738b5c32fe46076aab
parent4642bf0d7bd168300fd443ea2beb059b86862e8b (diff)
downloadrabbitmq-server-git-08f7e23527530ec771b38db3f711a43b192ed673.tar.gz
Move consumer timeout handler
To own function
-rw-r--r--src/rabbit_channel.erl124
1 files changed, 63 insertions, 61 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6aa64a3336..b9f842e9e9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -828,72 +828,17 @@ handle_info({{Ref, Node}, LateAnswer},
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
- capabilities = Capabilities,
- consumer_timeout = Timeout},
- queue_states = QueueStates0,
- queue_names = QNames,
- queue_consumers = QCons,
- unacked_message_q = UAMQ}) ->
+handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
[] /= rabbit_amqqueue:lookup([QName])
end, QueueStates0),
-
- Now = os:system_time(millisecond),
- case ?QUEUE:peek(UAMQ) of
- {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
- when is_integer(Timeout)
- andalso Time < Now - Timeout ->
- rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
- "waiting on ack",
- [rabbit_data_coercion:to_binary(ConsumerTag),
- Channel]),
- SupportsCancel = case rabbit_misc:table_lookup(
- Capabilities,
- <<"consumer_cancel_notify">>) of
- {bool, true} when is_binary(ConsumerTag) ->
- true;
- _ -> false
- end,
- case SupportsCancel of
- false ->
- Ex = rabbit_misc:amqp_error(precondition_failed,
- "consumer ack timed out on channel ~w",
- [Channel], none),
- handle_exception(Ex, State0);
- true ->
- QRef = qpid_to_ref(QPid),
- QName = maps:get(QRef, QNames),
- %% cancel the consumer with the client
- State1 = cancel_consumer(ConsumerTag, QName, State0),
- [Q] = rabbit_amqqueue:lookup([QName]),
- %% send basic cancel to the queue
- {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, undefined,
- <<"broker">>, QueueStates1),
- %% return all in-flight messages for the consumer
- {MsgIds, Rem} = lists:foldl(
- fun({_DelTag, ConTag, _Time, {_, MsgId}},
- {Ids, Rem}) when ConTag == ConsumerTag ->
- {[MsgId | Ids], Rem};
- (Unacked, {Ids, Rem}) ->
- {Ids, ?QUEUE:in(Unacked, Rem)}
- end, {[], ?QUEUE:new()},
- ?QUEUE:to_list(UAMQ)),
- QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
- self(), QueueStates2),
- State = State1#ch{queue_states = QueueStates,
- queue_consumers = maps:remove(QRef, QCons),
- unacked_message_q = Rem},
- noreply(
- init_tick_timer(reset_tick_timer(State)))
- end;
- _ ->
- noreply(
- init_tick_timer(
- reset_tick_timer(State0#ch{queue_states = QueueStates1})))
+ case evalauate_consumer_timeout(State0#ch{queue_states = QueueStates1}) of
+ {noreply, State} ->
+ noreply(init_tick_timer(reset_tick_timer(State)));
+ Return ->
+ Return
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
noreply(State#ch{cfg = Cfg#conf{source = Source}}).
@@ -2757,3 +2702,60 @@ queue_fold(Fun, Init, Q) ->
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+
+evalauate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
+ capabilities = Capabilities,
+ consumer_timeout = Timeout},
+ queue_names = QNames,
+ queue_consumers = QCons,
+ unacked_message_q = UAMQ}) ->
+ Now = os:system_time(millisecond),
+ case ?QUEUE:peek(UAMQ) of
+ {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ when is_integer(Timeout)
+ andalso Time < Now - Timeout ->
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel]),
+ SupportsCancel = case rabbit_misc:table_lookup(
+ Capabilities,
+ <<"consumer_cancel_notify">>) of
+ {bool, true} when is_binary(ConsumerTag) ->
+ true;
+ _ -> false
+ end,
+ case SupportsCancel of
+ false ->
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "consumer ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
+ true ->
+ QRef = qpid_to_ref(QPid),
+ QName = maps:get(QRef, QNames),
+ %% cancel the consumer with the client
+ State2 = cancel_consumer(ConsumerTag, QName, State0),
+ [Q] = rabbit_amqqueue:lookup([QName]),
+ %% send basic cancel to the queue
+ {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, undefined,
+ <<"broker">>, State2#ch.queue_states),
+ %% return all in-flight messages for the consumer
+ {MsgIds, Rem} = lists:foldl(
+ fun({_DelTag, ConTag, _Time, {_, MsgId}},
+ {Ids, Rem}) when ConTag == ConsumerTag ->
+ {[MsgId | Ids], Rem};
+ (Unacked, {Ids, Rem}) ->
+ {Ids, ?QUEUE:in(Unacked, Rem)}
+ end, {[], ?QUEUE:new()},
+ ?QUEUE:to_list(UAMQ)),
+ QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
+ self(), QueueStates2),
+ {noreply, State2#ch{queue_states = QueueStates,
+ queue_consumers = maps:remove(QRef, QCons),
+ unacked_message_q = Rem}}
+ end;
+ _ ->
+ {noreply, State0}
+ end.