summaryrefslogtreecommitdiff
path: root/src/rabbit.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit.erl')
-rw-r--r--src/rabbit.erl187
1 files changed, 123 insertions, 64 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e38c1f5f..b00a1ad75c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,10 +22,9 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
-export([start/2, stop/1]).
-
--export([log_location/1]). %% for testing
+-export([start_apps/1, stop_apps/1]).
+-export([log_location/1, config_files/0]). %% for testing and mgmt-agent
%%---------------------------------------------------------------------------
%% Boot steps.
@@ -75,13 +74,6 @@
{requires, external_infrastructure},
{enables, kernel_ready}]}).
--rabbit_boot_step({rabbit_log,
- [{description, "logging server"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_log]}},
- {requires, external_infrastructure},
- {enables, kernel_ready}]}).
-
-rabbit_boot_step({rabbit_event,
[{description, "statistics event manager"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -202,6 +194,7 @@
%% practice 2 processes seems just as fast as any other number > 1,
%% and keeps the progress bar realistic-ish.
-define(HIPE_PROCESSES, 2).
+-define(ASYNC_THREADS_WARNING_THRESHOLD, 8).
%%----------------------------------------------------------------------------
@@ -211,6 +204,7 @@
%% this really should be an abstract type
-type(log_location() :: 'tty' | 'undefined' | file:filename()).
-type(param() :: atom()).
+-type(app_name() :: atom()).
-spec(start/0 :: () -> 'ok').
-spec(boot/0 :: () -> 'ok').
@@ -242,6 +236,8 @@
-spec(maybe_insert_default_data/0 :: () -> 'ok').
-spec(boot_delegate/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
+-spec(start_apps/1 :: ([app_name()]) -> 'ok').
+-spec(stop_apps/1 :: ([app_name()]) -> 'ok').
-endif.
@@ -263,7 +259,7 @@ maybe_hipe_compile() ->
warn_if_hipe_compilation_failed(true) ->
ok;
warn_if_hipe_compilation_failed(false) ->
- error_logger:warning_msg(
+ rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
%% HiPE compilation happens before we have log handlers and can take a
@@ -312,9 +308,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
+ broker_start()
end).
boot() ->
@@ -329,21 +323,14 @@ boot() ->
%% the upgrade, since if we are a secondary node the
%% primary node will have forgotten us
rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
+ broker_start()
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+broker_start() ->
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ start_apps(ToBeLoaded),
+ ok = log_broker_started(rabbit_plugins:active()).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -371,22 +358,66 @@ start_it(StartFun) ->
stop() ->
case whereis(rabbit_boot) of
undefined -> ok;
- _ -> await_startup()
+ _ -> await_startup(true)
end,
- rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ rabbit_log:info("Stopping RabbitMQ~n", []),
+ Apps = ?APPS ++ rabbit_plugins:active(),
+ stop_apps(app_utils:app_dependency_order(Apps, true)).
stop_and_halt() ->
try
stop()
after
- rabbit_misc:local_info_msg("Halting Erlang VM~n", []),
+ rabbit_log:info("Halting Erlang VM~n", []),
init:stop()
end,
ok.
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ OrderedApps = app_utils:app_dependency_order(Apps, false),
+ case lists:member(rabbit, Apps) of
+ false -> run_boot_steps(Apps); %% plugin activation
+ true -> ok %% will run during start of rabbit app
+ end,
+ ok = app_utils:start_applications(OrderedApps,
+ handle_app_error(could_not_start)).
+
+stop_apps(Apps) ->
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown)),
+ case lists:member(rabbit, Apps) of
+ false -> run_cleanup_steps(Apps); %% plugin deactivation
+ true -> ok %% it's all going anyway
+ end,
+ ok.
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)],
+ ok.
+
await_startup() ->
- app_utils:wait_for_applications(app_startup_order()).
+ await_startup(false).
+
+await_startup(HaveSeenRabbitBoot) ->
+ %% We don't take absence of rabbit_boot as evidence we've started,
+ %% since there's a small window before it is registered.
+ case whereis(rabbit_boot) of
+ undefined -> case HaveSeenRabbitBoot orelse is_running() of
+ true -> ok;
+ false -> timer:sleep(100),
+ await_startup(false)
+ end;
+ _ -> timer:sleep(100),
+ await_startup(true)
+ end.
status() ->
S1 = [{pid, list_to_integer(os:getpid())},
@@ -437,6 +468,9 @@ listeners() ->
ip_address = IP,
port = Port} <- Listeners, Node =:= node()].
+%% TODO this only determines if the rabbit application has started,
+%% not if it is running, never mind plugins. It would be nice to have
+%% more nuance here.
is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
@@ -447,7 +481,7 @@ environment() ->
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
- rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]),
+ rabbit_log:info("Rotating logs with suffix '~s'~n", [Suffix]),
log_rotation_result(rotate_logs(log_location(kernel),
Suffix,
rabbit_error_logger_file_h),
@@ -461,14 +495,15 @@ start(normal, []) ->
case erts_version_check() of
ok ->
{ok, Vsn} = application:get_key(rabbit, vsn),
- error_logger:info_msg("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
- [Vsn, erlang:system_info(otp_release),
- ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
+ rabbit_log:info("Starting RabbitMQ ~s on Erlang ~s~n~s~n~s~n",
+ [Vsn, erlang:system_info(otp_release),
+ ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ warn_if_kernel_config_dubious(),
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -483,42 +518,42 @@ stop(_State) ->
ok.
%%---------------------------------------------------------------------------
-%% application life cycle
+%% boot step logic
-app_startup_order() ->
- ok = app_utils:load_applications(?APPS),
- app_utils:app_dependency_order(?APPS, false).
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
-app_shutdown_order() ->
- Apps = ?APPS ++ rabbit_plugins:active(),
- app_utils:app_dependency_order(Apps, true).
+run_boot_steps(Apps) ->
+ [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)],
+ ok.
-%%---------------------------------------------------------------------------
-%% boot step logic
+find_steps(Apps) ->
+ All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)),
+ [Step || {App, _, _} = Step <- All, lists:member(App, Apps)].
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+vertices({AppName, _Module, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-edges(_Module, Steps) ->
+edges({_AppName, _Module, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -527,7 +562,7 @@ edges(_Module, Steps) ->
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
+ case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1,
UnsortedSteps) of
{ok, G} ->
%% Use topological sort to find a consistent ordering (if
@@ -541,8 +576,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
@@ -603,7 +638,7 @@ boot_error(Reason, Fmt, Args, Stacktrace) ->
basic_boot_error(Reason, Format, Args) ->
io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- rabbit_misc:local_info_msg(Format, Args),
+ rabbit_log:info(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot, Reason}).
@@ -727,11 +762,11 @@ force_event_refresh(Ref) ->
%% misc
log_broker_started(Plugins) ->
- rabbit_misc:with_local_io(
+ rabbit_log:with_local_io(
fun() ->
PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
|| P <- Plugins]),
- error_logger:info_msg(
+ rabbit_log:info(
"Server startup complete; ~b plugins started.~n~s",
[length(Plugins), PluginList]),
io:format(" completed with ~p plugins.~n", [length(Plugins)])
@@ -780,7 +815,31 @@ log_banner() ->
{K, V} ->
Format(K, V)
end || S <- Settings]),
- error_logger:info_msg("~s", [Banner]).
+ rabbit_log:info("~s", [Banner]).
+
+warn_if_kernel_config_dubious() ->
+ case erlang:system_info(kernel_poll) of
+ true -> ok;
+ false -> rabbit_log:warning(
+ "Kernel poll (epoll, kqueue, etc) is disabled. Throughput "
+ "and CPU utilization may worsen.~n")
+ end,
+ AsyncThreads = erlang:system_info(thread_pool_size),
+ case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of
+ true -> rabbit_log:warning(
+ "Erlang VM is running with ~b I/O threads, "
+ "file I/O performance may worsen~n", [AsyncThreads]);
+ false -> ok
+ end,
+ IDCOpts = case application:get_env(kernel, inet_default_connect_options) of
+ undefined -> [];
+ {ok, Val} -> Val
+ end,
+ case proplists:get_value(nodelay, IDCOpts, false) of
+ false -> rabbit_log:warning("Nagle's algorithm is enabled for sockets, "
+ "network I/O latency will be higher~n");
+ true -> ok
+ end.
home_dir() ->
case init:get_argument(home) of