summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/consumer_timeout_SUITE.erl272
-rw-r--r--test/queue_parallel_SUITE.erl115
-rw-r--r--test/quorum_queue_SUITE.erl2
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl2
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl98
5 files changed, 415 insertions, 74 deletions
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
new file mode 100644
index 0000000000..8817b93c03
--- /dev/null
+++ b/test/consumer_timeout_SUITE.erl
@@ -0,0 +1,272 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(consumer_timeout_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [consumer_timeout,
+ consumer_timeout_basic_get,
+ consumer_timeout_no_basic_cancel_capability
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests},
+ {mirrored_queue, [parallel], AllTests},
+ {quorum_queue, [parallel], AllTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config0) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000},
+ {consumer_timeout, 5000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config0, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+consumer_timeout(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ receive
+ #'basic.cancel'{ } ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ ok
+ after 20000 ->
+ flush(1),
+ exit(cancel_never_happened)
+ end
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
+consumer_timeout_basic_get(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_DelTag] = consume(Ch, QName, [<<"msg1">>]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+
+
+-define(CLIENT_CAPABILITIES,
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, false},
+ {<<"connection.blocked">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true}]).
+
+consumer_timeout_no_basic_cancel_capability(Config) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
+ AmqpParams = #amqp_params_network{port = Port,
+ host = "localhost",
+ virtual_host = <<"/">>,
+ client_properties = Props
+ },
+ {ok, Conn} = amqp_connection:start(AmqpParams),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% do nothing with the delivery should trigger timeout
+ ok
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = Ctag},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
+ ok
+ end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index eba0965608..632a314d21 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -58,7 +58,7 @@ groups() ->
delete_immediately_by_resource
],
[
- {parallel_tests, [],
+ {parallel_tests, [],
[
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
@@ -129,19 +129,22 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
-init_per_group(Group, Config) ->
+init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Group},
- {rmq_nodes_count, ClusterSize}
- ]),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
false ->
- rabbit_ct_helpers:run_steps(Config, [])
+ rabbit_ct_helpers:run_steps(Config0, [])
end.
end_per_group(Group, Config) ->
@@ -193,7 +196,7 @@ consume_first_empty(Config) ->
consume_empty(Ch, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- consume(Ch, QName, [<<"msg1">>]),
+ consume(Ch, QName, true, [<<"msg1">>]),
rabbit_ct_client_helpers:close_channel(Ch).
consume_from_empty_queue(Config) ->
@@ -268,7 +271,9 @@ consume_and_ack(Config) ->
[DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -281,7 +286,9 @@ consume_and_multiple_ack(Config) ->
wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -296,7 +303,9 @@ subscribe_and_ack(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -314,7 +323,9 @@ subscribe_and_multiple_ack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -343,7 +354,9 @@ subscribe_and_requeue_multiple_nack(Config) ->
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -357,7 +370,9 @@ consume_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -371,7 +386,9 @@ consume_and_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -385,7 +402,9 @@ consume_and_requeue_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -399,7 +418,9 @@ consume_and_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -423,7 +444,9 @@ subscribe_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -441,7 +464,9 @@ subscribe_and_nack(Config) ->
multiple = false,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -461,7 +486,9 @@ subscribe_and_multiple_nack(Config) ->
multiple = true,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%% TODO test with single active
basic_cancel(Config) ->
@@ -472,11 +499,12 @@ basic_cancel(Config) ->
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- subscribe(Ch, QName, false),
+ CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+ subscribe(Ch, QName, false, CTag),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
?assertEqual([], lists:filter(fun(Props) ->
@@ -489,7 +517,9 @@ basic_cancel(Config) ->
wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
after 5000 ->
exit(basic_deliver_timeout)
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
purge(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -501,7 +531,9 @@ purge(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
{'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
basic_recover(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -513,7 +545,9 @@ basic_recover(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_fails(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -531,7 +565,9 @@ delete_immediately_by_pid_fails(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_succeeds(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -550,7 +586,9 @@ delete_immediately_by_pid_succeeds(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_resource(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -568,7 +606,9 @@ delete_immediately_by_resource(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%%%%%%%%%%%%%%%%%%%%%%%%
%% Test helpers
@@ -600,12 +640,15 @@ consume_empty(Ch, QName) ->
amqp_channel:call(Ch, #'basic.get'{queue = QName})).
subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
- consumer_tag = <<"ctag">>},
+ consumer_tag = Ctag},
self()),
receive
- #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
ok
end.
@@ -614,3 +657,11 @@ receive_basic_deliver(Redelivered) ->
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
ok
end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 61f9328855..c23b7ac85e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -172,7 +172,7 @@ init_per_group(Group, Config) ->
ok ->
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
+ [rabbit, channel_tick_interval, 100]),
%% HACK: the larger cluster sizes benefit for a bit more time
%% after clustering before running the tests.
case Group of
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index ea6e973ca2..c44b799caa 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -177,6 +177,8 @@ channel_metrics(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}),
amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>},
#amqp_msg{payload = <<"hello">>}),
+ amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $ยข% anywhere">>},
+ #amqp_msg{payload = <<"hello">>}),
{#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>,
no_ack=true}),
timer:sleep(150),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 866d529489..ed64fcf1c5 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -39,7 +39,8 @@ groups() ->
file_handle_cache, %% Change FHC limit.
head_message_timestamp_statistics, %% Expect specific statistics.
log_management, %% Check log files.
- log_management_during_startup, %% Check log files.
+ log_file_initialised_during_startup,
+ log_file_fails_to_initialise_during_startup,
externally_rotated_logs_are_automatically_reopened %% Check log files.
]}
].
@@ -271,11 +272,11 @@ log_management1(_Config) ->
ok = test_logs_working([LogFile]),
passed.
-log_management_during_startup(Config) ->
+log_file_initialised_during_startup(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, log_management_during_startup1, [Config]).
+ ?MODULE, log_file_initialised_during_startup1, [Config]).
-log_management_during_startup1(_Config) ->
+log_file_initialised_during_startup1(_Config) ->
[LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
@@ -299,57 +300,72 @@ log_management_during_startup1(_Config) ->
application:unset_env(lager, extra_sinks),
ok = rabbit:start(),
- %% start application with logging to directory with no
- %% write permissions
- ok = rabbit:stop(),
- NoPermission1 = "/var/empty/test.log",
- delete_file(NoPermission1),
- delete_file(filename:dirname(NoPermission1)),
- ok = rabbit:stop(),
- ok = application:set_env(rabbit, lager_default_file, NoPermission1),
+ %% clean up
+ ok = application:set_env(rabbit, lager_default_file, LogFile),
application:unset_env(rabbit, log),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = try rabbit:start() of
+ ok = rabbit:start(),
+ passed.
+
+
+log_file_fails_to_initialise_during_startup(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, log_file_fails_to_initialise_during_startup1, [Config]).
+
+log_file_fails_to_initialise_during_startup1(_Config) ->
+ [LogFile|_] = rabbit:log_locations(),
+ Suffix = ".0",
+
+ %% start application with logging to directory with no
+ %% write permissions
+ ok = rabbit:stop(),
+
+ Run1 = fun() ->
+ NoPermission1 = "/var/empty/test.log",
+ delete_file(NoPermission1),
+ delete_file(filename:dirname(NoPermission1)),
+ ok = rabbit:stop(),
+ ok = application:set_env(rabbit, lager_default_file, NoPermission1),
+ application:unset_env(rabbit, log),
+ application:unset_env(lager, handlers),
+ application:unset_env(lager, extra_sinks),
+ rabbit:start()
+ end,
+
+ ok = try Run1() of
ok -> exit({got_success_but_expected_failure,
log_rotation_no_write_permission_dir_test})
catch
- _:{error, {cannot_log_to_file, _, Reason1}}
- when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok;
- _:{error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason1}}}
- when Reason1 =:= eperm orelse
- Reason1 =:= eacces orelse
- Reason1 =:= enoent-> ok
+ _:could_not_initialise_logger -> ok
end,
%% start application with logging to a subdirectory which
%% parent directory has no write permissions
NoPermission2 = "/var/empty/non-existent/test.log",
- delete_file(NoPermission2),
- delete_file(filename:dirname(NoPermission2)),
- case rabbit:stop() of
- ok -> ok;
- {error, lager_not_running} -> ok
+
+ Run2 = fun() ->
+ delete_file(NoPermission2),
+ delete_file(filename:dirname(NoPermission2)),
+ case rabbit:stop() of
+ ok -> ok;
+ {error, lager_not_running} -> ok
+ end,
+ ok = application:set_env(rabbit, lager_default_file, NoPermission2),
+ application:unset_env(rabbit, log),
+ application:unset_env(lager, handlers),
+ application:unset_env(lager, extra_sinks),
+ rabbit:start()
end,
- ok = application:set_env(rabbit, lager_default_file, NoPermission2),
- application:unset_env(rabbit, log),
- application:unset_env(lager, handlers),
- application:unset_env(lager, extra_sinks),
- ok = try rabbit:start() of
+
+ ok = try Run2() of
ok -> exit({got_success_but_expected_failure,
log_rotation_parent_dirs_test})
catch
- _:{error, {cannot_log_to_file, _, Reason2}}
- when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok;
- _:{error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason2}}}
- when Reason2 =:= eperm orelse
- Reason2 =:= eacces orelse
- Reason2 =:= enoent-> ok
+ _:could_not_initialise_logger -> ok
end,
- %% cleanup
+ %% clean up
ok = application:set_env(rabbit, lager_default_file, LogFile),
application:unset_env(rabbit, log),
application:unset_env(lager, handlers),
@@ -494,7 +510,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 0}] = ets:lookup(
@@ -509,7 +525,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 1}] = ets:lookup(
@@ -522,7 +538,7 @@ channel_statistics1(_Config) ->
force_metric_gc(),
Check4 = fun() ->
[] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[] = ets:lookup(channel_queue_exchange_metrics,