summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl3
-rw-r--r--test/quorum_queue_SUITE.erl26
-rw-r--r--test/rabbit_fifo_SUITE.erl2
3 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index e89f4a9e22..dcf8d071d5 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -533,7 +533,8 @@ state_enter(leader, #state{consumers = Cons,
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
Mons = [{monitor, process, P} || P <- Pids],
Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
- Effects = Mons ++ Nots,
+ NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
+ Effects = Mons ++ Nots ++ NodeMons,
case BLH of
undefined ->
Effects;
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index ee73d1ed7b..c84eaf4a5e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -340,7 +340,7 @@ start_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Test declare an existing queue
?assertEqual({'queue.declare_ok', LQ, 0, 0},
@@ -356,7 +356,7 @@ start_queue(Config) ->
%% Check that the application and process are still up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
start_queue_concurrent(Config) ->
Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -417,13 +417,13 @@ stop_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Delete the quorum queue
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -442,7 +442,7 @@ restart_queue(Config) ->
%% Check that the application and one ra node are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])).
idempotent_recover(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
@@ -521,7 +521,7 @@ restart_all_types(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -563,7 +563,7 @@ stop_start_rabbit_app(Config) ->
%% Check that the application and two ra nodes are up
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))),
- ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])),
%% Check the classic queues restarted correctly
Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
{#'basic.get_ok'{}, #amqp_msg{}} =
@@ -1263,7 +1263,7 @@ cleanup_queue_state_on_channel_after_publish(Config) ->
amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
[] == rpc:call(Server, supervisor, which_children,
- [ra_server_sup])
+ [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1300,7 +1300,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) ->
wait_for_cleanup(Server, NCh2, 1),
?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
%% Check that all queue states have been cleaned
wait_for_cleanup(Server, NCh1, 0),
@@ -1964,7 +1964,7 @@ delete_immediately_by_resource(Config) ->
%% Check that the application and process are down
wait_until(fun() ->
- [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])
end),
?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
rpc:call(Server, application, which_applications, []))).
@@ -2235,7 +2235,8 @@ wait_for_cleanup(Server, Channel, Number) ->
wait_for_cleanup(Server, Channel, Number, 60).
wait_for_cleanup(Server, Channel, Number, 0) ->
- ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+ ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])),
+ Number);
wait_for_cleanup(Server, Channel, Number, N) ->
case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
Length when Number == Length ->
@@ -2261,7 +2262,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) ->
(_) ->
-1
end, Msgs),
- ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+ ?assertEqual([Number || _ <- lists:seq(1, length(Servers))],
+ Totals);
wait_for_messages(Servers, QName, Number, Fun, N) ->
Msgs = dirty_query(Servers, QName, Fun),
case lists:all(fun(M) when is_map(M) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 3263a733a9..0512e8161a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) ->
meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
- ra_server_sup:remove_all(),
+ ra_server_sup_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),