diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 5 insertions, 42 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69add85057..048254f5e9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) -> end. evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, - capabilities = Capabilities, consumer_timeout = Timeout}, - queue_names = QNames, - queue_consumers = QCons, unacked_message_q = UAMQ}) -> Now = os:system_time(millisecond), case ?QUEUE:peek(UAMQ) of - {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " "waiting on ack", [rabbit_data_coercion:to_binary(ConsumerTag), Channel]), - SupportsCancel = case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_cancel_notify">>) of - {bool, true} when is_binary(ConsumerTag) -> - true; - _ -> false - end, - case SupportsCancel of - false -> - Ex = rabbit_misc:amqp_error(precondition_failed, - "consumer ack timed out on channel ~w", - [Channel], none), - handle_exception(Ex, State0); - true -> - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State2 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, State2#ch.queue_states), - %% return all in-flight messages for the consumer - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - {noreply, State2#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}} - end; + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); _ -> {noreply, State0} end. |
