diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-03-17 19:02:54 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2021-03-17 21:32:42 +0100 |
commit | 9b3b5d48ec44317d06514d506d471a1b064cc1c0 (patch) | |
tree | bf94001da0cdf4e410df9947d2a73c2c2a89dde9 | |
parent | a57b8e354d3860fb8de51bb1f4d65a553a10cc1a (diff) | |
download | rabbitmq-server-git-parallel-stream-suite.tar.gz |
Run most stream tests in parallelparallel-stream-suite
The test suite isn't faster, I guess some contention on the coordinator,
but is finding some bugs.
-rw-r--r-- | deps/rabbit/src/rabbit_stream_coordinator.erl | 43 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 21 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_stream_queue_SUITE.erl | 423 |
3 files changed, 264 insertions, 223 deletions
diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index 7b3664a2ab..cd3439ddff 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -236,9 +236,7 @@ register_listener(Q) when ?is_amqqueue(Q)-> stream_id => StreamId}}). process_command(Cmd) -> - global:set_lock(?STREAM_COORDINATOR_STARTUP), Servers = ensure_coordinator_started(), - global:del_lock(?STREAM_COORDINATOR_STARTUP), process_command(Servers, Cmd). process_command([], _Cmd) -> @@ -260,24 +258,29 @@ ensure_coordinator_started() -> AllNodes = all_coord_members(), case whereis(?MODULE) of undefined -> - case ra:restart_server(Local) of - {error, Reason} when Reason == not_started orelse - Reason == name_not_registered -> - OtherNodes = all_coord_members() -- [Local], - %% We can't use find_members/0 here as a process that timeouts means the cluster is up - case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, OtherNodes) of - [] -> - start_coordinator_cluster(); - _ -> - OtherNodes - end; - ok -> - AllNodes; - {error, {already_started, _}} -> - AllNodes; - _ -> - AllNodes - end; + global:set_lock(?STREAM_COORDINATOR_STARTUP), + Nodes = + case ra:restart_server(Local) of + {error, Reason} when Reason == not_started orelse + Reason == name_not_registered -> + OtherNodes = all_coord_members() -- [Local], + %% We can't use find_members/0 here as a process that timeouts means the cluster is up + case lists:filter(fun(N) -> global:whereis_name(N) =/= undefined end, + OtherNodes) of + [] -> + start_coordinator_cluster(); + _ -> + OtherNodes + end; + ok -> + AllNodes; + {error, {already_started, _}} -> + AllNodes; + _ -> + AllNodes + end, + global:del_lock(?STREAM_COORDINATOR_STARTUP), + Nodes; _ -> AllNodes end. diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index fd61729f84..7c9cb00599 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -428,13 +428,16 @@ i(members, Q) when ?is_amqqueue(Q) -> Nodes; i(online, Q) -> #{name := StreamId} = amqqueue:get_type_state(Q), - {ok, Members} = rabbit_stream_coordinator:members(StreamId), - rabbit_log:warning("MEMBERS ~p", [Members]), - maps:fold(fun(_, {undefined, _}, Acc) -> - Acc; - (Key, _, Acc) -> - [Key | Acc] - end, [], Members); + case rabbit_stream_coordinator:members(StreamId) of + {ok, Members} -> + maps:fold(fun(_, {undefined, _}, Acc) -> + Acc; + (Key, _, Acc) -> + [Key | Acc] + end, [], Members); + {error, not_found} -> + [] + end; i(state, Q) when ?is_amqqueue(Q) -> %% TODO the coordinator should answer this, I guess?? running; @@ -464,9 +467,11 @@ i(messages_unacknowledged, Q) when ?is_amqqueue(Q) -> end; i(committed_offset, Q) -> %% TODO should it be on a metrics table? + %% The queue could be removed between the list() and this call + %% to retrieve the overview. Let's default to '' if it's gone. Data = osiris_counters:overview(), maps:get(committed_offset, - maps:get({osiris_writer, amqqueue:get_name(Q)}, Data)); + maps:get({osiris_writer, amqqueue:get_name(Q)}, Data, #{}), ''); i(policy, Q) -> case rabbit_policy:name(Q) of none -> ''; diff --git a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl index f24fa766a7..06bc8f876f 100644 --- a/deps/rabbit/test/rabbit_stream_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_queue_SUITE.erl @@ -28,8 +28,11 @@ suite() -> all() -> [ {group, single_node}, + {group, single_node_parallel}, {group, cluster_size_2}, + {group, cluster_size_2_parallel}, {group, cluster_size_3}, + {group, cluster_size_3_parallel}, {group, unclustered_size_3_1}, {group, unclustered_size_3_2}, {group, unclustered_size_3_3}, @@ -38,24 +41,27 @@ all() -> groups() -> [ - {single_node, [], [restart_single_node] ++ all_tests()}, - {cluster_size_2, [], all_tests()}, - {cluster_size_3, [], all_tests() ++ - [delete_replica, + {single_node, [], [restart_single_node, recover]}, + {single_node_parallel, [parallel], all_tests()}, + {cluster_size_2, [], [recover]}, + {cluster_size_2_parallel, [parallel], all_tests()}, + {cluster_size_3, [], + [recover, delete_down_replica, - delete_classic_replica, - delete_quorum_replica, - consume_from_replica, replica_recovery, leader_failover, - leader_failover_dedupe, - initial_cluster_size_one, - initial_cluster_size_two, - initial_cluster_size_one_policy, - leader_locator_client_local, - leader_locator_random, - leader_locator_least_leaders, - leader_locator_policy]}, + leader_failover_dedupe]}, + {cluster_size_3_parallel, [parallel], [delete_replica, + delete_classic_replica, + delete_quorum_replica, + consume_from_replica, + initial_cluster_size_one, + initial_cluster_size_two, + initial_cluster_size_one_policy, + leader_locator_client_local, + leader_locator_random, + leader_locator_least_leaders, + leader_locator_policy] ++ all_tests()}, {unclustered_size_3_1, [], [add_replica]}, {unclustered_size_3_2, [], [consume_without_local_replica]}, {unclustered_size_3_3, [], [grow_coordinator_cluster]}, @@ -72,7 +78,6 @@ all_tests() -> delete_queue, publish, publish_confirm, - recover, consume_without_qos, consume, consume_offset, @@ -117,8 +122,11 @@ end_per_suite(Config) -> init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; + single_node_parallel -> 1; cluster_size_2 -> 2; + cluster_size_2_parallel -> 2; cluster_size_3 -> 3; + cluster_size_3_parallel -> 3; cluster_size_3_1 -> 3; unclustered_size_3_1 -> 3; unclustered_size_3_2 -> 3; @@ -172,8 +180,7 @@ merge_app_env(Config) -> {rabbit, [{core_metrics_gc_interval, 100}]}). end_per_testcase(Testcase, Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), - Config1 = rabbit_ct_helpers:run_steps( + Config1 = rabbit_ct_helpers:run_steps( Config, rabbit_ct_client_helpers:teardown_steps()), rabbit_ct_helpers:testcase_finished(Config1, Testcase). @@ -190,7 +197,8 @@ declare_args(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-max-length">>, long, 2000}])), - assert_queue_type(Server, Q, rabbit_stream_queue). + assert_queue_type(Server, Q, rabbit_stream_queue), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). declare_max_age(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -207,7 +215,8 @@ declare_max_age(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-max-age">>, longstr, <<"1Y">>}])), - assert_queue_type(Server, Q, rabbit_stream_queue). + assert_queue_type(Server, Q, rabbit_stream_queue), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). declare_invalid_properties(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -235,7 +244,8 @@ declare_invalid_properties(Config) -> rabbit_ct_client_helpers:open_channel(Config, Server), #'queue.declare'{queue = Q, durable = false, - arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})). + arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). declare_server_named(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -256,14 +266,23 @@ declare_queue(Config) -> %% Test declare an existing queue ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - - ?assertMatch([_], rpc:call(Server, supervisor, which_children, - [osiris_server_sup])), - + + ?assertMatch([_], find_queue_info(Config, [])), + %% Test declare an existing queue with different arguments ?assertExit(_, declare(Ch, Q, [])), - ok. + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). +find_queue_info(Config, Keys) -> + find_queue_info(Config, 0, Keys). + +find_queue_info(Config, Node, Keys) -> + Name = ?config(queue_name, Config), + QName = rabbit_misc:r(<<"/">>, queue, Name), + Infos = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, info_all, + [<<"/">>, [name] ++ Keys]), + [Info] = [Props || Props <- Infos, lists:member({name, QName}, Props)], + Info. delete_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -320,7 +339,7 @@ add_replica(Config) -> [<<"/">>, Q, Server1])), %% replicas must be recorded on the state, and if we publish messages then they must %% be stored on disk - check_leader_and_replicas(Config, Q, [Server0, Server1]), + check_leader_and_replicas(Config, [Server0, Server1]), %% And if we try again? Idempotent ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, [<<"/">>, Q, Server1])), @@ -330,7 +349,8 @@ add_replica(Config) -> rabbit_control_helper:command(start_app, Server2), ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, add_replica, [<<"/">>, Q, Server2])), - check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]). + check_leader_and_replicas(Config, [Server0, Server1, Server2]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). delete_replica(Config) -> [Server0, Server1, Server2] = @@ -339,7 +359,7 @@ delete_replica(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), %% Not a member of the cluster, what would happen? ?assertEqual({error, node_not_running}, rpc:call(Server0, rabbit_stream_queue, delete_replica, @@ -348,14 +368,15 @@ delete_replica(Config) -> rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1])), %% check it's gone - check_leader_and_replicas(Config, Q, [Server0, Server2]), + check_leader_and_replicas(Config, [Server0, Server2]), %% And if we try again? Idempotent ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1])), %% Delete the last replica ?assertEqual(ok, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server2])), - check_leader_and_replicas(Config, Q, [Server0]). + check_leader_and_replicas(Config, [Server0]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). grow_coordinator_cluster(Config) -> [Server0, Server1, _Server2] = @@ -379,7 +400,8 @@ grow_coordinator_cluster(Config) -> _ -> false end - end, 60000). + end, 60000), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). shrink_coordinator_cluster(Config) -> [Server0, Server1, Server2] = @@ -402,7 +424,8 @@ shrink_coordinator_cluster(Config) -> _ -> false end - end, 60000). + end, 60000), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). delete_classic_replica(Config) -> [Server0, Server1, _Server2] = @@ -417,7 +440,8 @@ delete_classic_replica(Config) -> [<<"/">>, Q, 'zen@rabbit'])), ?assertEqual({error, classic_queue_not_supported}, rpc:call(Server0, rabbit_stream_queue, delete_replica, - [<<"/">>, Q, Server1])). + [<<"/">>, Q, Server1])), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). delete_quorum_replica(Config) -> [Server0, Server1, _Server2] = @@ -432,7 +456,8 @@ delete_quorum_replica(Config) -> [<<"/">>, Q, 'zen@rabbit'])), ?assertEqual({error, quorum_queue_not_supported}, rpc:call(Server0, rabbit_stream_queue, delete_replica, - [<<"/">>, Q, Server1])). + [<<"/">>, Q, Server1])), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). delete_down_replica(Config) -> [Server0, Server1, Server2] = @@ -441,19 +466,20 @@ delete_down_replica(Config) -> Q = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), ?assertEqual({error, node_not_running}, rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1])), %% check it isn't gone - check_leader_and_replicas(Config, Q, [Server0, Server1, Server2]), + check_leader_and_replicas(Config, [Server0, Server1, Server2]), ok = rabbit_ct_broker_helpers:start_node(Config, Server1), rabbit_ct_helpers:await_condition( fun() -> ok == rpc:call(Server0, rabbit_stream_queue, delete_replica, [<<"/">>, Q, Server1]) - end). + end), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). publish(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -464,7 +490,8 @@ publish(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), publish(Ch, Q), - quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). publish_confirm(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -478,7 +505,8 @@ publish_confirm(Config) -> amqp_channel:register_confirm_handler(Ch, self()), publish(Ch, Q), amqp_channel:wait_for_confirms(Ch, 5), - quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]). + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). restart_single_node(Config) -> [Server] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -496,7 +524,8 @@ restart_single_node(Config) -> quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]), Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), publish(Ch1, Q), - quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]). + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). recover(Config) -> [Server | _] = Servers0 = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -527,7 +556,7 @@ recover(Config) -> Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), publish(Ch1, Q), quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"2">>, <<"0">>]]), - ok. + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_without_qos(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -539,7 +568,8 @@ consume_without_qos(Config) -> ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, - self())). + self())), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_without_local_replica(Config) -> [Server0, Server1 | _] = @@ -558,7 +588,8 @@ consume_without_local_replica(Config) -> qos(Ch1, 10, false), ?assertExit({{shutdown, {server_initiated_close, 406, _}}, _}, amqp_channel:subscribe(Ch1, #'basic.consume'{queue = Q, consumer_tag = <<"ctag">>}, - self())). + self())), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -585,7 +616,8 @@ consume(Config) -> ok after 5000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_offset(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -620,7 +652,8 @@ consume_offset(Config) -> amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), true end) - end, [], 25). + end, [], 25), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_timestamp_offset(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -651,10 +684,13 @@ consume_timestamp_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok + after 5000 -> + exit(consume_ok_timeout) end, %% It has subscribed to a very old timestamp, so we will receive the whole stream - receive_batch(Ch1, 0, 99). + receive_batch(Ch1, 0, 99), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_timestamp_last_offset(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -685,6 +721,8 @@ consume_timestamp_last_offset(Config) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok + after 5000 -> + exit(missing_consume_ok) end, receive @@ -701,7 +739,8 @@ consume_timestamp_last_offset(Config) -> amqp_channel:wait_for_confirms(Ch, 5), %% Yeah! we got them - receive_batch(Ch1, 100, 199). + receive_batch(Ch1, 100, 199), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). basic_get(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -712,7 +751,8 @@ basic_get(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, - amqp_channel:call(Ch, #'basic.get'{queue = Q})). + amqp_channel:call(Ch, #'basic.get'{queue = Q})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_with_autoack(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -727,7 +767,8 @@ consume_with_autoack(Config) -> ?assertExit( {{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, - subscribe(Ch1, Q, true, 0)). + subscribe(Ch1, Q, true, 0)), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_and_nack(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -757,7 +798,8 @@ consume_and_nack(Config) -> declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) after 10000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). basic_cancel(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -774,19 +816,31 @@ basic_cancel(Config) -> Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), - subscribe(Ch1, Q, false, 0), + CTag = <<"basic_cancel">>, + subscribe(Ch1, Q, false, 0, CTag), rabbit_ct_helpers:await_condition( fun() -> - 1 == length(rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, - [consumer_created])) + 1 == length(filter_consumers(Config, Server, CTag)) end, 30000), receive {#'basic.deliver'{}, _} -> - amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = <<"ctag">>}), - ?assertMatch([], rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created])) + amqp_channel:call(Ch1, #'basic.cancel'{consumer_tag = CTag}), + ?assertMatch([], filter_consumers(Config, Server, CTag)) after 10000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). + +filter_consumers(Config, Server, CTag) -> + CInfo = rabbit_ct_broker_helpers:rpc(Config, Server, ets, tab2list, [consumer_created]), + lists:foldl(fun(Tuple, Acc) -> + Key = element(1, Tuple), + case Key of + {_, _, CTag} -> + [Key | Acc]; + _ -> Acc + end + end, [], CInfo). consume_and_reject(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -815,7 +869,8 @@ consume_and_reject(Config) -> declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])) after 10000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_and_ack(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -845,7 +900,8 @@ consume_and_ack(Config) -> quorum_queue_utils:wait_for_messages(Config, [[Q, <<"1">>, <<"1">>, <<"0">>]]) after 5000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_from_last(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -864,14 +920,17 @@ consume_from_last(Config) -> Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [committed_offset]]), + rabbit_ct_helpers:await_condition( + fun () -> + Info = find_queue_info(Config, [committed_offset]), + %% We'll receive data from the last committed offset, let's check that is not the + %% first offset + proplists:get_value(committed_offset, Info) > 0 + end), + + CommittedOffset = proplists:get_value(committed_offset, + find_queue_info(Config, [committed_offset])), - %% We'll receive data from the last committed offset, let's check that is not the - %% first offset - CommittedOffset = proplists:get_value(committed_offset, Info), - ?assert(CommittedOffset > 0), - %% If the offset is not provided, we're subscribing to the tail of the stream amqp_channel:subscribe( Ch1, #'basic.consume'{queue = Q, @@ -892,7 +951,8 @@ consume_from_last(Config) -> amqp_channel:wait_for_confirms(Ch, 5), %% Yeah! we got them - receive_batch(Ch1, 100, 199). + receive_batch(Ch1, 100, 199), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_from_next(Config) -> consume_from_next(Config, [{<<"x-stream-offset">>, longstr, <<"next">>}]). @@ -917,13 +977,14 @@ consume_from_next(Config, Args) -> Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 10, false), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [committed_offset]]), - %% We'll receive data from the last committed offset, let's check that is not the - %% first offset - CommittedOffset = proplists:get_value(committed_offset, Info), - ?assert(CommittedOffset > 0), + rabbit_ct_helpers:await_condition( + fun () -> + Info = find_queue_info(Config, [committed_offset]), + %% We'll receive data from the last committed offset, let's check that is not the + %% first offset + proplists:get_value(committed_offset, Info) > 0 + end), %% If the offset is not provided, we're subscribing to the tail of the stream amqp_channel:subscribe( @@ -935,6 +996,8 @@ consume_from_next(Config, Args) -> receive #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> ok + after 10000 -> + exit(consume_ok_failed) end, %% Publish a few more @@ -942,7 +1005,8 @@ consume_from_next(Config, Args) -> amqp_channel:wait_for_confirms(Ch, 5), %% Yeah! we got them - receive_batch(Ch1, 100, 199). + receive_batch(Ch1, 100, 199), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_from_replica(Config) -> [Server1, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -958,11 +1022,18 @@ consume_from_replica(Config) -> [publish(Ch1, Q, <<"msg1">>) || _ <- lists:seq(1, 100)], amqp_channel:wait_for_confirms(Ch1, 5), + rabbit_ct_helpers:await_condition( + fun () -> + Info = find_queue_info(Config, 1, [online]), + length(proplists:get_value(online, Info)) == 3 + end), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server2), qos(Ch2, 10, false), subscribe(Ch2, Q, false, 0), - receive_batch(Ch2, 0, 99). + receive_batch(Ch2, 0, 99), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_credit(Config) -> %% Because osiris provides one chunk on every read and we don't want to buffer @@ -1023,7 +1094,8 @@ consume_credit(Config) -> ok after 5000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_credit_out_of_order_ack(Config) -> %% Like consume_credit but acknowledging the messages out of order. @@ -1084,7 +1156,8 @@ consume_credit_out_of_order_ack(Config) -> ok after 5000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). consume_credit_multiple_ack(Config) -> %% Like consume_credit but acknowledging the messages out of order. @@ -1126,7 +1199,8 @@ consume_credit_multiple_ack(Config) -> ok after 5000 -> exit(timeout) - end. + end, + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). max_length_bytes(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1152,7 +1226,8 @@ max_length_bytes(Config) -> qos(Ch1, 100, false), subscribe(Ch1, Q, false, 0), - ?assert(length(receive_batch()) < 100). + ?assert(length(receive_batch()) < 100), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). max_age(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1182,7 +1257,8 @@ max_age(Config) -> Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), qos(Ch1, 200, false), subscribe(Ch1, Q, false, 0), - ?assertEqual(100, length(receive_batch())). + ?assertEqual(100, length(receive_batch())), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). replica_recovery(Config) -> Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1222,7 +1298,7 @@ replica_recovery(Config) -> amqp_channel:close(Ch2) end || PNodes <- permute(Nodes)], - ok. + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_failover(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1238,25 +1314,20 @@ leader_failover(Config) -> [publish(Ch1, Q, <<"msg">>) || _ <- lists:seq(1, 100)], amqp_channel:wait_for_confirms(Ch1, 5), - check_leader_and_replicas(Config, Q, [Server1, Server2, Server3]), + check_leader_and_replicas(Config, [Server1, Server2, Server3]), ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), timer:sleep(30000), rabbit_ct_helpers:await_condition( fun () -> - [Info] = - lists:filter( - fun(Props) -> - QName = rabbit_misc:r(<<"/">>, queue, Q), - lists:member({name, QName}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader, members]])), + Info = find_queue_info(Config, 1, [leader, members]), + NewLeader = proplists:get_value(leader, Info), NewLeader =/= Server1 end), - ok = rabbit_ct_broker_helpers:start_node(Config, Server1). + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_failover_dedupe(Config) -> %% tests that in-flight messages are automatically handled in the case where @@ -1274,7 +1345,7 @@ leader_failover_dedupe(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch1, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - check_leader_and_replicas(Config, Q, Nodes), + check_leader_and_replicas(Config, Nodes), Ch2 = rabbit_ct_client_helpers:open_channel(Config, PubNode), #'confirm.select_ok'{} = amqp_channel:call(Ch2, #'confirm.select'{}), @@ -1309,14 +1380,7 @@ leader_failover_dedupe(Config) -> ct:pal("preinfo", []), rabbit_ct_helpers:await_condition( fun() -> - [Info] = lists:filter( - fun(Props) -> - QName = rabbit_misc:r(<<"/">>, queue, Q), - lists:member({name, QName}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, PubNode, rabbit_amqqueue, - info_all, - [<<"/">>, [name, leader, members]])), + Info = find_queue_info(Config, PubNode, [leader, members]), ct:pal("info ~p", [Info]), NewLeader = proplists:get_value(leader, Info), NewLeader =/= DownNode @@ -1338,7 +1402,7 @@ leader_failover_dedupe(Config) -> subscribe(Ch2, Q, false, 0), validate_dedupe(Ch2, 1, N), - ok. + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). initial_cluster_size_one(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1349,10 +1413,11 @@ initial_cluster_size_one(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-initial-cluster-size">>, long, 1}])), - check_leader_and_replicas(Config, Q, [Server1]), + check_leader_and_replicas(Config, [Server1]), ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})). + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). initial_cluster_size_two(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1364,17 +1429,14 @@ initial_cluster_size_two(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-initial-cluster-size">>, long, 2}])), - [Info] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader, members]])), + Info = find_queue_info(Config, [leader, members]), + ?assertEqual(Server1, proplists:get_value(leader, Info)), ?assertEqual(2, length(proplists:get_value(members, Info))), ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch, #'queue.delete'{queue = Q})). + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). initial_cluster_size_one_policy(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1389,12 +1451,13 @@ initial_cluster_size_one_policy(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-initial-cluster-size">>, long, 1}])), - check_leader_and_replicas(Config, Q, [Server1]), + check_leader_and_replicas(Config, [Server1]), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_locator_client_local(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1406,12 +1469,8 @@ leader_locator_client_local(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - [Info] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info = find_queue_info(Config, [leader]), + ?assertEqual(Server1, proplists:get_value(leader, Info)), ?assertMatch(#'queue.delete_ok'{}, @@ -1423,12 +1482,7 @@ leader_locator_client_local(Config) -> declare(Ch2, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - [Info2] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info2 = find_queue_info(Config, [leader]), ?assertEqual(Server2, proplists:get_value(leader, Info2)), ?assertMatch(#'queue.delete_ok'{}, @@ -1440,16 +1494,13 @@ leader_locator_client_local(Config) -> declare(Ch3, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"client-local">>}])), - [Info3] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + + Info3 = find_queue_info(Config, [leader]), ?assertEqual(Server3, proplists:get_value(leader, Info3)), ?assertMatch(#'queue.delete_ok'{}, - amqp_channel:call(Ch3, #'queue.delete'{queue = Q})). + amqp_channel:call(Ch3, #'queue.delete'{queue = Q})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_locator_random(Config) -> [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1461,12 +1512,7 @@ leader_locator_random(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - [Info] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info = find_queue_info(Config, [leader]), Leader = proplists:get_value(leader, Info), ?assertMatch(#'queue.delete_ok'{}, @@ -1481,16 +1527,12 @@ leader_locator_random(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"random">>}])), - [Info2] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info2 = find_queue_info(Config, [leader]), Leader2 = proplists:get_value(leader, Info2), Leader =/= Leader2 - end, 10). + end, 10), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_locator_least_leaders(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1511,15 +1553,11 @@ leader_locator_least_leaders(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, {<<"x-queue-leader-locator">>, longstr, <<"least-leaders">>}])), - [Info] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info = find_queue_info(Config, [leader]), Leader = proplists:get_value(leader, Info), - ?assert(lists:member(Leader, [Server2, Server3])). + ?assert(lists:member(Leader, [Server2, Server3])), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). leader_locator_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1534,10 +1572,7 @@ leader_locator_policy(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [policy, operator_policy, - effective_policy_definition, - name, leader]]), + Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition, leader]), ?assertEqual(<<"leader-locator">>, proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), @@ -1554,17 +1589,13 @@ leader_locator_policy(Config) -> ?assertEqual({'queue.declare_ok', Q, 0, 0}, declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), - [Info2] = lists:filter( - fun(Props) -> - lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader]])), + Info2 = find_queue_info(Config, [leader]), Leader2 = proplists:get_value(leader, Info2), Leader =/= Leader2 end, 10), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"leader-locator">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). repeat_until(_, 0) -> ct:fail("Condition did not materialize in the expected amount of attempts"); @@ -1588,15 +1619,14 @@ invalid_policy(Config) -> Config, 0, <<"ttl">>, <<"invalid_policy.*">>, <<"queues">>, [{<<"message-ttl">>, 5}]), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [policy, operator_policy, - effective_policy_definition]]), + Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), ?assertEqual('', proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual([], proplists:get_value(effective_policy_definition, Info)), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ha">>), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"ttl">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). max_age_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1610,16 +1640,15 @@ max_age_policy(Config) -> Config, 0, <<"age">>, <<"max_age_policy.*">>, <<"queues">>, [{<<"max-age">>, <<"1Y">>}]), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [policy, operator_policy, - effective_policy_definition]]), + Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), ?assertEqual(<<"age">>, proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual([{<<"max-age">>, <<"1Y">>}], proplists:get_value(effective_policy_definition, Info)), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"age">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). update_retention_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1636,13 +1665,12 @@ update_retention_policy(Config) -> {ok, Q0} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, Q)]), - timer:sleep(1000), + timer:sleep(2000), ok = rabbit_ct_broker_helpers:set_policy( Config, 0, <<"retention">>, <<"update_retention_policy.*">>, <<"queues">>, [{<<"max-age">>, <<"1s">>}]), - timer:sleep(1000), - quorum_queue_utils:wait_for_max_messages(Config, Q, 1000), + quorum_queue_utils:wait_for_max_messages(Config, Q, 3000), {ok, Q1} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [rabbit_misc:r(<<"/">>, queue, Q)]), @@ -1650,7 +1678,8 @@ update_retention_policy(Config) -> %% If there are changes only in the retention policy, processes should not be restarted ?assertEqual(amqqueue:get_pid(Q0), amqqueue:get_pid(Q1)), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"retention">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"retention">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). queue_info(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1662,12 +1691,12 @@ queue_info(Config) -> rabbit_ct_helpers:await_condition( fun() -> - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [name, leader, online, members]]), + Info = find_queue_info(Config, [leader, online, members]), lists:member(proplists:get_value(leader, Info), Servers) andalso (lists:sort(Servers) == lists:sort(proplists:get_value(members, Info))) andalso (lists:sort(Servers) == lists:sort(proplists:get_value(online, Info))) - end). + end), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). max_segment_size_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1680,15 +1709,14 @@ max_segment_size_policy(Config) -> Config, 0, <<"segment">>, <<"max_segment_size.*">>, <<"queues">>, [{<<"max-segment-size">>, 5000}]), - [Info] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, [policy, operator_policy, - effective_policy_definition]]), + Info = find_queue_info(Config, [policy, operator_policy, effective_policy_definition]), ?assertEqual(<<"segment">>, proplists:get_value(policy, Info)), ?assertEqual('', proplists:get_value(operator_policy, Info)), ?assertEqual([{<<"max-segment-size">>, 5000}], proplists:get_value(effective_policy_definition, Info)), - ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>). + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"segment">>), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). purge(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -1699,7 +1727,8 @@ purge(Config) -> declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, - amqp_channel:call(Ch, #'queue.purge'{queue = Q})). + amqp_channel:call(Ch, #'queue.purge'{queue = Q})), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). %%---------------------------------------------------------------------------- @@ -1707,6 +1736,15 @@ delete_queues() -> [{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()]. +delete_testcase_queue(Name) -> + QName = rabbit_misc:r(<<"/">>, queue, Name), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} -> + {ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>); + _ -> + ok + end. + declare(Ch, Q) -> declare(Ch, Q, []). @@ -1724,22 +1762,14 @@ get_queue_type(Server, Q0) -> {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), amqqueue:get_type(Q1). -check_leader_and_replicas(Config, Name, Members) -> - QNameRes = rabbit_misc:r(<<"/">>, queue, Name), +check_leader_and_replicas(Config, Members) -> rabbit_ct_helpers:await_condition( fun() -> - [Info] = lists:filter( - fun(Props) -> - lists:member({name, QNameRes}, Props) - end, - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, - info_all, [<<"/">>, - [name, leader, - members]])), + Info = find_queue_info(Config, [leader, members]), ct:pal("~s members ~w ~p", [?FUNCTION_NAME, Members, Info]), lists:member(proplists:get_value(leader, Info), Members) andalso (lists:sort(Members) == lists:sort(proplists:get_value(members, Info))) - end). + end, 60000). publish(Ch, Queue) -> publish(Ch, Queue, <<"msg">>). @@ -1751,13 +1781,16 @@ publish(Ch, Queue, Msg) -> payload = Msg}). subscribe(Ch, Queue, NoAck, Offset) -> + subscribe(Ch, Queue, NoAck, Offset, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Offset, CTag) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = <<"ctag">>, + consumer_tag = CTag, arguments = [{<<"x-stream-offset">>, long, Offset}]}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = CTag} -> ok end. |