diff options
Diffstat (limited to 'src/rabbit.erl')
| -rw-r--r-- | src/rabbit.erl | 142 |
1 files changed, 128 insertions, 14 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 81c7eee580..2cc353d7b8 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -284,16 +284,120 @@ broker_start() -> Plugins = rabbit_plugins:setup(), ToBeLoaded = Plugins ++ ?APPS, start_apps(ToBeLoaded), - case os:type() of - {win32, _} -> ok; - _ -> case code:load_file(sd_notify) of - {module, sd_notify} -> SDNotify = sd_notify, - SDNotify:sd_notify(0, "READY=1"); - {error, _} -> os:cmd("systemd-notify --ready") - end - end, + maybe_sd_notify(), ok = log_broker_started(rabbit_plugins:active()). +%% Try to send systemd ready notification if it makes sense in the +%% current environment. standard_error is used intentionally in all +%% logging statements, so all this messages will end in systemd +%% journal. +maybe_sd_notify() -> + case sd_notify_ready() of + false -> + io:format(standard_error, "systemd READY notification failed, beware of timeouts~n", []); + _ -> + ok + end. + +sd_notify_ready() -> + case {os:type(), os:getenv("NOTIFY_SOCKET")} of + {{win32, _}, _} -> + true; + {_, [_|_]} -> %% Non-empty NOTIFY_SOCKET, give it a try + sd_notify_legacy() orelse sd_notify_socat(); + _ -> + true + end. + +sd_notify_data() -> + "READY=1\nSTATUS=Initialized\nMAINPID=" ++ os:getpid() ++ "\n". + +sd_notify_legacy() -> + case code:load_file(sd_notify) of + {module, sd_notify} -> + SDNotify = sd_notify, + SDNotify:sd_notify(0, sd_notify_data()), + true; + {error, _} -> + false + end. + +%% socat(1) is the most portable way the sd_notify could be +%% implemented in erlang, without introducing some NIF. Currently the +%% following issues prevent us from implementing it in a more +%% reasonable way: +%% - systemd-notify(1) is unstable for non-root users +%% - erlang doesn't support unix domain sockets. +%% +%% Some details on how we ended with such a solution: +%% https://github.com/rabbitmq/rabbitmq-server/issues/664 +sd_notify_socat() -> + case sd_current_unit() of + {ok, Unit} -> + io:format(standard_error, "systemd unit for activation check: \"~s\"~n", [Unit]), + sd_notify_socat(Unit); + _ -> + false + end. + +socat_socket_arg("@" ++ AbstractUnixSocket) -> + "abstract-sendto:" ++ AbstractUnixSocket; +socat_socket_arg(UnixSocket) -> + "unix-sendto:" ++ UnixSocket. + +sd_open_port() -> + open_port( + {spawn_executable, os:find_executable("socat")}, + [{args, [socat_socket_arg(os:getenv("NOTIFY_SOCKET")), "STDIO"]}, + use_stdio, out]). + +sd_notify_socat(Unit) -> + case sd_open_port() of + {'EXIT', Exit} -> + io:format(standard_error, "Failed to start socat ~p~n", [Exit]), + false; + Port -> + Port ! {self(), {command, sd_notify_data()}}, + Result = sd_wait_activation(Port, Unit), + port_close(Port), + Result + end. + +sd_current_unit() -> + case catch re:run(os:cmd("systemctl status " ++ os:getpid()), "([-.@0-9a-zA-Z]+)", [unicode, {capture, all_but_first, list}]) of + {'EXIT', _} -> + error; + {match, [Unit]} -> + {ok, Unit}; + _ -> + error + end. + +sd_wait_activation(Port, Unit) -> + case os:find_executable("systemctl") of + false -> + io:format(standard_error, "'systemctl' unavailable, falling back to sleep~n", []), + timer:sleep(5000), + true; + _ -> + sd_wait_activation(Port, Unit, 10) + end. + +sd_wait_activation(_, _, 0) -> + io:format(standard_error, "Service still in 'activating' state, bailing out~n", []), + false; +sd_wait_activation(Port, Unit, AttemptsLeft) -> + case os:cmd("systemctl show --property=ActiveState " ++ Unit) of + "ActiveState=activating\n" -> + timer:sleep(1000), + sd_wait_activation(Port, Unit, AttemptsLeft - 1); + "ActiveState=" ++ _ -> + true; + _ = Err-> + io:format(standard_error, "Unexpected status from systemd ~p~n", [Err]), + false + end. + start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), case catch register(rabbit_boot, Marker) of @@ -332,6 +436,10 @@ stop_and_halt() -> stop() after rabbit_log:info("Halting Erlang VM~n", []), + %% Also duplicate this information to stderr, so console where + %% foreground broker was running (or systemd journal) will + %% contain information about graceful termination. + io:format(standard_error, "Gracefully halting Erlang VM~n", []), init:stop() end, ok. @@ -693,7 +801,8 @@ print_banner() -> "~n ########## Logs: ~s" "~n ###### ## ~s" "~n ##########" - "~n Starting broker...", + "~n Starting broker..." + "~n", [Product, Version, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE, log_location(kernel), log_location(sasl)]). @@ -722,11 +831,16 @@ log_banner() -> rabbit_log:info("~s", [Banner]). warn_if_kernel_config_dubious() -> - case erlang:system_info(kernel_poll) of - true -> ok; - false -> rabbit_log:warning( - "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " - "and CPU utilization may worsen.~n") + case os:type() of + {win32, _} -> + ok; + _ -> + case erlang:system_info(kernel_poll) of + true -> ok; + false -> rabbit_log:warning( + "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " + "and CPU utilization may worsen.~n") + end end, AsyncThreads = erlang:system_info(thread_pool_size), case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of |
