diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-04-25 21:10:52 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-04-25 21:10:52 +0300 |
| commit | 99d0adebb332fa45e9507e7b8d5a73bd4ab34338 (patch) | |
| tree | bf4e41bbf82592e5114901362a667d3fe7166894 /test | |
| parent | d279ccfee32764ad76517fb101015fa7ec77d9f5 (diff) | |
| parent | e237612ae74990aff612a79b617d1dad438aaf51 (diff) | |
| download | rabbitmq-server-git-99d0adebb332fa45e9507e7b8d5a73bd4ab34338.tar.gz | |
Merge branch 'master' into rabbitmq-erlang-client-91
Diffstat (limited to 'test')
| -rw-r--r-- | test/consumer_timeout_SUITE.erl | 272 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 115 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 98 |
5 files changed, 415 insertions, 74 deletions
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl new file mode 100644 index 0000000000..8817b93c03 --- /dev/null +++ b/test/consumer_timeout_SUITE.erl @@ -0,0 +1,272 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(consumer_timeout_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [consumer_timeout, + consumer_timeout_basic_get, + consumer_timeout_no_basic_cancel_capability + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests}, + {mirrored_queue, [parallel], AllTests}, + {quorum_queue, [parallel], AllTests} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + Skip -> + Skip + end; +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config0) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}, + {consumer_timeout, 5000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config0, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +consumer_timeout(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + receive + #'basic.cancel'{ } -> + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + ok + after 20000 -> + flush(1), + exit(cancel_never_happened) + end + after 5000 -> + exit(deliver_timeout) + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. + +consumer_timeout_basic_get(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_DelTag] = consume(Ch, QName, [<<"msg1">>]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. + + +-define(CLIENT_CAPABILITIES, + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, false}, + {<<"connection.blocked">>, bool, true}, + {<<"authentication_failure_close">>, bool, true}]). + +consumer_timeout_no_basic_cancel_capability(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}], + AmqpParams = #amqp_params_network{port = Port, + host = "localhost", + virtual_host = <<"/">>, + client_properties = Props + }, + {ok, Conn} = amqp_connection:start(AmqpParams), + {ok, Ch} = amqp_connection:open_channel(Conn), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + ok + after 5000 -> + exit(deliver_timeout) + end, + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% + +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = Ctag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Ctag} -> + ok + end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index eba0965608..632a314d21 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -58,7 +58,7 @@ groups() -> delete_immediately_by_resource ], [ - {parallel_tests, [], + {parallel_tests, [], [ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, @@ -129,19 +129,22 @@ init_per_group(mirrored_queue, Config) -> {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> ClusterSize = 2, - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, ClusterSize} - ]), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); false -> - rabbit_ct_helpers:run_steps(Config, []) + rabbit_ct_helpers:run_steps(Config0, []) end. end_per_group(Group, Config) -> @@ -193,7 +196,7 @@ consume_first_empty(Config) -> consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - consume(Ch, QName, [<<"msg1">>]), + consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). consume_from_empty_queue(Config) -> @@ -268,7 +271,9 @@ consume_and_ack(Config) -> [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -281,7 +286,9 @@ consume_and_multiple_ack(Config) -> wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -296,7 +303,9 @@ subscribe_and_ack(Config) -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -314,7 +323,9 @@ subscribe_and_multiple_ack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -343,7 +354,9 @@ subscribe_and_requeue_multiple_nack(Config) -> multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -357,7 +370,9 @@ consume_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -371,7 +386,9 @@ consume_and_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -385,7 +402,9 @@ consume_and_requeue_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -399,7 +418,9 @@ consume_and_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -423,7 +444,9 @@ subscribe_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -441,7 +464,9 @@ subscribe_and_nack(Config) -> multiple = false, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -461,7 +486,9 @@ subscribe_and_multiple_nack(Config) -> multiple = true, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. %% TODO test with single active basic_cancel(Config) -> @@ -472,11 +499,12 @@ basic_cancel(Config) -> publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - subscribe(Ch, QName, false), + CTag = atom_to_binary(?FUNCTION_NAME, utf8), + subscribe(Ch, QName, false, CTag), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), ?assertEqual([], lists:filter(fun(Props) -> @@ -489,7 +517,9 @@ basic_cancel(Config) -> wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) after 5000 -> exit(basic_deliver_timeout) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. purge(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -501,7 +531,9 @@ purge(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. basic_recover(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -513,7 +545,9 @@ basic_recover(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_fails(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -531,7 +565,9 @@ delete_immediately_by_pid_fails(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_succeeds(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -550,7 +586,9 @@ delete_immediately_by_pid_succeeds(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_resource(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -568,7 +606,9 @@ delete_immediately_by_resource(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. %%%%%%%%%%%%%%%%%%%%%%%% %% Test helpers @@ -600,12 +640,15 @@ consume_empty(Ch, QName) -> amqp_channel:call(Ch, #'basic.get'{queue = QName})). subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = <<"ctag">>}, + consumer_tag = Ctag}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = Ctag} -> ok end. @@ -614,3 +657,11 @@ receive_basic_deliver(Redelivered) -> {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> ok end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 61f9328855..c23b7ac85e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -172,7 +172,7 @@ init_per_group(Group, Config) -> ok -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, - [rabbit, channel_queue_cleanup_interval, 100]), + [rabbit, channel_tick_interval, 100]), %% HACK: the larger cluster sizes benefit for a bit more time %% after clustering before running the tests. case Group of diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index ea6e973ca2..c44b799caa 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -177,6 +177,8 @@ channel_metrics(Config) -> amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}), amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>}, #amqp_msg{payload = <<"hello">>}), + amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $ยข% anywhere">>}, + #amqp_msg{payload = <<"hello">>}), {#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>, no_ack=true}), timer:sleep(150), diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index 866d529489..ed64fcf1c5 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -39,7 +39,8 @@ groups() -> file_handle_cache, %% Change FHC limit. head_message_timestamp_statistics, %% Expect specific statistics. log_management, %% Check log files. - log_management_during_startup, %% Check log files. + log_file_initialised_during_startup, + log_file_fails_to_initialise_during_startup, externally_rotated_logs_are_automatically_reopened %% Check log files. ]} ]. @@ -271,11 +272,11 @@ log_management1(_Config) -> ok = test_logs_working([LogFile]), passed. -log_management_during_startup(Config) -> +log_file_initialised_during_startup(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, log_management_during_startup1, [Config]). + ?MODULE, log_file_initialised_during_startup1, [Config]). -log_management_during_startup1(_Config) -> +log_file_initialised_during_startup1(_Config) -> [LogFile|_] = rabbit:log_locations(), Suffix = ".0", @@ -299,57 +300,72 @@ log_management_during_startup1(_Config) -> application:unset_env(lager, extra_sinks), ok = rabbit:start(), - %% start application with logging to directory with no - %% write permissions - ok = rabbit:stop(), - NoPermission1 = "/var/empty/test.log", - delete_file(NoPermission1), - delete_file(filename:dirname(NoPermission1)), - ok = rabbit:stop(), - ok = application:set_env(rabbit, lager_default_file, NoPermission1), + %% clean up + ok = application:set_env(rabbit, lager_default_file, LogFile), application:unset_env(rabbit, log), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = try rabbit:start() of + ok = rabbit:start(), + passed. + + +log_file_fails_to_initialise_during_startup(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, log_file_fails_to_initialise_during_startup1, [Config]). + +log_file_fails_to_initialise_during_startup1(_Config) -> + [LogFile|_] = rabbit:log_locations(), + Suffix = ".0", + + %% start application with logging to directory with no + %% write permissions + ok = rabbit:stop(), + + Run1 = fun() -> + NoPermission1 = "/var/empty/test.log", + delete_file(NoPermission1), + delete_file(filename:dirname(NoPermission1)), + ok = rabbit:stop(), + ok = application:set_env(rabbit, lager_default_file, NoPermission1), + application:unset_env(rabbit, log), + application:unset_env(lager, handlers), + application:unset_env(lager, extra_sinks), + rabbit:start() + end, + + ok = try Run1() of ok -> exit({got_success_but_expected_failure, log_rotation_no_write_permission_dir_test}) catch - _:{error, {cannot_log_to_file, _, Reason1}} - when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; - _:{error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason1}}} - when Reason1 =:= eperm orelse - Reason1 =:= eacces orelse - Reason1 =:= enoent-> ok + _:could_not_initialise_logger -> ok end, %% start application with logging to a subdirectory which %% parent directory has no write permissions NoPermission2 = "/var/empty/non-existent/test.log", - delete_file(NoPermission2), - delete_file(filename:dirname(NoPermission2)), - case rabbit:stop() of - ok -> ok; - {error, lager_not_running} -> ok + + Run2 = fun() -> + delete_file(NoPermission2), + delete_file(filename:dirname(NoPermission2)), + case rabbit:stop() of + ok -> ok; + {error, lager_not_running} -> ok + end, + ok = application:set_env(rabbit, lager_default_file, NoPermission2), + application:unset_env(rabbit, log), + application:unset_env(lager, handlers), + application:unset_env(lager, extra_sinks), + rabbit:start() end, - ok = application:set_env(rabbit, lager_default_file, NoPermission2), - application:unset_env(rabbit, log), - application:unset_env(lager, handlers), - application:unset_env(lager, extra_sinks), - ok = try rabbit:start() of + + ok = try Run2() of ok -> exit({got_success_but_expected_failure, log_rotation_parent_dirs_test}) catch - _:{error, {cannot_log_to_file, _, Reason2}} - when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; - _:{error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason2}}} - when Reason2 =:= eperm orelse - Reason2 =:= eacces orelse - Reason2 =:= enoent-> ok + _:could_not_initialise_logger -> ok end, - %% cleanup + %% clean up ok = application:set_env(rabbit, lager_default_file, LogFile), application:unset_env(rabbit, log), application:unset_env(lager, handlers), @@ -494,7 +510,7 @@ channel_statistics1(_Config) -> [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup( channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [{{Ch, {QRes, X}}, 1, 0}] = ets:lookup( @@ -509,7 +525,7 @@ channel_statistics1(_Config) -> [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup( channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [{{Ch, {QRes, X}}, 1, 1}] = ets:lookup( @@ -522,7 +538,7 @@ channel_statistics1(_Config) -> force_metric_gc(), Check4 = fun() -> [] = ets:lookup(channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [] = ets:lookup(channel_queue_exchange_metrics, |
