diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-05-01 14:00:51 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-05-01 14:11:05 +0100 |
| commit | 9933cc44876cd5b277a0ecf3c7a2e31f636384c7 (patch) | |
| tree | c1b78c4298d4c9bdb1699b0f9d9126717550143e /src | |
| parent | 45402b9454a12f6b1a9be3ff1827ed4ee74da7d1 (diff) | |
| download | rabbitmq-server-git-9933cc44876cd5b277a0ecf3c7a2e31f636384c7.tar.gz | |
Always close channel on consumer timeout
In order to keep things simple and consistent this changes consumer
timeouts to always close the channel instead of cancelling and
returning.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 5 insertions, 42 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69add85057..048254f5e9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) -> end. evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, - capabilities = Capabilities, consumer_timeout = Timeout}, - queue_names = QNames, - queue_consumers = QCons, unacked_message_q = UAMQ}) -> Now = os:system_time(millisecond), case ?QUEUE:peek(UAMQ) of - {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " "waiting on ack", [rabbit_data_coercion:to_binary(ConsumerTag), Channel]), - SupportsCancel = case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_cancel_notify">>) of - {bool, true} when is_binary(ConsumerTag) -> - true; - _ -> false - end, - case SupportsCancel of - false -> - Ex = rabbit_misc:amqp_error(precondition_failed, - "consumer ack timed out on channel ~w", - [Channel], none), - handle_exception(Ex, State0); - true -> - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State2 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, State2#ch.queue_states), - %% return all in-flight messages for the consumer - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - {noreply, State2#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}} - end; + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); _ -> {noreply, State0} end. |
