diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-05-03 13:27:29 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-05-03 13:27:29 +0300 |
| commit | 36972c3dae8ec81d3227fb4dd8ee9685cf69651f (patch) | |
| tree | 16f5603a15b5f5ed33468b7a5dd3cc4321ac16ba /src | |
| parent | 3976d87c9a3e2e4e45a7519ab52157a82524fd0a (diff) | |
| parent | 1bdd460e60d758412ef2a5dc9d3861136e05833f (diff) | |
| download | rabbitmq-server-git-36972c3dae8ec81d3227fb4dd8ee9685cf69651f.tar.gz | |
Merge pull request #1998 from rabbitmq/consumer_timeouts_take_2
Always close channel on consumer timeout
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 55 |
1 files changed, 9 insertions, 46 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69add85057..19d4449e21 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) -> end. evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, - capabilities = Capabilities, consumer_timeout = Timeout}, - queue_names = QNames, - queue_consumers = QCons, unacked_message_q = UAMQ}) -> Now = os:system_time(millisecond), case ?QUEUE:peek(UAMQ) of - {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> - rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " - "waiting on ack", - [rabbit_data_coercion:to_binary(ConsumerTag), - Channel]), - SupportsCancel = case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_cancel_notify">>) of - {bool, true} when is_binary(ConsumerTag) -> - true; - _ -> false - end, - case SupportsCancel of - false -> - Ex = rabbit_misc:amqp_error(precondition_failed, - "consumer ack timed out on channel ~w", - [Channel], none), - handle_exception(Ex, State0); - true -> - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State2 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, State2#ch.queue_states), - %% return all in-flight messages for the consumer - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - {noreply, State2#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}} - end; + rabbit_log_channel:warning("Consumer ~s on channel ~w has timed out " + "waiting on consumer acknowledgement. Timeout used: ~p ms", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel, Timeout]), + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); _ -> {noreply, State0} end. |
