summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-05-01 14:00:51 +0100
committerkjnilsson <knilsson@pivotal.io>2019-05-01 14:11:05 +0100
commit9933cc44876cd5b277a0ecf3c7a2e31f636384c7 (patch)
treec1b78c4298d4c9bdb1699b0f9d9126717550143e
parent45402b9454a12f6b1a9be3ff1827ed4ee74da7d1 (diff)
downloadrabbitmq-server-git-9933cc44876cd5b277a0ecf3c7a2e31f636384c7.tar.gz
Always close channel on consumer timeout
In order to keep things simple and consistent this changes consumer timeouts to always close the channel instead of cancelling and returning.
-rw-r--r--src/rabbit_channel.erl47
-rw-r--r--test/consumer_timeout_SUITE.erl28
2 files changed, 19 insertions, 56 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69add85057..048254f5e9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2712,58 +2712,21 @@ queue_fold(Fun, Init, Q) ->
end.
evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
- capabilities = Capabilities,
consumer_timeout = Timeout},
- queue_names = QNames,
- queue_consumers = QCons,
unacked_message_q = UAMQ}) ->
Now = os:system_time(millisecond),
case ?QUEUE:peek(UAMQ) of
- {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ {value, {_DTag, ConsumerTag, Time, {_QPid, _Msg}}}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
"waiting on ack",
[rabbit_data_coercion:to_binary(ConsumerTag),
Channel]),
- SupportsCancel = case rabbit_misc:table_lookup(
- Capabilities,
- <<"consumer_cancel_notify">>) of
- {bool, true} when is_binary(ConsumerTag) ->
- true;
- _ -> false
- end,
- case SupportsCancel of
- false ->
- Ex = rabbit_misc:amqp_error(precondition_failed,
- "consumer ack timed out on channel ~w",
- [Channel], none),
- handle_exception(Ex, State0);
- true ->
- QRef = qpid_to_ref(QPid),
- QName = maps:get(QRef, QNames),
- %% cancel the consumer with the client
- State2 = cancel_consumer(ConsumerTag, QName, State0),
- [Q] = rabbit_amqqueue:lookup([QName]),
- %% send basic cancel to the queue
- {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, undefined,
- <<"broker">>, State2#ch.queue_states),
- %% return all in-flight messages for the consumer
- {MsgIds, Rem} = lists:foldl(
- fun({_DelTag, ConTag, _Time, {_, MsgId}},
- {Ids, Rem}) when ConTag == ConsumerTag ->
- {[MsgId | Ids], Rem};
- (Unacked, {Ids, Rem}) ->
- {Ids, ?QUEUE:in(Unacked, Rem)}
- end, {[], ?QUEUE:new()},
- ?QUEUE:to_list(UAMQ)),
- QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
- self(), QueueStates2),
- {noreply, State2#ch{queue_states = QueueStates,
- queue_consumers = maps:remove(QRef, QCons),
- unacked_message_q = Rem}}
- end;
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "consumer ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
_ ->
{noreply, State0}
end.
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
index 8817b93c03..b1b55879cc 100644
--- a/test/consumer_timeout_SUITE.erl
+++ b/test/consumer_timeout_SUITE.erl
@@ -129,26 +129,26 @@ end_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_finished(Config, Testcase).
consumer_timeout(Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
declare_queue(Ch, Config, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QName, false),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
receive
- {#'basic.deliver'{delivery_tag = _,
- redelivered = false}, _} ->
- %% do nothing with the delivery should trigger timeout
- receive
- #'basic.cancel'{ } ->
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- ok
- after 20000 ->
- flush(1),
- exit(cancel_never_happened)
- end
- after 5000 ->
- exit(deliver_timeout)
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
end,
rabbit_ct_client_helpers:close_channel(Ch),
ok.