diff options
Diffstat (limited to 'deps/rabbit/test/sync_detection_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/sync_detection_SUITE.erl | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/deps/rabbit/test/sync_detection_SUITE.erl b/deps/rabbit/test/sync_detection_SUITE.erl new file mode 100644 index 0000000000..55a86b7b3d --- /dev/null +++ b/deps/rabbit/test/sync_detection_SUITE.erl @@ -0,0 +1,243 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(sync_detection_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +-define(LOOP_RECURSION_DELAY, 100). + +all() -> + [ + {group, cluster_size_2}, + {group, cluster_size_3} + ]. + +groups() -> + [ + {cluster_size_2, [], [ + follower_synchronization + ]}, + {cluster_size_3, [], [ + follower_synchronization_ttl + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(cluster_size_2, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]); +init_per_group(cluster_size_3, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]). + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_count, ClusterSize}, + {rmq_nodes_clustered, true}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ [ + fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1, + fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1 + ]). + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +follower_synchronization(Config) -> + [Master, Slave] = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + Channel = rabbit_ct_client_helpers:open_channel(Config, Master), + Queue = <<"ha.two.test">>, + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = Queue, + auto_delete = false}), + + %% The comments on the right are the queue length and the pending acks on + %% the master. + rabbit_ct_broker_helpers:stop_broker(Config, Slave), + + %% We get and ack one message when the mirror is down, and check that when we + %% start the mirror it's not marked as synced until ack the message. We also + %% publish another message when the mirror is up. + send_dummy_message(Channel, Queue), % 1 - 0 + {#'basic.get_ok'{delivery_tag = Tag1}, _} = + amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 0 - 1 + + rabbit_ct_broker_helpers:start_broker(Config, Slave), + + follower_unsynced(Master, Queue), + send_dummy_message(Channel, Queue), % 1 - 1 + follower_unsynced(Master, Queue), + + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag1}), % 1 - 0 + + follower_synced(Master, Queue), + + %% We restart the mirror and we send a message, so that the mirror will only + %% have one of the messages. + rabbit_ct_broker_helpers:stop_broker(Config, Slave), + rabbit_ct_broker_helpers:start_broker(Config, Slave), + + send_dummy_message(Channel, Queue), % 2 - 0 + + follower_unsynced(Master, Queue), + + %% We reject the message that the mirror doesn't have, and verify that it's + %% still unsynced + {#'basic.get_ok'{delivery_tag = Tag2}, _} = + amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 1 - 1 + follower_unsynced(Master, Queue), + amqp_channel:cast(Channel, #'basic.reject'{ delivery_tag = Tag2, + requeue = true }), % 2 - 0 + follower_unsynced(Master, Queue), + {#'basic.get_ok'{delivery_tag = Tag3}, _} = + amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 1 - 1 + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag3}), % 1 - 0 + follower_synced(Master, Queue), + {#'basic.get_ok'{delivery_tag = Tag4}, _} = + amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 0 - 1 + amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag4}), % 0 - 0 + follower_synced(Master, Queue). + +follower_synchronization_ttl(Config) -> + [Master, Slave, DLX] = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + Channel = rabbit_ct_client_helpers:open_channel(Config, Master), + DLXChannel = rabbit_ct_client_helpers:open_channel(Config, DLX), + + %% We declare a DLX queue to wait for messages to be TTL'ed + DLXQueue = <<"dlx-queue">>, + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = DLXQueue, + auto_delete = false}), + + TestMsgTTL = 5000, + Queue = <<"ha.two.test">>, + %% Sadly we need fairly high numbers for the TTL because starting/stopping + %% nodes takes a fair amount of time. + Args = [{<<"x-message-ttl">>, long, TestMsgTTL}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLXQueue}], + #'queue.declare_ok'{} = + amqp_channel:call(Channel, #'queue.declare'{queue = Queue, + auto_delete = false, + arguments = Args}), + + follower_synced(Master, Queue), + + %% All unknown + rabbit_ct_broker_helpers:stop_broker(Config, Slave), + send_dummy_message(Channel, Queue), + send_dummy_message(Channel, Queue), + rabbit_ct_broker_helpers:start_broker(Config, Slave), + follower_unsynced(Master, Queue), + wait_for_messages(DLXQueue, DLXChannel, 2), + follower_synced(Master, Queue), + + %% 1 unknown, 1 known + rabbit_ct_broker_helpers:stop_broker(Config, Slave), + send_dummy_message(Channel, Queue), + rabbit_ct_broker_helpers:start_broker(Config, Slave), + follower_unsynced(Master, Queue), + send_dummy_message(Channel, Queue), + follower_unsynced(Master, Queue), + wait_for_messages(DLXQueue, DLXChannel, 2), + follower_synced(Master, Queue), + + %% %% both known + send_dummy_message(Channel, Queue), + send_dummy_message(Channel, Queue), + follower_synced(Master, Queue), + wait_for_messages(DLXQueue, DLXChannel, 2), + follower_synced(Master, Queue), + + ok. + +send_dummy_message(Channel, Queue) -> + Payload = <<"foo">>, + Publish = #'basic.publish'{exchange = <<>>, routing_key = Queue}, + amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}). + +follower_pids(Node, Queue) -> + {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, + [rabbit_misc:r(<<"/">>, queue, Queue)]), + SSP = synchronised_slave_pids, + [{SSP, Pids}] = rpc:call(Node, rabbit_amqqueue, info, [Q, [SSP]]), + case Pids of + '' -> []; + _ -> Pids + end. + +%% The mnesia synchronization takes a while, but we don't want to wait for the +%% test to fail, since the timetrap is quite high. +wait_for_sync_status(Status, Node, Queue) -> + Max = 90000 / ?LOOP_RECURSION_DELAY, + wait_for_sync_status(0, Max, Status, Node, Queue). + +wait_for_sync_status(N, Max, Status, Node, Queue) when N >= Max -> + erlang:error({sync_status_max_tries_failed, + [{queue, Queue}, + {node, Node}, + {expected_status, Status}, + {max_tried, Max}]}); +wait_for_sync_status(N, Max, Status, Node, Queue) -> + Synced = length(follower_pids(Node, Queue)) =:= 1, + case Synced =:= Status of + true -> ok; + false -> timer:sleep(?LOOP_RECURSION_DELAY), + wait_for_sync_status(N + 1, Max, Status, Node, Queue) + end. + +follower_synced(Node, Queue) -> + wait_for_sync_status(true, Node, Queue). + +follower_unsynced(Node, Queue) -> + wait_for_sync_status(false, Node, Queue). + +wait_for_messages(Queue, Channel, N) -> + Sub = #'basic.consume'{queue = Queue}, + #'basic.consume_ok'{consumer_tag = CTag} = amqp_channel:call(Channel, Sub), + receive + #'basic.consume_ok'{} -> ok + end, + lists:foreach( + fun (_) -> receive + {#'basic.deliver'{delivery_tag = Tag}, _Content} -> + amqp_channel:cast(Channel, + #'basic.ack'{delivery_tag = Tag}) + end + end, lists:seq(1, N)), + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = CTag}). |