diff options
| -rwxr-xr-x | scripts/rabbitmq-plugins | 2 | ||||
| -rwxr-xr-x | scripts/rabbitmq-plugins.bat | 6 | ||||
| -rw-r--r-- | src/app_utils.erl | 9 | ||||
| -rw-r--r-- | src/rabbit.erl | 221 | ||||
| -rw-r--r-- | src/rabbit_alarm.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_boot.erl | 403 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_plugins_main.erl | 60 |
10 files changed, 539 insertions, 218 deletions
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins index 90eb5a5d77..c6f3ff9c49 100755 --- a/scripts/rabbitmq-plugins +++ b/scripts/rabbitmq-plugins @@ -21,6 +21,7 @@ ##--- Set environment vars RABBITMQ_<var_name> to defaults if not set +[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE} [ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR} @@ -35,4 +36,5 @@ exec ${ERL_DIR}erl \ -s rabbit_plugins_main \ -enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \ -plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \ + -nodename $RABBITMQ_NODENAME \ -extra "$@" diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat index 0d1f128ea1..252ecb3a20 100755 --- a/scripts/rabbitmq-plugins.bat +++ b/scripts/rabbitmq-plugins.bat @@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" ( set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl index 5ae2d2954e..5eeb692425 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -17,7 +17,7 @@ -export([load_applications/1, start_applications/1, start_applications/2, stop_applications/1, stop_applications/2, app_dependency_order/2, - wait_for_applications/1]). + wait_for_applications/1, app_dependencies/1]). -ifdef(use_specs). @@ -30,6 +30,7 @@ -spec stop_applications([atom()], error_handler()) -> 'ok'. -spec wait_for_applications([atom()]) -> 'ok'. -spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-spec app_dependencies(atom()) -> [atom()]. -endif. @@ -68,7 +69,6 @@ stop_applications(Apps, ErrorHandler) -> ErrorHandler, Apps). - wait_for_applications(Apps) -> [wait_for_application(App) || App <- Apps], ok. @@ -80,8 +80,9 @@ app_dependency_order(RootApps, StripUnreachable) -> {App, _Desc, _Vsn} <- application:loaded_applications()]), try case StripUnreachable of - true -> digraph:del_vertices(G, digraph:vertices(G) -- - digraph_utils:reachable(RootApps, G)); + true -> digraph:del_vertices( + G, digraph:vertices(G) -- + digraph_utils:reachable(RootApps, G)); false -> ok end, digraph_utils:topsort(G) diff --git a/src/rabbit.erl b/src/rabbit.erl index bd4f1dbc88..081e2e229a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -305,76 +305,40 @@ ensure_application_loaded() -> end. start() -> - start_it(fun() -> - %% We do not want to HiPE compile or upgrade - %% mnesia after just restarting the app - ok = ensure_application_loaded(), - ok = ensure_working_log_handlers(), - rabbit_node_monitor:prepare_cluster_status_files(), - rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), - ok = log_broker_started(rabbit_plugins:active()) - end). + rabbit_boot:boot_with( + fun() -> + %% We do not want to HiPE compile or upgrade + %% mnesia after just restarting the app + ok = ensure_application_loaded(), + ok = ensure_working_log_handlers(), + rabbit_node_monitor:prepare_cluster_status_files(), + rabbit_mnesia:check_cluster_consistency(), + ok = rabbit_boot:start(app_startup_order()), + ok = log_broker_started(rabbit_plugins:active()) + end). boot() -> - start_it(fun() -> - ok = ensure_application_loaded(), - Success = maybe_hipe_compile(), - ok = ensure_working_log_handlers(), - warn_if_hipe_compilation_failed(Success), - rabbit_node_monitor:prepare_cluster_status_files(), - ok = rabbit_upgrade:maybe_upgrade_mnesia(), - %% It's important that the consistency check happens after - %% the upgrade, since if we are a secondary node the - %% primary node will have forgotten us - rabbit_mnesia:check_cluster_consistency(), - Plugins = rabbit_plugins:setup(), - ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), - ok = log_broker_started(Plugins) - end). - -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); - -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). - -start_it(StartFun) -> - Marker = spawn_link(fun() -> receive stop -> ok end end), - case catch register(rabbit_boot, Marker) of - true -> try - case is_running() of - true -> ok; - false -> StartFun() - end - catch - throw:{could_not_start, _App, _Reason}=Err -> - boot_error(Err, not_available); - _:Reason -> - boot_error(Reason, erlang:get_stacktrace()) - after - unlink(Marker), - Marker ! stop, - %% give the error loggers some time to catch up - timer:sleep(100) - end; - _ -> unlink(Marker), - Marker ! stop - end. + rabbit_boot:boot_with( + fun() -> + ok = ensure_application_loaded(), + Success = maybe_hipe_compile(), + ok = ensure_working_log_handlers(), + warn_if_hipe_compilation_failed(Success), + rabbit_node_monitor:prepare_cluster_status_files(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(), + %% It's important that the consistency check happens after + %% the upgrade, since if we are a secondary node the + %% primary node will have forgotten us + rabbit_mnesia:check_cluster_consistency(), + Plugins = rabbit_plugins:setup(), + ToBeLoaded = Plugins ++ ?APPS, + ok = rabbit_boot:start(ToBeLoaded), + ok = log_broker_started(Plugins) + end). stop() -> - case whereis(rabbit_boot) of - undefined -> ok; - _ -> await_startup() - end, rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + rabbit_boot:shutdown(app_shutdown_order()). stop_and_halt() -> try @@ -455,7 +419,7 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + rabbit_boot:run_boot_steps(), {ok, SupPid}; Error -> Error @@ -467,6 +431,7 @@ stop(_State) -> true -> rabbit_amqqueue:on_node_down(node()); false -> rabbit_table:clear_ram_only_tables() end, + ok = rabbit_boot:shutdown(), ok. %%--------------------------------------------------------------------------- @@ -481,120 +446,6 @@ app_shutdown_order() -> app_utils:app_dependency_order(Apps, true). %%--------------------------------------------------------------------------- -%% boot step logic - -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of - [] -> - ok; - MFAs -> - [try - apply(M,F,A) - of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) - catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) - end || {M,F,A} <- MFAs], - ok - end. - -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). - -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. - -edges(_Module, Steps) -> - [case Key of - requires -> {StepName, OtherStep}; - enables -> {OtherStep, StepName} - end || {StepName, Atts} <- Steps, - {Key, OtherStep} <- Atts, - Key =:= requires orelse Key =:= enables]. - -sort_boot_steps(UnsortedSteps) -> - case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2, - UnsortedSteps) of - {ok, G} -> - %% Use topological sort to find a consistent ordering (if - %% there is one, otherwise fail). - SortedSteps = lists:reverse( - [begin - {StepName, Step} = digraph:vertex(G, - StepName), - Step - end || StepName <- digraph_utils:topsort(G)]), - digraph:delete(G), - %% Check that all mentioned {M,F,A} triples are exported. - case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, - not erlang:function_exported(M, F, length(A))] of - [] -> SortedSteps; - MissingFunctions -> basic_boot_error( - {missing_functions, MissingFunctions}, - "Boot step functions not exported: ~p~n", - [MissingFunctions]) - end; - {error, {vertex, duplicate, StepName}} -> - basic_boot_error({duplicate_boot_step, StepName}, - "Duplicate boot step name: ~w~n", [StepName]); - {error, {edge, Reason, From, To}} -> - basic_boot_error( - {invalid_boot_step_dependency, From, To}, - "Could not add boot step dependency of ~w on ~w:~n~s", - [To, From, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) || - Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]) - end. - --ifdef(use_specs). --spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()). --endif. -boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> - AllNodes = rabbit_mnesia:cluster_nodes(all), - {Err, Nodes} = - case AllNodes -- [node()] of - [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" - " shut down forcefully~nit cannot determine which nodes" - " are timing out.~n", []}; - Ns -> {rabbit_misc:format( - "Timeout contacting cluster nodes: ~p.~n", [Ns]), - Ns} - end, - basic_boot_error(Term, - Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); -boot_error(Reason, Stacktrace) -> - Fmt = "Error description:~n ~p~n~n" ++ - "Log files (may contain more information):~n ~s~n ~s~n~n", - Args = [Reason, log_location(kernel), log_location(sasl)], - boot_error(Reason, Fmt, Args, Stacktrace). - --ifdef(use_specs). --spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()]) - -> no_return()). --endif. -boot_error(Reason, Fmt, Args, not_available) -> - basic_boot_error(Reason, Fmt, Args); -boot_error(Reason, Fmt, Args, Stacktrace) -> - basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n", - Args ++ [Stacktrace]). - -basic_boot_error(Reason, Format, Args) -> - io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), - rabbit_misc:local_info_msg(Format, Args), - timer:sleep(1000), - exit({?MODULE, failure_during_boot, Reason}). - -%%--------------------------------------------------------------------------- %% boot step functions boot_delegate() -> @@ -615,12 +466,12 @@ maybe_insert_default_data() -> end. insert_default_data() -> - {ok, DefaultUser} = application:get_env(default_user), - {ok, DefaultPass} = application:get_env(default_pass), - {ok, DefaultTags} = application:get_env(default_user_tags), - {ok, DefaultVHost} = application:get_env(default_vhost), + {ok, DefaultUser} = application:get_env(rabbit, default_user), + {ok, DefaultPass} = application:get_env(rabbit, default_pass), + {ok, DefaultTags} = application:get_env(rabbit, default_user_tags), + {ok, DefaultVHost} = application:get_env(rabbit, default_vhost), {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} = - application:get_env(default_permissions), + application:get_env(rabbit, default_permissions), ok = rabbit_vhost:add(DefaultVHost), ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass), ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index cd1d125bd5..ac2fb42f11 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -53,7 +53,8 @@ start_link() -> start() -> ok = rabbit_sup:start_restartable_child(?MODULE), ok = gen_event:add_handler(?SERVER, ?MODULE, []), - {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), + {ok, MemoryWatermark} = application:get_env(rabbit, + vm_memory_high_watermark), rabbit_sup:start_restartable_child( vm_memory_monitor, [MemoryWatermark, fun (Alarm) -> @@ -61,7 +62,8 @@ start() -> set_alarm(Alarm) end, fun clear_alarm/1]), - {ok, DiskLimit} = application:get_env(disk_free_limit), + {ok, DiskLimit} = application:get_env(rabbit, + disk_free_limit), rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]), ok. diff --git a/src/rabbit_boot.erl b/src/rabbit_boot.erl new file mode 100644 index 0000000000..1f8fcf1d19 --- /dev/null +++ b/src/rabbit_boot.erl @@ -0,0 +1,403 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_boot). + +-export([boot_with/1, shutdown/1]). +-export([start/1, stop/1]). +-export([run_boot_steps/0]). +-export([boot_error/2, boot_error/4]). + +-ifdef(use_specs). + +-spec(boot_with/1 :: (fun(() -> 'ok')) -> 'ok'). +-spec(shutdown/1 :: ([atom()]) -> 'ok'). +-spec(start/1 :: ([atom()]) -> 'ok'). +-spec(stop/1 :: ([atom()]) -> 'ok'). +-spec(run_boot_steps/0 :: () -> 'ok'). +-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()). +-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()]) + -> no_return()). + +-endif. + +-define(BOOT_FILE, "boot.info"). + +%% +%% When the broker is starting, we must run all the boot steps within the +%% rabbit:start/2 application callback, after rabbit_sup has started and +%% before any plugin applications begin to start. To achieve this, we process +%% the boot steps from all loaded applications. +%% +%% If the broker is already running however, we must run all boot steps for +%% each application/plugin we're starting, plus any other (dependent) steps. +%% To achieve this, we process the boot steps from all loaded applications, +%% but skip those that have already been run (i.e., steps that have been run +%% once whilst, or even since the broker started). +%% +%% Tracking which boot steps have already been run is done via an ets table. +%% Because we frequently find ourselves needing to query this table without +%% the rabbit application running (e.g., during the initial boot sequence +%% and when we've "cold started" a node without any running applications), +%% this table is serialized to a file after each operation. When the node is +%% stopped cleanly, the file is deleted. When a node is in the process of +%% starting, the file is also removed and replaced (since we do not want to +%% track boot steps from a previous incarnation of the node). +%% + +%%--------------------------------------------------------------------------- +%% Public API + +boot_with(StartFun) -> + %% TODO: this should be done with monitors, not links, I think + Marker = spawn_link(fun() -> receive stop -> ok end end), + case catch register(rabbit_boot, Marker) of + true -> try + case rabbit:is_running() of + true -> ok; + false -> StartFun() + end + catch + throw:{could_not_start, _App, _Reason}=Err -> + boot_error(Err, not_available); + _:Reason -> + boot_error(Reason, erlang:get_stacktrace()) + after + unlink(Marker), + Marker ! stop, + %% give the error loggers some time to catch up + timer:sleep(100) + end; + _ -> unlink(Marker), + Marker ! stop + end. + +shutdown(Apps) -> + try + case whereis(?MODULE) of + undefined -> ok; + _ -> await_startup(Apps) + end, + %% TODO: put this back in somewhere sensible... + %% rabbit_log:info("Stopping RabbitMQ~n"), + ok = app_utils:stop_applications(Apps) + after + delete_boot_table() + end. + +start(Apps) -> + try + ensure_boot_table(), + force_reload(Apps), + StartupApps = app_utils:app_dependency_order(Apps, false), + case whereis(?MODULE) of + undefined -> run_boot_steps(); + _ -> ok + end, + ok = app_utils:start_applications(StartupApps, + handle_app_error(could_not_start)) + after + save_boot_table() + end. + +stop(Apps) -> + ensure_boot_table(), + try + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)) + after + try + BootSteps = load_steps(boot), + ToDelete = [Step || {App, _, _}=Step <- BootSteps, + lists:member(App, Apps)], + [ets:delete(?MODULE, Step) || {_, Step, _} <- ToDelete], + run_cleanup_steps(Apps) + after + save_boot_table() + end, + [begin + {ok, Mods} = application:get_key(App, modules), + [begin + code:soft_purge(Mod), + code:delete(Mod), + false = code:is_loaded(Mod) + end || Mod <- Mods], + application:unload(App) + end || App <- Apps] + end. + +run_cleanup_steps(Apps) -> + Completed = sets:new(), + lists:foldl( + fun({_, Name, _}=Step, Acc) -> + case sets:is_element(Name, Completed) of + true -> Acc; + false -> run_cleanup_step(Step), + sets:add_element(Name, Acc) + end + end, + Completed, + [Step || {App, _, _}=Step <- load_steps(cleanup), + lists:member(App, Apps)]), + ok. + +run_boot_steps() -> + Steps = load_steps(boot), + [ok = run_boot_step(Step) || Step <- Steps], + ok. + +load_steps(Type) -> + StepAttrs = rabbit_misc:all_module_attributes_with_app(rabbit_boot_step), + sort_boot_steps( + Type, + lists:usort( + [{Mod, {AppName, Steps}} || {AppName, Mod, Steps} <- StepAttrs])). + +boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + AllNodes = rabbit_mnesia:cluster_nodes(all), + {Err, Nodes} = + case AllNodes -- [node()] of + [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" + " shut down forcefully~nit cannot determine which nodes" + " are timing out.~n", []}; + Ns -> {rabbit_misc:format( + "Timeout contacting cluster nodes: ~p.~n", [Ns]), + Ns} + end, + basic_boot_error(Term, + Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []); +boot_error(Reason, Stacktrace) -> + Fmt = "Error description:~n ~p~n~n" ++ + "Log files (may contain more information):~n ~s~n ~s~n~n", + Args = [Reason, log_location(kernel), log_location(sasl)], + boot_error(Reason, Fmt, Args, Stacktrace). + +boot_error(Reason, Fmt, Args, not_available) -> + basic_boot_error(Reason, Fmt, Args); +boot_error(Reason, Fmt, Args, Stacktrace) -> + basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n", + Args ++ [Stacktrace]). + +%%--------------------------------------------------------------------------- +%% Private API + +force_reload(Apps) -> + ok = app_utils:load_applications(Apps), + ok = do_reload(Apps). + +do_reload([App|Apps]) -> + case application:get_key(App, modules) of + {ok, Modules} -> reload_all(Modules); + _ -> ok + end, + force_reload(Apps); +do_reload([]) -> + ok. + +reload_all(Modules) -> + [begin + case code:soft_purge(Mod) of + true -> load_mod(Mod); + false -> ok + end + end || Mod <- Modules]. + +load_mod(Mod) -> + case code:is_loaded(Mod) of + {file, Path} when Path /= 'preloaded' -> code:load_abs(Path); + _ -> code:load_file(Mod) + end. + +await_startup(Apps) -> + app_utils:wait_for_applications(Apps). + +delete_boot_table() -> + case filelib:is_file(boot_file()) of + true -> file:delete(boot_file()); + false -> ok + end. + +ensure_boot_table() -> + case whereis(?MODULE) of + undefined -> + case filelib:is_file(boot_file()) of + true -> load_table(); + false -> clean_table() + end; + _Pid -> + clean_table() + end. + +clean_table() -> + delete_boot_table(), + case ets:info(?MODULE) of + undefined -> + ets:new(?MODULE, [named_table, public, ordered_set]); + _ -> + ok + end. + +load_table() -> + case ets:file2tab(boot_file(), [{verify, true}]) of + {error, _} -> clean_table(); + {ok, _Tab} -> ok + end. + +save_boot_table() -> + delete_boot_table(), + case ets:info(?MODULE) of + undefined -> ok; + _ -> ets:tab2file(?MODULE, boot_file(), + [{extended_info, [object_count]}]), + ets:delete(?MODULE) + end. + +boot_file() -> + filename:join(rabbit_mnesia:dir(), ?BOOT_FILE). + +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. + +run_cleanup_step({_, _, Attributes}) -> + run_step_name(Attributes, cleanup). + +run_boot_step({_, StepName, Attributes}) -> + case catch already_run(StepName) of + false -> ok = run_step_name(Attributes, mfa), + mark_complete(StepName); + _ -> ok + end, + ok. + +run_step_name(Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of + [] -> + ok; + MFAs -> + [try + apply(M,F,A) + of + ok -> ok; + {error, Reason} -> boot_error(Reason, not_available) + catch + _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + end || {M,F,A} <- MFAs], + ok + end. + +already_run(StepName) -> + ets:member(?MODULE, StepName). + +mark_complete(StepName) -> + ets:insert(?MODULE, {StepName}). + +basic_boot_error(Reason, Format, Args) -> + io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), + rabbit_misc:local_info_msg(Format, Args), + timer:sleep(1000), + exit({?MODULE, failure_during_boot, Reason}). + +%% TODO: move me to rabbit_misc +log_location(Type) -> + case application:get_env(rabbit, case Type of + kernel -> error_logger; + sasl -> sasl_error_logger + end) of + {ok, {file, File}} -> File; + {ok, false} -> undefined; + {ok, tty} -> tty; + {ok, silent} -> undefined; + {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}}); + _ -> undefined + end. + +vertices(_Module, {AppName, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. + +edges(Type) -> + %% When running "boot" steps, both hard _and_ soft dependencies are + %% considered equally. When running "cleanup" steps however, we only + %% consider /hard/ dependencies (i.e., of the form + %% {DependencyType, {hard, StepName}}) as dependencies. + fun (_Module, {_AppName, Steps}) -> + [case Key of + requires -> {StepName, strip_type(OtherStep)}; + enables -> {strip_type(OtherStep), StepName} + end || {StepName, Atts} <- Steps, + {Key, OtherStep} <- Atts, + filter_dependent_steps(Key, OtherStep, Type)] + end. + +filter_dependent_steps(Key, Dependency, Type) + when Key =:= requires orelse Key =:= enables -> + case {Dependency, Type} of + {{hard, _}, cleanup} -> true; + {_SoftReqs, cleanup} -> false; + {_, boot} -> true + end; +filter_dependent_steps(_, _, _) -> + false. + +strip_type({hard, Step}) -> Step; +strip_type(Step) -> Step. + +sort_boot_steps(Type, UnsortedSteps) -> + case rabbit_misc:build_acyclic_graph(fun vertices/2, edges(Type), + UnsortedSteps) of + {ok, G} -> + %% Use topological sort to find a consistent ordering (if + %% there is one, otherwise fail). + SortedSteps = lists:reverse( + [begin + {StepName, Step} = digraph:vertex(G, + StepName), + Step + end || StepName <- digraph_utils:topsort(G)]), + digraph:delete(G), + %% Check that all mentioned {M,F,A} triples are exported. + case [{StepName, {M,F,A}} || + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, + not erlang:function_exported(M, F, length(A))] of + [] -> SortedSteps; + MissingFunctions -> basic_boot_error( + {missing_functions, MissingFunctions}, + "Boot step functions not exported: ~p~n", + [MissingFunctions]) + end; + {error, {vertex, duplicate, StepName}} -> + basic_boot_error({duplicate_boot_step, StepName}, + "Duplicate boot step name: ~w~n", [StepName]); + {error, {edge, Reason, From, To}} -> + basic_boot_error( + {invalid_boot_step_dependency, From, To}, + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || + Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]) + end. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 848c4a8721..35afc8fd10 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,6 +51,7 @@ -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([parse_arguments/3]). +-export([all_module_attributes_with_app/1]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([const/1]). @@ -210,6 +211,8 @@ -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes_with_app/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -852,21 +855,38 @@ module_attributes(Module) -> V end. +all_module_attributes_with_app(Name) -> + find_module_attributes( + fun(App, Modules) -> + [{App, Module} || Module <- Modules] + end, + fun ({App, Module}, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{App, Module, Atts} | Acc] + end + end). + all_module_attributes(Name) -> - Modules = - lists:usort( - lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), - lists:foldl( + find_module_attributes( + fun(_App, Modules) -> Modules end, fun (Module, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; Atts -> [{Module, Atts} | Acc] end - end, [], Modules). + end). +find_module_attributes(Generator, Fold) -> + Targets = + lists:usort( + lists:append( + [Generator(App, Modules) || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl(Fold, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 42438790c0..b1016df1ef 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -125,12 +125,12 @@ boot() -> ok = boot_ssl(). boot_tcp() -> - {ok, TcpListeners} = application:get_env(tcp_listeners), + {ok, TcpListeners} = application:get_env(rabbit, tcp_listeners), [ok = start_tcp_listener(Listener) || Listener <- TcpListeners], ok. boot_ssl() -> - case application:get_env(ssl_listeners) of + case application:get_env(rabbit, ssl_listeners) of {ok, []} -> ok; {ok, SslListeners} -> diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 168ced3cfe..6fe4c12788 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,6 +18,7 @@ -include("rabbit.hrl"). -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). +-export([enable/1, disable/1]). %%---------------------------------------------------------------------------- @@ -31,11 +32,20 @@ -spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). -spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> [plugin_name()]). - +-spec(enable/1 :: ([plugin_name()]) -> 'ok'). +-spec(disable/1 :: ([plugin_name()]) -> 'ok'). -endif. %%---------------------------------------------------------------------------- +enable(Plugins) -> + setup(), + rabbit_boot:start(Plugins). + +disable(Plugins) -> + setup(), + rabbit_boot:stop(Plugins). + %% @doc Prepares the file system and installs all enabled plugins. setup() -> {ok, PluginDir} = application:get_env(rabbit, plugins_dir), diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 948d2ab000..408fc4e10b 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -19,17 +19,21 @@ -export([start/0, stop/0]). +-define(NODE_OPT, "-n"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). -define(ENABLED_OPT, "-E"). -define(ENABLED_ALL_OPT, "-e"). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). -define(ENABLED_DEF, {?ENABLED_OPT, flag}). -define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). --define(GLOBAL_DEFS, []). +-define(RPC_TIMEOUT, infinity). + +-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]). -define(COMMANDS, [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, @@ -51,11 +55,10 @@ start() -> {ok, [[PluginsFile|_]|_]} = init:get_argument(enabled_plugins_file), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() @@ -67,7 +70,8 @@ start() -> [string:join([atom_to_list(Command) | Args], " ")]) end, - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + Node = proplists:get_value(?NODE_OPT, Opts), + case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> @@ -92,12 +96,25 @@ stop() -> %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + +action(list, Node, [], Opts, PluginsFile, PluginsDir) -> + action(list, Node, [".*"], Opts, PluginsFile, PluginsDir); +action(list, _Node, [Pat], Opts, PluginsFile, PluginsDir) -> format_plugins(Pat, Opts, PluginsFile, PluginsDir); -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> +action(enable, Node, ToEnable0, _Opts, PluginsFile, PluginsDir) -> case ToEnable0 of [] -> throw({error_string, "Not enough arguments for 'enable'"}); _ -> ok @@ -125,10 +142,10 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() + action_change(Node, enable, NewEnabled) end; -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> +action(disable, Node, ToDisable0, _Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); _ -> ok @@ -151,10 +168,11 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), + Disabled = ImplicitlyEnabled -- NewImplicitlyEnabled, print_list("The following plugins have been disabled:", - ImplicitlyEnabled -- NewImplicitlyEnabled), + Disabled), write_enabled_plugins(PluginsFile, NewEnabled), - report_change() + action_change(Node, disable, Disabled) end. %%---------------------------------------------------------------------------- @@ -262,6 +280,16 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"). +action_change(Node, Action, Targets) -> + rpc_call(Node, rabbit_plugins, Action, [Targets]). + +rpc_call(Node, Mod, Action, Args) -> + case rpc:call(Node, Mod, Action, Args, ?RPC_TIMEOUT) of + {badrpc, nodedown} -> io:format("Plugin configuration has changed.~n"); + ok -> io:format("Plugin(s) ~pd.~n", [Action]); + Error -> io:format("Unable to ~p plugin(s). " + "Please restart the broker " + "to apply your changes.~nError: ~p~n", + [Action, Error]) + end. + |
