summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-12-04 12:31:50 +0000
committerDiana Corbacho <diana@rabbitmq.com>2018-12-04 12:31:50 +0000
commit36e5a0fa2164b810ee11edd5a2d078083c4d0575 (patch)
treed2118f8f9925fdc1ac88f0577d73ab964e0f6c18 /test
parentadee8467f4e925ae472b17cf2fa0a0df0f2978ca (diff)
parent812706707f300b735f22a56b7ff713cd20f4c7b9 (diff)
downloadrabbitmq-server-git-36e5a0fa2164b810ee11edd5a2d078083c4d0575.tar.gz
Merge branch 'master' into dialyze-qq
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl213
1 files changed, 200 insertions, 13 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 531b645774..14a3650a87 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -54,12 +54,16 @@ groups() ->
delete_declare,
metrics_cleanup_on_leadership_takeover,
metrics_cleanup_on_leader_crash,
- consume_in_minority
- ]},
+ consume_in_minority]},
{cluster_size_5, [], [start_queue,
start_queue_concurrent,
quorum_cluster_size_3,
quorum_cluster_size_7
+ ]},
+ {clustered_with_partitions, [], [
+ reconnect_consumer_and_publish,
+ reconnect_consumer_and_wait,
+ reconnect_consumer_and_wait_channel_down
]}
]}
].
@@ -100,6 +104,7 @@ all_tests() ->
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
dead_letter_from_classic_to_quorum_queue,
+ dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
basic_cancel,
@@ -119,7 +124,9 @@ all_tests() ->
init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
- rabbit_ct_helpers:run_setup_steps(Config).
+ rabbit_ct_helpers:run_setup_steps(
+ Config,
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).
end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).
@@ -128,6 +135,8 @@ init_per_group(clustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(unclustered, Config) ->
rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(clustered_with_partitions, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
init_per_group(Group, Config) ->
ClusterSize = case Group of
single_node -> 1;
@@ -139,7 +148,8 @@ init_per_group(Group, Config) ->
[{rmq_nodes_count, ClusterSize},
{rmq_nodename_suffix, Group},
{tcp_ports_base}]),
- Config2 = rabbit_ct_helpers:run_steps(Config1,
+ Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
ok = rabbit_ct_broker_helpers:rpc(
@@ -159,10 +169,28 @@ end_per_group(clustered, Config) ->
Config;
end_per_group(unclustered, Config) ->
Config;
+end_per_group(clustered_with_partitions, Config) ->
+ Config;
end_per_group(_, Config) ->
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_broker_helpers:teardown_steps()).
+init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{rmq_nodes_count, 3},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base},
+ {queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]);
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -170,12 +198,18 @@ init_per_testcase(Testcase, Config) ->
Config2 = rabbit_ct_helpers:set_config(Config1,
[{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_client_helpers:setup_steps()).
+ rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()).
merge_app_env(Config) ->
rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish;
+ Testcase == reconnect_consumer_and_wait;
+ Testcase == reconnect_consumer_and_wait_channel_down ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase);
end_per_testcase(Testcase, Config) ->
catch delete_queues(),
Config1 = rabbit_ct_helpers:run_steps(
@@ -1066,22 +1100,47 @@ dead_letter_to_classic_queue(Config) ->
{<<"x-dead-letter-routing-key">>, longstr, CQ}
])),
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
+ test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ).
+
+test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) ->
+ publish(Ch, Source),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
- DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, Source, false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0),
- wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
- _ = consume(Ch, CQ, false).
+ case PolicySet of
+ true ->
+ wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, Destination, true);
+ false ->
+ wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+dead_letter_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_policy">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<"">>},
+ {<<"dead-letter-routing-key">>, CQ}]),
+ RaName = ra_name(QQ),
+ test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ),
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>),
+ test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ).
dead_letter_to_quorum_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1589,6 +1648,7 @@ delete_member(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
+
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
@@ -1610,6 +1670,7 @@ cleanup_data_dir(Config) ->
?assertExit({{shutdown,
{connection_closing, {server_initiated_close, 541, _}}}, _},
amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}),
?assert(filelib:is_dir(DataDir)),
?assertEqual(ok,
@@ -1617,6 +1678,132 @@ cleanup_data_dir(Config) ->
[])),
?assert(not filelib:is_dir(DataDir)).
+reconnect_consumer_and_publish(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 2),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = true}, _} ->
+ amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+reconnect_consumer_and_wait_channel_down(Config) ->
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ [F1, F2] = lists:delete(Leader, Servers),
+ ChF = rabbit_ct_client_helpers:open_channel(Config, F1),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(ChF, QQ, false),
+ receive
+ {#'basic.deliver'{redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1)
+ end,
+ Up = [Leader, F2],
+ rabbit_ct_broker_helpers:block_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:block_traffic_between(F1, F2),
+ wait_for_messages_ready(Up, RaName, 1),
+ wait_for_messages_pending_ack(Up, RaName, 0),
+ wait_for_messages_ready([F1], RaName, 0),
+ wait_for_messages_pending_ack([F1], RaName, 1),
+ rabbit_ct_client_helpers:close_channel(ChF),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader),
+ rabbit_ct_broker_helpers:allow_traffic_between(F1, F2),
+ %% Let's give it a few seconds to ensure it doesn't attempt to
+ %% deliver to the down channel - it shouldn't be monitored
+ %% at this time!
+ timer:sleep(5000),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
basic_recover(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),