diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 3 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 26 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 2 |
3 files changed, 17 insertions, 14 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index e89f4a9e22..dcf8d071d5 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -533,7 +533,8 @@ state_enter(leader, #state{consumers = Cons, Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], - Effects = Mons ++ Nots, + NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index ee73d1ed7b..c84eaf4a5e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -340,7 +340,7 @@ start_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Test declare an existing queue ?assertEqual({'queue.declare_ok', LQ, 0, 0}, @@ -356,7 +356,7 @@ start_queue(Config) -> %% Check that the application and process are still up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). start_queue_concurrent(Config) -> Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -417,13 +417,13 @@ stop_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Delete the quorum queue ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -442,7 +442,7 @@ restart_queue(Config) -> %% Check that the application and one ra node are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])). idempotent_recover(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), @@ -521,7 +521,7 @@ restart_all_types(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -563,7 +563,7 @@ stop_start_rabbit_app(Config) -> %% Check that the application and two ra nodes are up ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))), - ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup_sup])), %% Check the classic queues restarted correctly Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), {#'basic.get_ok'{}, #amqp_msg{}} = @@ -1263,7 +1263,7 @@ cleanup_queue_state_on_channel_after_publish(Config) -> amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> [] == rpc:call(Server, supervisor, which_children, - [ra_server_sup]) + [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1300,7 +1300,7 @@ cleanup_queue_state_on_channel_after_subscribe(Config) -> wait_for_cleanup(Server, NCh2, 1), ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), %% Check that all queue states have been cleaned wait_for_cleanup(Server, NCh1, 0), @@ -1964,7 +1964,7 @@ delete_immediately_by_resource(Config) -> %% Check that the application and process are down wait_until(fun() -> - [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup_sup]) end), ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, rpc:call(Server, application, which_applications, []))). @@ -2235,7 +2235,8 @@ wait_for_cleanup(Server, Channel, Number) -> wait_for_cleanup(Server, Channel, Number, 60). wait_for_cleanup(Server, Channel, Number, 0) -> - ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel]))); + ?assertEqual(length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])), + Number); wait_for_cleanup(Server, Channel, Number, N) -> case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of Length when Number == Length -> @@ -2261,7 +2262,8 @@ wait_for_messages(Servers, QName, Number, Fun, 0) -> (_) -> -1 end, Msgs), - ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); + ?assertEqual([Number || _ <- lists:seq(1, length(Servers))], + Totals); wait_for_messages(Servers, QName, Number, Fun, N) -> Msgs = dirty_query(Servers, QName, Fun), case lists:all(fun(M) when is_map(M) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 3263a733a9..0512e8161a 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -55,7 +55,7 @@ init_per_testcase(TestCase, Config) -> meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), - ra_server_sup:remove_all(), + ra_server_sup_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), |
