diff options
Diffstat (limited to 'src/rabbit.erl')
| -rw-r--r-- | src/rabbit.erl | 159 |
1 files changed, 113 insertions, 46 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 29e38c1f5f..191f04a425 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,10 +22,9 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). - -export([start/2, stop/1]). - --export([log_location/1]). %% for testing +-export([start_apps/1, stop_apps/1]). +-export([log_location/1, config_files/0]). %% for testing and mgmt-agent %%--------------------------------------------------------------------------- %% Boot steps. @@ -202,6 +201,7 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). +-define(ASYNC_THREADS_WARNING_THRESHOLD, 8). %%---------------------------------------------------------------------------- @@ -211,6 +211,7 @@ %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). -type(param() :: atom()). +-type(app_name() :: atom()). -spec(start/0 :: () -> 'ok'). -spec(boot/0 :: () -> 'ok'). @@ -242,6 +243,8 @@ -spec(maybe_insert_default_data/0 :: () -> 'ok'). -spec(boot_delegate/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). +-spec(start_apps/1 :: ([app_name()]) -> 'ok'). +-spec(stop_apps/1 :: ([app_name()]) -> 'ok'). -endif. @@ -312,9 +315,7 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), - ok = log_broker_started(rabbit_plugins:active()) + broker_start() end). boot() -> @@ -329,21 +330,14 @@ boot() -> %% the upgrade, since if we are a secondary node the %% primary node will have forgotten us rabbit_mnesia:check_cluster_consistency(), - Plugins = rabbit_plugins:setup(), - ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), - ok = log_broker_started(Plugins) + broker_start() end). -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); - -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). +broker_start() -> + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + start_apps(ToBeLoaded), + ok = log_broker_started(rabbit_plugins:active()). start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), @@ -371,10 +365,11 @@ start_it(StartFun) -> stop() -> case whereis(rabbit_boot) of undefined -> ok; - _ -> await_startup() + _ -> await_startup(true) end, rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + Apps = ?APPS ++ rabbit_plugins:active(), + stop_apps(app_utils:app_dependency_order(Apps, true)). stop_and_halt() -> try @@ -385,8 +380,51 @@ stop_and_halt() -> end, ok. +start_apps(Apps) -> + app_utils:load_applications(Apps), + OrderedApps = app_utils:app_dependency_order(Apps, false), + case lists:member(rabbit, Apps) of + false -> run_boot_steps(Apps); %% plugin activation + true -> ok %% will run during start of rabbit app + end, + ok = app_utils:start_applications(OrderedApps, + handle_app_error(could_not_start)). + +stop_apps(Apps) -> + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)), + case lists:member(rabbit, Apps) of + false -> run_cleanup_steps(Apps); %% plugin deactivation + true -> ok %% it's all going anyway + end, + ok. + +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. + +run_cleanup_steps(Apps) -> + [run_step(Name, Attrs, cleanup) || {_, Name, Attrs} <- find_steps(Apps)], + ok. + await_startup() -> - app_utils:wait_for_applications(app_startup_order()). + await_startup(false). + +await_startup(HaveSeenRabbitBoot) -> + %% We don't take absence of rabbit_boot as evidence we've started, + %% since there's a small window before it is registered. + case whereis(rabbit_boot) of + undefined -> case HaveSeenRabbitBoot orelse is_running() of + true -> ok; + false -> timer:sleep(100), + await_startup(false) + end; + _ -> timer:sleep(100), + await_startup(true) + end. status() -> S1 = [{pid, list_to_integer(os:getpid())}, @@ -437,6 +475,9 @@ listeners() -> ip_address = IP, port = Port} <- Listeners, Node =:= node()]. +%% TODO this only determines if the rabbit application has started, +%% not if it is running, never mind plugins. It would be nice to have +%% more nuance here. is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). @@ -468,7 +509,8 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + warn_if_kernel_config_dubious(), + run_boot_steps(), {ok, SupPid}; Error -> Error @@ -483,42 +525,42 @@ stop(_State) -> ok. %%--------------------------------------------------------------------------- -%% application life cycle +%% boot step logic -app_startup_order() -> - ok = app_utils:load_applications(?APPS), - app_utils:app_dependency_order(?APPS, false). +run_boot_steps() -> + run_boot_steps([App || {App, _, _} <- application:loaded_applications()]). -app_shutdown_order() -> - Apps = ?APPS ++ rabbit_plugins:active(), - app_utils:app_dependency_order(Apps, true). +run_boot_steps(Apps) -> + [ok = run_step(Step, Attrs, mfa) || {_, Step, Attrs} <- find_steps(Apps)], + ok. -%%--------------------------------------------------------------------------- -%% boot step logic +find_steps(Apps) -> + All = sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)), + [Step || {App, _, _} = Step <- All, lists:member(App, Apps)]. -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of +run_step(StepName, Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of [] -> ok; MFAs -> [try apply(M,F,A) of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) + ok -> ok; + {error, Reason} -> boot_error({boot_step, StepName, Reason}, + not_available) catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error({boot_step, StepName, Reason}, + erlang:get_stacktrace()) end || {M,F,A} <- MFAs], ok end. -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). +vertices({AppName, _Module, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. - -edges(_Module, Steps) -> +edges({_AppName, _Module, Steps}) -> [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} @@ -527,7 +569,7 @@ edges(_Module, Steps) -> Key =:= requires orelse Key =:= enables]. sort_boot_steps(UnsortedSteps) -> - case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, + case rabbit_misc:build_acyclic_graph(fun vertices/1, fun edges/1, UnsortedSteps) of {ok, G} -> %% Use topological sort to find a consistent ordering (if @@ -541,8 +583,8 @@ sort_boot_steps(UnsortedSteps) -> digraph:delete(G), %% Check that all mentioned {M,F,A} triples are exported. case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( @@ -782,6 +824,31 @@ log_banner() -> end || S <- Settings]), error_logger:info_msg("~s", [Banner]). +warn_if_kernel_config_dubious() -> + case erlang:system_info(kernel_poll) of + true -> ok; + false -> error_logger:warning_msg( + "Kernel poll (epoll, kqueue, etc) is disabled. Throughput " + "and CPU utilization may worsen.~n") + end, + AsyncThreads = erlang:system_info(thread_pool_size), + case AsyncThreads < ?ASYNC_THREADS_WARNING_THRESHOLD of + true -> error_logger:warning_msg( + "Erlang VM is running with ~b I/O threads, " + "file I/O performance may worsen~n", [AsyncThreads]); + false -> ok + end, + IDCOpts = case application:get_env(kernel, inet_default_connect_options) of + undefined -> []; + {ok, Val} -> Val + end, + case proplists:get_value(nodelay, IDCOpts, false) of + false -> error_logger:warning_msg( + "Nagle's algorithm is enabled for sockets, " + "network I/O latency will be higher~n"); + true -> ok + end. + home_dir() -> case init:get_argument(home) of {ok, [[Home]]} -> Home; |
