diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-12-04 12:31:50 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-12-04 12:31:50 +0000 |
| commit | 36e5a0fa2164b810ee11edd5a2d078083c4d0575 (patch) | |
| tree | d2118f8f9925fdc1ac88f0577d73ab964e0f6c18 /test | |
| parent | adee8467f4e925ae472b17cf2fa0a0df0f2978ca (diff) | |
| parent | 812706707f300b735f22a56b7ff713cd20f4c7b9 (diff) | |
| download | rabbitmq-server-git-36e5a0fa2164b810ee11edd5a2d078083c4d0575.tar.gz | |
Merge branch 'master' into dialyze-qq
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 213 |
1 files changed, 200 insertions, 13 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 531b645774..14a3650a87 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -54,12 +54,16 @@ groups() -> delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority - ]}, + consume_in_minority]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, quorum_cluster_size_7 + ]}, + {clustered_with_partitions, [], [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down ]} ]} ]. @@ -100,6 +104,7 @@ all_tests() -> dead_letter_to_classic_queue, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, + dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, basic_cancel, @@ -119,7 +124,9 @@ all_tests() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -128,6 +135,8 @@ init_per_group(clustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(clustered_with_partitions, Config) -> + rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]); init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -139,7 +148,8 @@ init_per_group(Group, Config) -> [{rmq_nodes_count, ClusterSize}, {rmq_nodename_suffix, Group}, {tcp_ports_base}]), - Config2 = rabbit_ct_helpers:run_steps(Config1, + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), ok = rabbit_ct_broker_helpers:rpc( @@ -159,10 +169,28 @@ end_per_group(clustered, Config) -> Config; end_per_group(unclustered, Config) -> Config; +end_per_group(clustered_with_partitions, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base}, + {queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]); init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -170,12 +198,18 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). +end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( @@ -1066,22 +1100,47 @@ dead_letter_to_classic_queue(Config) -> {<<"x-dead-letter-routing-key">>, longstr, CQ} ])), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - RaName = ra_name(QQ), - publish(Ch, QQ), + test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). + +test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) -> + publish(Ch, Source), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), - DeliveryTag = consume(Ch, QQ, false), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), + DeliveryTag = consume(Ch, Source, false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, CQ, false). + case PolicySet of + true -> + wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, Destination, true); + false -> + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]) + end. + +dead_letter_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_policy">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>, + [{<<"dead-letter-exchange">>, <<"">>}, + {<<"dead-letter-routing-key">>, CQ}]), + RaName = ra_name(QQ), + test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), + test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1589,6 +1648,7 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone @@ -1610,6 +1670,7 @@ cleanup_data_dir(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}), ?assert(filelib:is_dir(DataDir)), ?assertEqual(ok, @@ -1617,6 +1678,132 @@ cleanup_data_dir(Config) -> [])), ?assert(not filelib:is_dir(DataDir)). +reconnect_consumer_and_publish(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 2), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait_channel_down(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_client_helpers:close_channel(ChF), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + %% Let's give it a few seconds to ensure it doesn't attempt to + %% deliver to the down channel - it shouldn't be monitored + %% at this time! + timer:sleep(5000), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
