summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile3
-rw-r--r--src/rabbit_channel.erl62
-rw-r--r--test/consumer_timeout_SUITE.erl272
-rw-r--r--test/queue_parallel_SUITE.erl56
-rw-r--r--test/quorum_queue_SUITE.erl2
5 files changed, 307 insertions, 88 deletions
diff --git a/Makefile b/Makefile
index 6620c8359b..9e818a8937 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,8 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
- {channel_queue_cleanup_interval, 60000},
+ %% interval at which the channel can perform periodic actions
+ {channel_tick_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
]
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a4d3ec6321..b705dbcbdc 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -169,7 +169,7 @@
delivery_flow,
interceptor_state,
queue_states,
- queue_cleanup_timer
+ tick_timer
}).
-define(QUEUE, lqueue).
@@ -489,7 +489,6 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
end,
MaxMessageSize = get_max_message_size(),
ConsumerTimeout = get_consumer_timeout(),
- rabbit_log:info("consumer timeout ~w", [ConsumerTimeout]),
State = #ch{cfg = #conf{state = starting,
protocol = Protocol,
channel = Channel,
@@ -535,7 +534,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rabbit_event:if_enabled(State2, #ch.stats_timer,
fun() -> emit_stats(State2) end),
put_operation_timeout(),
- State3 = init_queue_cleanup_timer(State2),
+ State3 = init_tick_timer(State2),
{ok, State3, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -828,12 +827,13 @@ handle_info({{Ref, Node}, LateAnswer},
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
- consumer_timeout = Timeout},
- queue_states = QueueStates0,
- queue_names = QNames,
- queue_consumers = QCons,
- unacked_message_q = UAMQ}) ->
+handle_info(tick, State0 = #ch{cfg = #conf{channel = Channel,
+ capabilities = Capabilities,
+ consumer_timeout = Timeout},
+ queue_states = QueueStates0,
+ queue_names = QNames,
+ queue_consumers = QCons,
+ unacked_message_q = UAMQ}) ->
QueueStates1 =
maps:filter(fun(_, QS) ->
QName = rabbit_quorum_queue:queue_name(QS),
@@ -845,22 +845,24 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
{value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
- case ConsumerTag of
- _ when is_integer(ConsumerTag) ->
- %% basic.get - there is no mechanims so we just crash the
- %% channel
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel]),
+ SupportsCancel = case rabbit_misc:table_lookup(
+ Capabilities,
+ <<"consumer_cancel_notify">>) of
+ {bool, true} when is_binary(ConsumerTag) ->
+ true;
+ _ -> false
+ end,
+ case SupportsCancel of
+ false ->
Ex = rabbit_misc:amqp_error(precondition_failed,
- "basic.get ack timed out on channel ~w",
+ "consumer ack timed out on channel ~w",
[Channel], none),
handle_exception(Ex, State0);
- % rabbit_misc:protocol_error(precondition_failed,
- % "basic.get ack timed out on channel ~w ",
- % [Channel]);
- _ ->
- rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
- "waiting on ack",
- [rabbit_data_coercion:to_binary(ConsumerTag),
- Channel]),
+ true ->
QRef = qpid_to_ref(QPid),
QName = maps:get(QRef, QNames),
%% cancel the consumer with the client
@@ -881,15 +883,14 @@ handle_info(queue_cleanup, State0 = #ch{cfg = #conf{channel = Channel,
?QUEUE:to_list(UAMQ)),
QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
self(), QueueStates2),
-
State = State1#ch{queue_states = QueueStates,
queue_consumers = maps:remove(QRef, QCons),
unacked_message_q = Rem},
- noreply(init_queue_cleanup_timer(State))
+ noreply(init_tick_timer(State))
end;
_ ->
noreply(
- init_queue_cleanup_timer(
+ init_tick_timer(
State0#ch{queue_states = QueueStates1}))
end;
handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
@@ -1910,10 +1911,7 @@ cancel_consumer(CTag, QName,
consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} -> ok =
-
- rabbit_log:info("Consumer cancel notify suppoerted ~w", [CTag]),
- send(#'basic.cancel'{consumer_tag = CTag,
+ {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
nowait = true}, State);
_ -> ok
end,
@@ -2692,9 +2690,9 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_queue_cleanup_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
- State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
+init_tick_timer(State) ->
+ {ok, Interval} = application:get_env(rabbit, channel_tick_interval),
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
new file mode 100644
index 0000000000..8817b93c03
--- /dev/null
+++ b/test/consumer_timeout_SUITE.erl
@@ -0,0 +1,272 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(consumer_timeout_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [consumer_timeout,
+ consumer_timeout_basic_get,
+ consumer_timeout_no_basic_cancel_capability
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests},
+ {mirrored_queue, [parallel], AllTests},
+ {quorum_queue, [parallel], AllTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config0) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000},
+ {consumer_timeout, 5000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config0, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+consumer_timeout(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ receive
+ #'basic.cancel'{ } ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ ok
+ after 20000 ->
+ flush(1),
+ exit(cancel_never_happened)
+ end
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
+consumer_timeout_basic_get(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_DelTag] = consume(Ch, QName, [<<"msg1">>]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+
+
+-define(CLIENT_CAPABILITIES,
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, false},
+ {<<"connection.blocked">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true}]).
+
+consumer_timeout_no_basic_cancel_capability(Config) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
+ AmqpParams = #amqp_params_network{port = Port,
+ host = "localhost",
+ virtual_host = <<"/">>,
+ client_properties = Props
+ },
+ {ok, Conn} = amqp_connection:start(AmqpParams),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ ok
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = Ctag},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
+ ok
+ end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index 00d597c75a..632a314d21 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -52,8 +52,6 @@ groups() ->
consume_and_nack,
consume_and_requeue_multiple_nack,
consume_and_multiple_nack,
- consumer_timeout,
- consumer_timeout_basic_get,
basic_cancel,
purge,
basic_recover,
@@ -136,9 +134,8 @@ init_per_group(Group, Config0) ->
true ->
ClusterSize = 2,
Config = rabbit_ct_helpers:merge_app_env(
- Config0, {rabbit, [{channel_queue_cleanup_interval, 1000},
- {quorum_tick_interval, 1000},
- {consumer_timeout, 15000}]}),
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000}]}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
@@ -425,55 +422,6 @@ consume_and_multiple_nack(Config) ->
rabbit_ct_client_helpers:close_channel(Ch),
ok.
-consumer_timeout(Config) ->
- {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = ?config(queue_name, Config),
- declare_queue(Ch, Config, QName),
- publish(Ch, QName, [<<"msg1">>]),
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- subscribe(Ch, QName, false),
- receive
- {#'basic.deliver'{delivery_tag = _,
- redelivered = false}, _} ->
- %% do nothing with the delivery should trigger timeout
- receive
- #'basic.cancel'{ } ->
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- ok
- after 30000 ->
- flush(1),
- exit(cancel_never_happened)
- end
- after 5000 ->
- exit(deliver_timeout)
- end,
- rabbit_ct_client_helpers:close_channel(Ch),
- ok.
-
-consumer_timeout_basic_get(Config) ->
- {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- QName = ?config(queue_name, Config),
- declare_queue(Ch, Config, QName),
- publish(Ch, QName, [<<"msg1">>]),
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- [_DelTag] = consume(Ch, QName, [<<"msg1">>]),
- erlang:monitor(process, Conn),
- erlang:monitor(process, Ch),
- receive
- {'DOWN', _, process, Ch, _} -> ok
- after 30000 ->
- flush(1),
- exit(channel_exit_expected)
- end,
- receive
- {'DOWN', _, process, Conn, _} ->
- flush(1),
- exit(unexpected_connection_exit)
- after 2000 ->
- ok
- end,
- ok.
-
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 61f9328855..c23b7ac85e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -172,7 +172,7 @@ init_per_group(Group, Config) ->
ok ->
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
+ [rabbit, channel_tick_interval, 100]),
%% HACK: the larger cluster sizes benefit for a bit more time
%% after clustering before running the tests.
case Group of