diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-04-09 11:39:22 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-04-23 14:36:10 +0100 |
| commit | 08f7e23527530ec771b38db3f711a43b192ed673 (patch) | |
| tree | d5eae6110b1312d91beb65738b5c32fe46076aab | |
| parent | 4642bf0d7bd168300fd443ea2beb059b86862e8b (diff) | |
| download | rabbitmq-server-git-08f7e23527530ec771b38db3f711a43b192ed673.tar.gz | |
Move consumer timeout handler
To own function
| -rw-r--r-- | src/rabbit_channel.erl | 124 |
1 files changed, 63 insertions, 61 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6aa64a3336..b9f842e9e9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -828,72 +828,17 @@ handle_info({{Ref, Node}, LateAnswer}, [Channel, LateAnswer, Node]), noreply(State); -handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel, - capabilities = Capabilities, - consumer_timeout = Timeout}, - queue_states = QueueStates0, - queue_names = QNames, - queue_consumers = QCons, - unacked_message_q = UAMQ}) -> +handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> QueueStates1 = maps:filter(fun(_, QS) -> QName = rabbit_quorum_queue:queue_name(QS), [] /= rabbit_amqqueue:lookup([QName]) end, QueueStates0), - - Now = os:system_time(millisecond), - case ?QUEUE:peek(UAMQ) of - {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} - when is_integer(Timeout) - andalso Time < Now - Timeout -> - rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " - "waiting on ack", - [rabbit_data_coercion:to_binary(ConsumerTag), - Channel]), - SupportsCancel = case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_cancel_notify">>) of - {bool, true} when is_binary(ConsumerTag) -> - true; - _ -> false - end, - case SupportsCancel of - false -> - Ex = rabbit_misc:amqp_error(precondition_failed, - "consumer ack timed out on channel ~w", - [Channel], none), - handle_exception(Ex, State0); - true -> - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State1 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, QueueStates1), - %% return all in-flight messages for the consumer - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - State = State1#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}, - noreply( - init_tick_timer(reset_tick_timer(State))) - end; - _ -> - noreply( - init_tick_timer( - reset_tick_timer(State0#ch{queue_states = QueueStates1}))) + case evalauate_consumer_timeout(State0#ch{queue_states = QueueStates1}) of + {noreply, State} -> + noreply(init_tick_timer(reset_tick_timer(State))); + Return -> + Return end; handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> noreply(State#ch{cfg = Cfg#conf{source = Source}}). @@ -2757,3 +2702,60 @@ queue_fold(Fun, Init, Q) -> {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. + +evalauate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, + capabilities = Capabilities, + consumer_timeout = Timeout}, + queue_names = QNames, + queue_consumers = QCons, + unacked_message_q = UAMQ}) -> + Now = os:system_time(millisecond), + case ?QUEUE:peek(UAMQ) of + {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + when is_integer(Timeout) + andalso Time < Now - Timeout -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + SupportsCancel = case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_cancel_notify">>) of + {bool, true} when is_binary(ConsumerTag) -> + true; + _ -> false + end, + case SupportsCancel of + false -> + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); + true -> + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State2 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, State2#ch.queue_states), + %% return all in-flight messages for the consumer + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + {noreply, State2#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}} + end; + _ -> + {noreply, State0} + end. |
