diff options
| -rw-r--r-- | src/rabbit_autoheal.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 57 |
2 files changed, 40 insertions, 19 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index af1795f953..d2cd5a02c2 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -190,7 +190,7 @@ handle_msg({winner_is, Winner}, end, erlang:demonitor(MRef, [flush]), rabbit:start() - end), + end, true), restarting; handle_msg(_, restarting, _Partitions) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0eb0882316..239f6b5deb 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -33,7 +33,7 @@ code_change/3]). %% Utils --export([all_rabbit_nodes_up/0, run_outside_applications/1, ping_all/0, +-export([all_rabbit_nodes_up/0, run_outside_applications/2, ping_all/0, alive_nodes/1, alive_rabbit_nodes/1]). -define(SERVER, ?MODULE). @@ -68,7 +68,7 @@ -spec(pause_partition_guard/0 :: () -> 'ok' | 'pausing'). -spec(all_rabbit_nodes_up/0 :: () -> boolean()). --spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). +-spec(run_outside_applications/2 :: (fun (() -> any()), boolean()) -> pid()). -spec(ping_all/0 :: () -> 'ok'). -spec(alive_nodes/1 :: ([node()]) -> [node()]). -spec(alive_rabbit_nodes/1 :: ([node()]) -> [node()]). @@ -655,30 +655,51 @@ await_cluster_recovery(Condition) -> run_outside_applications(fun () -> rabbit:stop(), wait_for_cluster_recovery(Condition) - end), + end, false), ok. -run_outside_applications(Fun) -> +run_outside_applications(Fun, WaitForExistingProcess) -> spawn(fun () -> %% If our group leader is inside an application we are about %% to stop, application:stop/1 does not return. group_leader(whereis(init), self()), - %% Ensure only one such process at a time, the - %% exit(badarg) is harmless if one is already running - try register(rabbit_outside_app_process, self()) of - true -> - try - Fun() - catch _:E -> - rabbit_log:error( - "rabbit_outside_app_process:~n~p~n~p~n", - [E, erlang:get_stacktrace()]) - end - catch error:badarg -> - ok - end + register_outside_app_process(Fun, WaitForExistingProcess) end). +register_outside_app_process(Fun, WaitForExistingProcess) -> + %% Ensure only one such process at a time, the exit(badarg) is + %% harmless if one is already running. + %% + %% If WaitForExistingProcess is false, the given fun is simply not + %% executed at all and the process exits. + %% + %% If WaitForExistingProcess is true, we wait for the end of the + %% currently running process before executing the given function. + try register(rabbit_outside_app_process, self()) of + true -> + do_run_outside_app_fun(Fun) + catch + error:badarg when WaitForExistingProcess -> + MRef = erlang:monitor(process, rabbit_outside_app_process), + receive + {'DOWN', MRef, _, _, _} -> + %% The existing process exited, let's try to + %% register again. + register_outside_app_process(Fun, WaitForExistingProcess) + end; + error:badarg -> + ok + end. + +do_run_outside_app_fun(Fun) -> + try + Fun() + catch _:E -> + rabbit_log:error( + "rabbit_outside_app_process:~n~p~n~p~n", + [E, erlang:get_stacktrace()]) + end. + wait_for_cluster_recovery(Condition) -> ping_all(), case Condition() of |
