summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rw-r--r--src/app_utils.erl9
-rw-r--r--src/rabbit.erl221
-rw-r--r--src/rabbit_alarm.erl6
-rw-r--r--src/rabbit_boot.erl403
-rw-r--r--src/rabbit_misc.erl34
-rw-r--r--src/rabbit_networking.erl4
-rw-r--r--src/rabbit_plugins.erl12
-rw-r--r--src/rabbit_plugins_main.erl60
10 files changed, 539 insertions, 218 deletions
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 90eb5a5d77..c6f3ff9c49 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -21,6 +21,7 @@
##--- Set environment vars RABBITMQ_<var_name> to defaults if not set
+[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_ENABLED_PLUGINS_FILE" ] && RABBITMQ_ENABLED_PLUGINS_FILE=${ENABLED_PLUGINS_FILE}
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR=${PLUGINS_DIR}
@@ -35,4 +36,5 @@ exec ${ERL_DIR}erl \
-s rabbit_plugins_main \
-enabled_plugins_file "$RABBITMQ_ENABLED_PLUGINS_FILE" \
-plugins_dist_dir "$RABBITMQ_PLUGINS_DIR" \
+ -nodename $RABBITMQ_NODENAME \
-extra "$@"
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 0d1f128ea1..252ecb3a20 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -31,6 +31,10 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!RABBITMQ_NODENAME!"=="" (
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
+)
+
if not exist "!ERLANG_HOME!\bin\erl.exe" (
echo.
echo ******************************
@@ -51,7 +55,7 @@ if "!RABBITMQ_PLUGINS_DIR!"=="" (
set RABBITMQ_PLUGINS_DIR=!TDP0!..\plugins
)
-"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -extra !STAR!
+"!ERLANG_HOME!\bin\erl.exe" -pa "!TDP0!..\ebin" -noinput -hidden -sname rabbitmq-plugins!RANDOM!!TIME:~9! -s rabbit_plugins_main -enabled_plugins_file "!RABBITMQ_ENABLED_PLUGINS_FILE!" -plugins_dist_dir "!RABBITMQ_PLUGINS_DIR:\=/!" -nodename !RABBITMQ_NODENAME! -extra !STAR!
endlocal
endlocal
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 5ae2d2954e..5eeb692425 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,7 +17,7 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ wait_for_applications/1, app_dependencies/1]).
-ifdef(use_specs).
@@ -30,6 +30,7 @@
-spec stop_applications([atom()], error_handler()) -> 'ok'.
-spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
-endif.
@@ -68,7 +69,6 @@ stop_applications(Apps, ErrorHandler) ->
ErrorHandler,
Apps).
-
wait_for_applications(Apps) ->
[wait_for_application(App) || App <- Apps], ok.
@@ -80,8 +80,9 @@ app_dependency_order(RootApps, StripUnreachable) ->
{App, _Desc, _Vsn} <- application:loaded_applications()]),
try
case StripUnreachable of
- true -> digraph:del_vertices(G, digraph:vertices(G) --
- digraph_utils:reachable(RootApps, G));
+ true -> digraph:del_vertices(
+ G, digraph:vertices(G) --
+ digraph_utils:reachable(RootApps, G));
false -> ok
end,
digraph_utils:topsort(G)
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bd4f1dbc88..081e2e229a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -305,76 +305,40 @@ ensure_application_loaded() ->
end.
start() ->
- start_it(fun() ->
- %% We do not want to HiPE compile or upgrade
- %% mnesia after just restarting the app
- ok = ensure_application_loaded(),
- ok = ensure_working_log_handlers(),
- rabbit_node_monitor:prepare_cluster_status_files(),
- rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
- ok = log_broker_started(rabbit_plugins:active())
- end).
+ rabbit_boot:boot_with(
+ fun() ->
+ %% We do not want to HiPE compile or upgrade
+ %% mnesia after just restarting the app
+ ok = ensure_application_loaded(),
+ ok = ensure_working_log_handlers(),
+ rabbit_node_monitor:prepare_cluster_status_files(),
+ rabbit_mnesia:check_cluster_consistency(),
+ ok = rabbit_boot:start(app_startup_order()),
+ ok = log_broker_started(rabbit_plugins:active())
+ end).
boot() ->
- start_it(fun() ->
- ok = ensure_application_loaded(),
- Success = maybe_hipe_compile(),
- ok = ensure_working_log_handlers(),
- warn_if_hipe_compilation_failed(Success),
- rabbit_node_monitor:prepare_cluster_status_files(),
- ok = rabbit_upgrade:maybe_upgrade_mnesia(),
- %% It's important that the consistency check happens after
- %% the upgrade, since if we are a secondary node the
- %% primary node will have forgotten us
- rabbit_mnesia:check_cluster_consistency(),
- Plugins = rabbit_plugins:setup(),
- ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
- ok = log_broker_started(Plugins)
- end).
-
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
-
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
-
-start_it(StartFun) ->
- Marker = spawn_link(fun() -> receive stop -> ok end end),
- case catch register(rabbit_boot, Marker) of
- true -> try
- case is_running() of
- true -> ok;
- false -> StartFun()
- end
- catch
- throw:{could_not_start, _App, _Reason}=Err ->
- boot_error(Err, not_available);
- _:Reason ->
- boot_error(Reason, erlang:get_stacktrace())
- after
- unlink(Marker),
- Marker ! stop,
- %% give the error loggers some time to catch up
- timer:sleep(100)
- end;
- _ -> unlink(Marker),
- Marker ! stop
- end.
+ rabbit_boot:boot_with(
+ fun() ->
+ ok = ensure_application_loaded(),
+ Success = maybe_hipe_compile(),
+ ok = ensure_working_log_handlers(),
+ warn_if_hipe_compilation_failed(Success),
+ rabbit_node_monitor:prepare_cluster_status_files(),
+ ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ %% It's important that the consistency check happens after
+ %% the upgrade, since if we are a secondary node the
+ %% primary node will have forgotten us
+ rabbit_mnesia:check_cluster_consistency(),
+ Plugins = rabbit_plugins:setup(),
+ ToBeLoaded = Plugins ++ ?APPS,
+ ok = rabbit_boot:start(ToBeLoaded),
+ ok = log_broker_started(Plugins)
+ end).
stop() ->
- case whereis(rabbit_boot) of
- undefined -> ok;
- _ -> await_startup()
- end,
rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ rabbit_boot:shutdown(app_shutdown_order()).
stop_and_halt() ->
try
@@ -455,7 +419,7 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ rabbit_boot:run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -467,6 +431,7 @@ stop(_State) ->
true -> rabbit_amqqueue:on_node_down(node());
false -> rabbit_table:clear_ram_only_tables()
end,
+ ok = rabbit_boot:shutdown(),
ok.
%%---------------------------------------------------------------------------
@@ -481,120 +446,6 @@ app_shutdown_order() ->
app_utils:app_dependency_order(Apps, true).
%%---------------------------------------------------------------------------
-%% boot step logic
-
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
- [] ->
- ok;
- MFAs ->
- [try
- apply(M,F,A)
- of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
- catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
- end || {M,F,A} <- MFAs],
- ok
- end.
-
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
-
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
-
-edges(_Module, Steps) ->
- [case Key of
- requires -> {StepName, OtherStep};
- enables -> {OtherStep, StepName}
- end || {StepName, Atts} <- Steps,
- {Key, OtherStep} <- Atts,
- Key =:= requires orelse Key =:= enables].
-
-sort_boot_steps(UnsortedSteps) ->
- case rabbit_misc:build_acyclic_graph(fun vertices/2, fun edges/2,
- UnsortedSteps) of
- {ok, G} ->
- %% Use topological sort to find a consistent ordering (if
- %% there is one, otherwise fail).
- SortedSteps = lists:reverse(
- [begin
- {StepName, Step} = digraph:vertex(G,
- StepName),
- Step
- end || StepName <- digraph_utils:topsort(G)]),
- digraph:delete(G),
- %% Check that all mentioned {M,F,A} triples are exported.
- case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
- not erlang:function_exported(M, F, length(A))] of
- [] -> SortedSteps;
- MissingFunctions -> basic_boot_error(
- {missing_functions, MissingFunctions},
- "Boot step functions not exported: ~p~n",
- [MissingFunctions])
- end;
- {error, {vertex, duplicate, StepName}} ->
- basic_boot_error({duplicate_boot_step, StepName},
- "Duplicate boot step name: ~w~n", [StepName]);
- {error, {edge, Reason, From, To}} ->
- basic_boot_error(
- {invalid_boot_step_dependency, From, To},
- "Could not add boot step dependency of ~w on ~w:~n~s",
- [To, From,
- case Reason of
- {bad_vertex, V} ->
- io_lib:format("Boot step not registered: ~w~n", [V]);
- {bad_edge, [First | Rest]} ->
- [io_lib:format("Cyclic dependency: ~w", [First]),
- [io_lib:format(" depends on ~w", [Next]) ||
- Next <- Rest],
- io_lib:format(" depends on ~w~n", [First])]
- end])
- end.
-
--ifdef(use_specs).
--spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
--endif.
-boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
- AllNodes = rabbit_mnesia:cluster_nodes(all),
- {Err, Nodes} =
- case AllNodes -- [node()] of
- [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
- " shut down forcefully~nit cannot determine which nodes"
- " are timing out.~n", []};
- Ns -> {rabbit_misc:format(
- "Timeout contacting cluster nodes: ~p.~n", [Ns]),
- Ns}
- end,
- basic_boot_error(Term,
- Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
-boot_error(Reason, Stacktrace) ->
- Fmt = "Error description:~n ~p~n~n" ++
- "Log files (may contain more information):~n ~s~n ~s~n~n",
- Args = [Reason, log_location(kernel), log_location(sasl)],
- boot_error(Reason, Fmt, Args, Stacktrace).
-
--ifdef(use_specs).
--spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
- -> no_return()).
--endif.
-boot_error(Reason, Fmt, Args, not_available) ->
- basic_boot_error(Reason, Fmt, Args);
-boot_error(Reason, Fmt, Args, Stacktrace) ->
- basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
- Args ++ [Stacktrace]).
-
-basic_boot_error(Reason, Format, Args) ->
- io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
- rabbit_misc:local_info_msg(Format, Args),
- timer:sleep(1000),
- exit({?MODULE, failure_during_boot, Reason}).
-
-%%---------------------------------------------------------------------------
%% boot step functions
boot_delegate() ->
@@ -615,12 +466,12 @@ maybe_insert_default_data() ->
end.
insert_default_data() ->
- {ok, DefaultUser} = application:get_env(default_user),
- {ok, DefaultPass} = application:get_env(default_pass),
- {ok, DefaultTags} = application:get_env(default_user_tags),
- {ok, DefaultVHost} = application:get_env(default_vhost),
+ {ok, DefaultUser} = application:get_env(rabbit, default_user),
+ {ok, DefaultPass} = application:get_env(rabbit, default_pass),
+ {ok, DefaultTags} = application:get_env(rabbit, default_user_tags),
+ {ok, DefaultVHost} = application:get_env(rabbit, default_vhost),
{ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
- application:get_env(default_permissions),
+ application:get_env(rabbit, default_permissions),
ok = rabbit_vhost:add(DefaultVHost),
ok = rabbit_auth_backend_internal:add_user(DefaultUser, DefaultPass),
ok = rabbit_auth_backend_internal:set_tags(DefaultUser, DefaultTags),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index cd1d125bd5..ac2fb42f11 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -53,7 +53,8 @@ start_link() ->
start() ->
ok = rabbit_sup:start_restartable_child(?MODULE),
ok = gen_event:add_handler(?SERVER, ?MODULE, []),
- {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
+ {ok, MemoryWatermark} = application:get_env(rabbit,
+ vm_memory_high_watermark),
rabbit_sup:start_restartable_child(
vm_memory_monitor, [MemoryWatermark,
fun (Alarm) ->
@@ -61,7 +62,8 @@ start() ->
set_alarm(Alarm)
end,
fun clear_alarm/1]),
- {ok, DiskLimit} = application:get_env(disk_free_limit),
+ {ok, DiskLimit} = application:get_env(rabbit,
+ disk_free_limit),
rabbit_sup:start_restartable_child(rabbit_disk_monitor, [DiskLimit]),
ok.
diff --git a/src/rabbit_boot.erl b/src/rabbit_boot.erl
new file mode 100644
index 0000000000..1f8fcf1d19
--- /dev/null
+++ b/src/rabbit_boot.erl
@@ -0,0 +1,403 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_boot).
+
+-export([boot_with/1, shutdown/1]).
+-export([start/1, stop/1]).
+-export([run_boot_steps/0]).
+-export([boot_error/2, boot_error/4]).
+
+-ifdef(use_specs).
+
+-spec(boot_with/1 :: (fun(() -> 'ok')) -> 'ok').
+-spec(shutdown/1 :: ([atom()]) -> 'ok').
+-spec(start/1 :: ([atom()]) -> 'ok').
+-spec(stop/1 :: ([atom()]) -> 'ok').
+-spec(run_boot_steps/0 :: () -> 'ok').
+-spec(boot_error/2 :: (term(), not_available | [tuple()]) -> no_return()).
+-spec(boot_error/4 :: (term(), string(), [any()], not_available | [tuple()])
+ -> no_return()).
+
+-endif.
+
+-define(BOOT_FILE, "boot.info").
+
+%%
+%% When the broker is starting, we must run all the boot steps within the
+%% rabbit:start/2 application callback, after rabbit_sup has started and
+%% before any plugin applications begin to start. To achieve this, we process
+%% the boot steps from all loaded applications.
+%%
+%% If the broker is already running however, we must run all boot steps for
+%% each application/plugin we're starting, plus any other (dependent) steps.
+%% To achieve this, we process the boot steps from all loaded applications,
+%% but skip those that have already been run (i.e., steps that have been run
+%% once whilst, or even since the broker started).
+%%
+%% Tracking which boot steps have already been run is done via an ets table.
+%% Because we frequently find ourselves needing to query this table without
+%% the rabbit application running (e.g., during the initial boot sequence
+%% and when we've "cold started" a node without any running applications),
+%% this table is serialized to a file after each operation. When the node is
+%% stopped cleanly, the file is deleted. When a node is in the process of
+%% starting, the file is also removed and replaced (since we do not want to
+%% track boot steps from a previous incarnation of the node).
+%%
+
+%%---------------------------------------------------------------------------
+%% Public API
+
+boot_with(StartFun) ->
+ %% TODO: this should be done with monitors, not links, I think
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ case catch register(rabbit_boot, Marker) of
+ true -> try
+ case rabbit:is_running() of
+ true -> ok;
+ false -> StartFun()
+ end
+ catch
+ throw:{could_not_start, _App, _Reason}=Err ->
+ boot_error(Err, not_available);
+ _:Reason ->
+ boot_error(Reason, erlang:get_stacktrace())
+ after
+ unlink(Marker),
+ Marker ! stop,
+ %% give the error loggers some time to catch up
+ timer:sleep(100)
+ end;
+ _ -> unlink(Marker),
+ Marker ! stop
+ end.
+
+shutdown(Apps) ->
+ try
+ case whereis(?MODULE) of
+ undefined -> ok;
+ _ -> await_startup(Apps)
+ end,
+ %% TODO: put this back in somewhere sensible...
+ %% rabbit_log:info("Stopping RabbitMQ~n"),
+ ok = app_utils:stop_applications(Apps)
+ after
+ delete_boot_table()
+ end.
+
+start(Apps) ->
+ try
+ ensure_boot_table(),
+ force_reload(Apps),
+ StartupApps = app_utils:app_dependency_order(Apps, false),
+ case whereis(?MODULE) of
+ undefined -> run_boot_steps();
+ _ -> ok
+ end,
+ ok = app_utils:start_applications(StartupApps,
+ handle_app_error(could_not_start))
+ after
+ save_boot_table()
+ end.
+
+stop(Apps) ->
+ ensure_boot_table(),
+ try
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown))
+ after
+ try
+ BootSteps = load_steps(boot),
+ ToDelete = [Step || {App, _, _}=Step <- BootSteps,
+ lists:member(App, Apps)],
+ [ets:delete(?MODULE, Step) || {_, Step, _} <- ToDelete],
+ run_cleanup_steps(Apps)
+ after
+ save_boot_table()
+ end,
+ [begin
+ {ok, Mods} = application:get_key(App, modules),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ application:unload(App)
+ end || App <- Apps]
+ end.
+
+run_cleanup_steps(Apps) ->
+ Completed = sets:new(),
+ lists:foldl(
+ fun({_, Name, _}=Step, Acc) ->
+ case sets:is_element(Name, Completed) of
+ true -> Acc;
+ false -> run_cleanup_step(Step),
+ sets:add_element(Name, Acc)
+ end
+ end,
+ Completed,
+ [Step || {App, _, _}=Step <- load_steps(cleanup),
+ lists:member(App, Apps)]),
+ ok.
+
+run_boot_steps() ->
+ Steps = load_steps(boot),
+ [ok = run_boot_step(Step) || Step <- Steps],
+ ok.
+
+load_steps(Type) ->
+ StepAttrs = rabbit_misc:all_module_attributes_with_app(rabbit_boot_step),
+ sort_boot_steps(
+ Type,
+ lists:usort(
+ [{Mod, {AppName, Steps}} || {AppName, Mod, Steps} <- StepAttrs])).
+
+boot_error(Term={error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
+ {Err, Nodes} =
+ case AllNodes -- [node()] of
+ [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
+ " shut down forcefully~nit cannot determine which nodes"
+ " are timing out.~n", []};
+ Ns -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes: ~p.~n", [Ns]),
+ Ns}
+ end,
+ basic_boot_error(Term,
+ Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
+boot_error(Reason, Stacktrace) ->
+ Fmt = "Error description:~n ~p~n~n" ++
+ "Log files (may contain more information):~n ~s~n ~s~n~n",
+ Args = [Reason, log_location(kernel), log_location(sasl)],
+ boot_error(Reason, Fmt, Args, Stacktrace).
+
+boot_error(Reason, Fmt, Args, not_available) ->
+ basic_boot_error(Reason, Fmt, Args);
+boot_error(Reason, Fmt, Args, Stacktrace) ->
+ basic_boot_error(Reason, Fmt ++ "Stack trace:~n ~p~n~n",
+ Args ++ [Stacktrace]).
+
+%%---------------------------------------------------------------------------
+%% Private API
+
+force_reload(Apps) ->
+ ok = app_utils:load_applications(Apps),
+ ok = do_reload(Apps).
+
+do_reload([App|Apps]) ->
+ case application:get_key(App, modules) of
+ {ok, Modules} -> reload_all(Modules);
+ _ -> ok
+ end,
+ force_reload(Apps);
+do_reload([]) ->
+ ok.
+
+reload_all(Modules) ->
+ [begin
+ case code:soft_purge(Mod) of
+ true -> load_mod(Mod);
+ false -> ok
+ end
+ end || Mod <- Modules].
+
+load_mod(Mod) ->
+ case code:is_loaded(Mod) of
+ {file, Path} when Path /= 'preloaded' -> code:load_abs(Path);
+ _ -> code:load_file(Mod)
+ end.
+
+await_startup(Apps) ->
+ app_utils:wait_for_applications(Apps).
+
+delete_boot_table() ->
+ case filelib:is_file(boot_file()) of
+ true -> file:delete(boot_file());
+ false -> ok
+ end.
+
+ensure_boot_table() ->
+ case whereis(?MODULE) of
+ undefined ->
+ case filelib:is_file(boot_file()) of
+ true -> load_table();
+ false -> clean_table()
+ end;
+ _Pid ->
+ clean_table()
+ end.
+
+clean_table() ->
+ delete_boot_table(),
+ case ets:info(?MODULE) of
+ undefined ->
+ ets:new(?MODULE, [named_table, public, ordered_set]);
+ _ ->
+ ok
+ end.
+
+load_table() ->
+ case ets:file2tab(boot_file(), [{verify, true}]) of
+ {error, _} -> clean_table();
+ {ok, _Tab} -> ok
+ end.
+
+save_boot_table() ->
+ delete_boot_table(),
+ case ets:info(?MODULE) of
+ undefined -> ok;
+ _ -> ets:tab2file(?MODULE, boot_file(),
+ [{extended_info, [object_count]}]),
+ ets:delete(?MODULE)
+ end.
+
+boot_file() ->
+ filename:join(rabbit_mnesia:dir(), ?BOOT_FILE).
+
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
+
+run_cleanup_step({_, _, Attributes}) ->
+ run_step_name(Attributes, cleanup).
+
+run_boot_step({_, StepName, Attributes}) ->
+ case catch already_run(StepName) of
+ false -> ok = run_step_name(Attributes, mfa),
+ mark_complete(StepName);
+ _ -> ok
+ end,
+ ok.
+
+run_step_name(Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
+ [] ->
+ ok;
+ MFAs ->
+ [try
+ apply(M,F,A)
+ of
+ ok -> ok;
+ {error, Reason} -> boot_error(Reason, not_available)
+ catch
+ _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ end || {M,F,A} <- MFAs],
+ ok
+ end.
+
+already_run(StepName) ->
+ ets:member(?MODULE, StepName).
+
+mark_complete(StepName) ->
+ ets:insert(?MODULE, {StepName}).
+
+basic_boot_error(Reason, Format, Args) ->
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
+ rabbit_misc:local_info_msg(Format, Args),
+ timer:sleep(1000),
+ exit({?MODULE, failure_during_boot, Reason}).
+
+%% TODO: move me to rabbit_misc
+log_location(Type) ->
+ case application:get_env(rabbit, case Type of
+ kernel -> error_logger;
+ sasl -> sasl_error_logger
+ end) of
+ {ok, {file, File}} -> File;
+ {ok, false} -> undefined;
+ {ok, tty} -> tty;
+ {ok, silent} -> undefined;
+ {ok, Bad} -> throw({error, {cannot_log_to_file, Bad}});
+ _ -> undefined
+ end.
+
+vertices(_Module, {AppName, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
+
+edges(Type) ->
+ %% When running "boot" steps, both hard _and_ soft dependencies are
+ %% considered equally. When running "cleanup" steps however, we only
+ %% consider /hard/ dependencies (i.e., of the form
+ %% {DependencyType, {hard, StepName}}) as dependencies.
+ fun (_Module, {_AppName, Steps}) ->
+ [case Key of
+ requires -> {StepName, strip_type(OtherStep)};
+ enables -> {strip_type(OtherStep), StepName}
+ end || {StepName, Atts} <- Steps,
+ {Key, OtherStep} <- Atts,
+ filter_dependent_steps(Key, OtherStep, Type)]
+ end.
+
+filter_dependent_steps(Key, Dependency, Type)
+ when Key =:= requires orelse Key =:= enables ->
+ case {Dependency, Type} of
+ {{hard, _}, cleanup} -> true;
+ {_SoftReqs, cleanup} -> false;
+ {_, boot} -> true
+ end;
+filter_dependent_steps(_, _, _) ->
+ false.
+
+strip_type({hard, Step}) -> Step;
+strip_type(Step) -> Step.
+
+sort_boot_steps(Type, UnsortedSteps) ->
+ case rabbit_misc:build_acyclic_graph(fun vertices/2, edges(Type),
+ UnsortedSteps) of
+ {ok, G} ->
+ %% Use topological sort to find a consistent ordering (if
+ %% there is one, otherwise fail).
+ SortedSteps = lists:reverse(
+ [begin
+ {StepName, Step} = digraph:vertex(G,
+ StepName),
+ Step
+ end || StepName <- digraph_utils:topsort(G)]),
+ digraph:delete(G),
+ %% Check that all mentioned {M,F,A} triples are exported.
+ case [{StepName, {M,F,A}} ||
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
+ not erlang:function_exported(M, F, length(A))] of
+ [] -> SortedSteps;
+ MissingFunctions -> basic_boot_error(
+ {missing_functions, MissingFunctions},
+ "Boot step functions not exported: ~p~n",
+ [MissingFunctions])
+ end;
+ {error, {vertex, duplicate, StepName}} ->
+ basic_boot_error({duplicate_boot_step, StepName},
+ "Duplicate boot step name: ~w~n", [StepName]);
+ {error, {edge, Reason, From, To}} ->
+ basic_boot_error(
+ {invalid_boot_step_dependency, From, To},
+ "Could not add boot step dependency of ~w on ~w:~n~s",
+ [To, From,
+ case Reason of
+ {bad_vertex, V} ->
+ io_lib:format("Boot step not registered: ~w~n", [V]);
+ {bad_edge, [First | Rest]} ->
+ [io_lib:format("Cyclic dependency: ~w", [First]),
+ [io_lib:format(" depends on ~w", [Next]) ||
+ Next <- Rest],
+ io_lib:format(" depends on ~w~n", [First])]
+ end])
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 848c4a8721..35afc8fd10 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,6 +51,7 @@
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([parse_arguments/3]).
+-export([all_module_attributes_with_app/1]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([const/1]).
@@ -210,6 +211,8 @@
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes_with_app/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -852,21 +855,38 @@ module_attributes(Module) ->
V
end.
+all_module_attributes_with_app(Name) ->
+ find_module_attributes(
+ fun(App, Modules) ->
+ [{App, Module} || Module <- Modules]
+ end,
+ fun ({App, Module}, Acc) ->
+ case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+ N =:= Name]) of
+ [] -> Acc;
+ Atts -> [{App, Module, Atts} | Acc]
+ end
+ end).
+
all_module_attributes(Name) ->
- Modules =
- lists:usort(
- lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
- lists:foldl(
+ find_module_attributes(
+ fun(_App, Modules) -> Modules end,
fun (Module, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
Atts -> [{Module, Atts} | Acc]
end
- end, [], Modules).
+ end).
+find_module_attributes(Generator, Fold) ->
+ Targets =
+ lists:usort(
+ lists:append(
+ [Generator(App, Modules) ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
+ lists:foldl(Fold, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 42438790c0..b1016df1ef 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -125,12 +125,12 @@ boot() ->
ok = boot_ssl().
boot_tcp() ->
- {ok, TcpListeners} = application:get_env(tcp_listeners),
+ {ok, TcpListeners} = application:get_env(rabbit, tcp_listeners),
[ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
ok.
boot_ssl() ->
- case application:get_env(ssl_listeners) of
+ case application:get_env(rabbit, ssl_listeners) of
{ok, []} ->
ok;
{ok, SslListeners} ->
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 168ced3cfe..6fe4c12788 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([enable/1, disable/1]).
%%----------------------------------------------------------------------------
@@ -31,11 +32,20 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(enable/1 :: ([plugin_name()]) -> 'ok').
+-spec(disable/1 :: ([plugin_name()]) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
+enable(Plugins) ->
+ setup(),
+ rabbit_boot:start(Plugins).
+
+disable(Plugins) ->
+ setup(),
+ rabbit_boot:stop(Plugins).
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
{ok, PluginDir} = application:get_env(rabbit, plugins_dir),
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 948d2ab000..408fc4e10b 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -19,17 +19,21 @@
-export([start/0, stop/0]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
@@ -51,11 +55,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +70,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -92,12 +96,25 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, _Node, [Pat], Opts, PluginsFile, PluginsDir) ->
format_plugins(Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
@@ -125,10 +142,10 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
+ action_change(Node, enable, NewEnabled)
end;
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
@@ -151,10 +168,11 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
NewImplicitlyEnabled =
rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
+ Disabled = ImplicitlyEnabled -- NewImplicitlyEnabled,
print_list("The following plugins have been disabled:",
- ImplicitlyEnabled -- NewImplicitlyEnabled),
+ Disabled),
write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
+ action_change(Node, disable, Disabled)
end.
%%----------------------------------------------------------------------------
@@ -262,6 +280,16 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Node, Action, Targets) ->
+ rpc_call(Node, rabbit_plugins, Action, [Targets]).
+
+rpc_call(Node, Mod, Action, Args) ->
+ case rpc:call(Node, Mod, Action, Args, ?RPC_TIMEOUT) of
+ {badrpc, nodedown} -> io:format("Plugin configuration has changed.~n");
+ ok -> io:format("Plugin(s) ~pd.~n", [Action]);
+ Error -> io:format("Unable to ~p plugin(s). "
+ "Please restart the broker "
+ "to apply your changes.~nError: ~p~n",
+ [Action, Error])
+ end.
+