diff options
| -rw-r--r-- | src/rabbit_channel.erl | 47 | ||||
| -rw-r--r-- | test/consumer_timeout_SUITE.erl | 28 |
2 files changed, 19 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69add85057..048254f5e9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) -> end. evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, - capabilities = Capabilities, consumer_timeout = Timeout}, - queue_names = QNames, - queue_consumers = QCons, unacked_message_q = UAMQ}) -> Now = os:system_time(millisecond), case ?QUEUE:peek(UAMQ) of - {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}} when is_integer(Timeout) andalso Time < Now - Timeout -> rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " "waiting on ack", [rabbit_data_coercion:to_binary(ConsumerTag), Channel]), - SupportsCancel = case rabbit_misc:table_lookup( - Capabilities, - <<"consumer_cancel_notify">>) of - {bool, true} when is_binary(ConsumerTag) -> - true; - _ -> false - end, - case SupportsCancel of - false -> - Ex = rabbit_misc:amqp_error(precondition_failed, - "consumer ack timed out on channel ~w", - [Channel], none), - handle_exception(Ex, State0); - true -> - QRef = qpid_to_ref(QPid), - QName = maps:get(QRef, QNames), - %% cancel the consumer with the client - State2 = cancel_consumer(ConsumerTag, QName, State0), - [Q] = rabbit_amqqueue:lookup([QName]), - %% send basic cancel to the queue - {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, undefined, - <<"broker">>, State2#ch.queue_states), - %% return all in-flight messages for the consumer - {MsgIds, Rem} = lists:foldl( - fun({_DelTag, ConTag, _Time, {_, MsgId}}, - {Ids, Rem}) when ConTag == ConsumerTag -> - {[MsgId | Ids], Rem}; - (Unacked, {Ids, Rem}) -> - {Ids, ?QUEUE:in(Unacked, Rem)} - end, {[], ?QUEUE:new()}, - ?QUEUE:to_list(UAMQ)), - QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, - self(), QueueStates2), - {noreply, State2#ch{queue_states = QueueStates, - queue_consumers = maps:remove(QRef, QCons), - unacked_message_q = Rem}} - end; + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); _ -> {noreply, State0} end. diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl index 8817b93c03..b1b55879cc 100644 --- a/test/consumer_timeout_SUITE.erl +++ b/test/consumer_timeout_SUITE.erl @@ -129,26 +129,26 @@ end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). consumer_timeout(Config) -> - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), declare_queue(Ch, Config, QName), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), subscribe(Ch, QName, false), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), receive - {#'basic.deliver'{delivery_tag = _, - redelivered = false}, _} -> - %% do nothing with the delivery should trigger timeout - receive - #'basic.cancel'{ } -> - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - ok - after 20000 -> - flush(1), - exit(cancel_never_happened) - end - after 5000 -> - exit(deliver_timeout) + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok end, rabbit_ct_client_helpers:close_channel(Ch), ok. |
