summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2014-03-13 22:51:57 +0000
committerTim Watson <tim@rabbitmq.com>2014-03-13 22:51:57 +0000
commit161fd648b75a14008f2e0543eb7e7cb6e3250f54 (patch)
treed3e75f57d110bcb487dcf7734aaa95e2a83ca331 /src
parent442cc36f9f76d73f5af8e5c33a223fa8648521b0 (diff)
parent4a743ceba66a1190f5bcc445615e8e3f0641f080 (diff)
downloadrabbitmq-server-git-161fd648b75a14008f2e0543eb7e7cb6e3250f54.tar.gz
Merge default into bug24926
Diffstat (limited to 'src')
-rw-r--r--src/app_utils.erl23
-rw-r--r--src/rabbit.erl147
-rw-r--r--src/rabbit_misc.erl34
-rw-r--r--src/rabbit_plugins.erl43
-rw-r--r--src/rabbit_plugins_main.erl60
5 files changed, 244 insertions, 63 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl
index 5ae2d2954e..4edee86f2b 100644
--- a/src/app_utils.erl
+++ b/src/app_utils.erl
@@ -17,11 +17,13 @@
-export([load_applications/1, start_applications/1, start_applications/2,
stop_applications/1, stop_applications/2, app_dependency_order/2,
- wait_for_applications/1]).
+ wait_for_applications/1, app_dependencies/1, app_modules/1,
+ which_applications/0, update_running_apps/2]).
-ifdef(use_specs).
-type error_handler() :: fun((atom(), any()) -> 'ok').
+-type diff() :: [atom()].
-spec load_applications([atom()]) -> 'ok'.
-spec start_applications([atom()]) -> 'ok'.
@@ -30,12 +32,31 @@
-spec stop_applications([atom()], error_handler()) -> 'ok'.
-spec wait_for_applications([atom()]) -> 'ok'.
-spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()].
+-spec app_dependencies(atom()) -> [atom()].
+-spec update_running_apps(fun (() -> 'ok'),
+ fun ((diff()) -> 'ok')) -> 'ok'.
+-spec which_applications() -> [atom()].
+-spec app_modules(atom()) -> [module()].
-endif.
%%---------------------------------------------------------------------------
%% Public API
+update_running_apps(MakeChanges, WithChanges) ->
+ Old = sets:from_list(which_applications()),
+ MakeChanges(),
+ New = sets:from_list(which_applications()),
+ Diff = sets:to_list(sets:subtract(New, Old)),
+ WithChanges(Diff).
+
+which_applications() ->
+ [App || {App, _, _} <- rabbit_misc:which_applications()].
+
+app_modules(App) ->
+ {ok, Modules} = application:get_key(App, modules),
+ Modules.
+
load_applications(Apps) ->
load_applications(queue:from_list(Apps), sets:new()),
ok.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bd4f1dbc88..c703fedb35 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -22,9 +22,9 @@
stop_and_halt/0, await_startup/0, status/0, is_running/0,
is_running/1, environment/0, rotate_logs/1, force_event_refresh/1,
start_fhc/0]).
-
+-export([run_boot_steps/0, load_steps/1, run_step/3]).
-export([start/2, stop/1]).
-
+-export([start_apps/1, stop_apps/1]).
-export([log_location/1]). %% for testing
%%---------------------------------------------------------------------------
@@ -312,8 +312,7 @@ start() ->
ok = ensure_working_log_handlers(),
rabbit_node_monitor:prepare_cluster_status_files(),
rabbit_mnesia:check_cluster_consistency(),
- ok = app_utils:start_applications(
- app_startup_order(), fun handle_app_error/2),
+ ok = start_apps(app_startup_order()),
ok = log_broker_started(rabbit_plugins:active())
end).
@@ -331,19 +330,26 @@ boot() ->
rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
- ok = app_utils:load_applications(ToBeLoaded),
- StartupApps = app_utils:app_dependency_order(ToBeLoaded,
- false),
- ok = app_utils:start_applications(
- StartupApps, fun handle_app_error/2),
+ ok = start_apps(ToBeLoaded),
ok = log_broker_started(Plugins)
end).
-handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) ->
- throw({could_not_start, App, Reason});
+handle_app_error(Term) ->
+ fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) ->
+ throw({Term, App, ExitReason});
+ (App, Reason) ->
+ throw({Term, App, Reason})
+ end.
-handle_app_error(App, Reason) ->
- throw({could_not_start, App, Reason}).
+start_apps(Apps) ->
+ app_utils:load_applications(Apps),
+ StartupApps = app_utils:app_dependency_order(Apps, false),
+ case whereis(rabbit_boot) of
+ undefined -> run_boot_steps(Apps);
+ _ -> ok
+ end,
+ ok = app_utils:start_applications(StartupApps,
+ handle_app_error(could_not_start)).
start_it(StartFun) ->
Marker = spawn_link(fun() -> receive stop -> ok end end),
@@ -369,12 +375,13 @@ start_it(StartFun) ->
end.
stop() ->
+ Apps = app_shutdown_order(),
case whereis(rabbit_boot) of
undefined -> ok;
- _ -> await_startup()
+ _ -> app_utils:wait_for_applications(Apps)
end,
rabbit_log:info("Stopping RabbitMQ~n"),
- ok = app_utils:stop_applications(app_shutdown_order()).
+ ok = app_utils:stop_applications(Apps).
stop_and_halt() ->
try
@@ -385,6 +392,29 @@ stop_and_halt() ->
end,
ok.
+stop_apps(Apps) ->
+ try
+ ok = app_utils:stop_applications(
+ Apps, handle_app_error(error_during_shutdown))
+ after
+ run_cleanup_steps(Apps),
+ [begin
+ {ok, Mods} = application:get_key(App, modules),
+ [begin
+ code:soft_purge(Mod),
+ code:delete(Mod),
+ false = code:is_loaded(Mod)
+ end || Mod <- Mods],
+ application:unload(App)
+ end || App <- Apps]
+ end.
+
+run_cleanup_steps(Apps) ->
+ [run_step(Name, Attributes, cleanup) ||
+ {App, Name, Attributes} <- load_steps(Apps),
+ lists:member(App, Apps)],
+ ok.
+
await_startup() ->
app_utils:wait_for_applications(app_startup_order()).
@@ -455,7 +485,7 @@ start(normal, []) ->
true = register(rabbit, self()),
print_banner(),
log_banner(),
- [ok = run_boot_step(Step) || Step <- boot_steps()],
+ run_boot_steps(),
{ok, SupPid};
Error ->
Error
@@ -483,29 +513,90 @@ app_shutdown_order() ->
%%---------------------------------------------------------------------------
%% boot step logic
-run_boot_step({_StepName, Attributes}) ->
- case [MFA || {mfa, MFA} <- Attributes] of
+run_boot_steps() ->
+ run_boot_steps([App || {App, _, _} <- application:loaded_applications()]).
+
+run_boot_steps(Apps) ->
+ Steps = load_steps(Apps),
+ [ok = run_step(StepName, Attributes, mfa) ||
+ {_, StepName, Attributes} <- Steps],
+ ok.
+
+run_step(StepName, Attributes, AttributeName) ->
+ case [MFA || {Key, MFA} <- Attributes,
+ Key =:= AttributeName] of
[] ->
ok;
MFAs ->
[try
apply(M,F,A)
of
- ok -> ok;
- {error, Reason} -> boot_error(Reason, not_available)
+ ok -> ok;
+ {error, Reason} -> boot_error({boot_step, StepName, Reason},
+ not_available)
catch
- _:Reason -> boot_error(Reason, erlang:get_stacktrace())
+ _:Reason -> boot_error({boot_step, StepName, Reason},
+ erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
ok
end.
-boot_steps() ->
- sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)).
+load_steps(BaseApps) ->
+ Apps = BaseApps -- app_utils:which_applications(), %% exclude running apps
+ StepAttrs = rabbit_misc:all_module_attributes_with_app(rabbit_boot_step),
+ {AllSteps, StepsDict} =
+ lists:foldl(
+ fun({AppName, Mod, Steps}, {AccSteps, AccDict}) ->
+ {[{Mod, {AppName, Steps}}|AccSteps],
+ lists:foldl(
+ fun({StepName, _}, Acc) ->
+ dict:store(StepName, AppName, Acc)
+ end, AccDict, Steps)}
+ end, {[], dict:new()}, StepAttrs),
+ Steps = lists:foldl(filter_steps(Apps, StepsDict), [], AllSteps),
+ sort_boot_steps(lists:usort(Steps)).
+
+filter_steps(Apps, Dict) ->
+ fun({Mod, {AppName, Steps}}, Acc) ->
+ Steps2 = [begin
+ Filtered = lists:foldl(filter_attrs(Apps, Dict),
+ [], Attrs),
+ {Step, Filtered}
+ end || {Step, Attrs} <- Steps,
+ filter_app(Apps, Dict, Step)],
+ [{Mod, {AppName, Steps2}}|Acc]
+ end.
+
+filter_app(Apps, Dict, Step) ->
+ case dict:find(Step, Dict) of
+ {ok, App} -> lists:member(App, Apps);
+ error -> false
+ end.
+
+filter_attrs(Apps, Dict) ->
+ fun(Attr={Type, Other}, AccAttrs) when Type =:= requires orelse
+ Type =:= enables ->
+ %% If we don't know about a dependency, we allow it through,
+ %% since we don't *know* that it should be ignored. If, on
+ %% the other hand, we recognise a dependency then we _only_
+ %% include it (i.e., the requires/enables attribute itself)
+ %% if the referenced step comes from one of the Apps we're
+ %% actively working with at this point.
+ case dict:find(Other, Dict) of
+ error -> [Attr | AccAttrs];
+ {ok, App} -> case lists:member(App, Apps) of
+ true -> [Attr | AccAttrs];
+ false -> AccAttrs
+ end
+ end;
+ (Attr, AccAttrs) ->
+ [Attr | AccAttrs]
+ end.
-vertices(_Module, Steps) ->
- [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps].
+vertices(_Module, {AppName, Steps}) ->
+ [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
-edges(_Module, Steps) ->
+edges(_Module, {_AppName, Steps}) ->
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
@@ -528,8 +619,8 @@ sort_boot_steps(UnsortedSteps) ->
digraph:delete(G),
%% Check that all mentioned {M,F,A} triples are exported.
case [{StepName, {M,F,A}} ||
- {StepName, Attributes} <- SortedSteps,
- {mfa, {M,F,A}} <- Attributes,
+ {_App, StepName, Attributes} <- SortedSteps,
+ {mfa, {M,F,A}} <- Attributes,
not erlang:function_exported(M, F, length(A))] of
[] -> SortedSteps;
MissingFunctions -> basic_boot_error(
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ab1c60635d..15f6ff432a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,6 +51,7 @@
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([parse_arguments/3]).
+-export([all_module_attributes_with_app/1]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
-export([const/1]).
@@ -210,6 +211,8 @@
-> {'ok', {atom(), [{string(), string()}], [string()]}} |
'no_command').
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
+-spec(all_module_attributes_with_app/1 ::
+ (atom()) -> [{atom(), atom(), [term()]}]).
-spec(build_acyclic_graph/3 ::
(graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}])
-> rabbit_types:ok_or_error2(digraph(),
@@ -846,21 +849,38 @@ module_attributes(Module) ->
V
end.
+all_module_attributes_with_app(Name) ->
+ find_module_attributes(
+ fun(App, Modules) ->
+ [{App, Module} || Module <- Modules]
+ end,
+ fun ({App, Module}, Acc) ->
+ case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+ N =:= Name]) of
+ [] -> Acc;
+ Atts -> [{App, Module, Atts} | Acc]
+ end
+ end).
+
all_module_attributes(Name) ->
- Modules =
- lists:usort(
- lists:append(
- [Modules || {App, _, _} <- application:loaded_applications(),
- {ok, Modules} <- [application:get_key(App, modules)]])),
- lists:foldl(
+ find_module_attributes(
+ fun(_App, Modules) -> Modules end,
fun (Module, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
Atts -> [{Module, Atts} | Acc]
end
- end, [], Modules).
+ end).
+find_module_attributes(Generator, Fold) ->
+ Targets =
+ lists:usort(
+ lists:append(
+ [Generator(App, Modules) ||
+ {App, _, _} <- application:loaded_applications(),
+ {ok, Modules} <- [application:get_key(App, modules)]])),
+ lists:foldl(Fold, [], Targets).
build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
G = digraph:new([acyclic]),
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 168ced3cfe..1f36ce4ce0 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -18,6 +18,7 @@
-include("rabbit.hrl").
-export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]).
+-export([enable/1, disable/1]).
%%----------------------------------------------------------------------------
@@ -31,17 +32,41 @@
-spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]).
-spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) ->
[plugin_name()]).
-
+-spec(enable/1 :: ([plugin_name()]) -> 'ok').
+-spec(disable/1 :: ([plugin_name()]) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
+enable(Plugins) ->
+ prepare_plugins(Plugins),
+ app_utils:update_running_apps(
+ fun() -> rabbit:start_apps(Plugins) end,
+ fun(Diff) ->
+ ok = rabbit_event:notify(plugins_changed, [{enabled, Diff}])
+ end).
+
+disable(Plugins) ->
+ app_utils:update_running_apps(
+ fun() -> rabbit:stop_apps(Plugins) end,
+ fun(Diff) ->
+ ok = rabbit_event:notify(plugins_changed, [{disabled, Diff}])
+ end).
+
%% @doc Prepares the file system and installs all enabled plugins.
setup() ->
- {ok, PluginDir} = application:get_env(rabbit, plugins_dir),
{ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
+ %% Eliminate the contents of the destination directory
+ case delete_recursively(ExpandDir) of
+ ok -> ok;
+ {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
+ [ExpandDir, E1]}})
+ end,
+
{ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file),
- prepare_plugins(EnabledFile, PluginDir, ExpandDir).
+ Enabled = read_enabled(EnabledFile),
+ prepare_plugins(Enabled).
%% @doc Lists the plugins which are currently running.
active() ->
@@ -104,9 +129,11 @@ dependencies(Reverse, Sources, AllPlugins) ->
%%----------------------------------------------------------------------------
-prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
+prepare_plugins(Enabled) ->
+ {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir),
+ {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir),
+
AllPlugins = list(PluginsDistDir),
- Enabled = read_enabled(EnabledFile),
ToUnpack = dependencies(false, Enabled, AllPlugins),
ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins),
@@ -117,12 +144,6 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) ->
[Missing])
end,
- %% Eliminate the contents of the destination directory
- case delete_recursively(ExpandDir) of
- ok -> ok;
- {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir,
- [ExpandDir, E1]}})
- end,
case filelib:ensure_dir(ExpandDir ++ "/") of
ok -> ok;
{error, E2} -> throw({error, {cannot_create_plugins_expand_dir,
diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl
index 948d2ab000..408fc4e10b 100644
--- a/src/rabbit_plugins_main.erl
+++ b/src/rabbit_plugins_main.erl
@@ -19,17 +19,21 @@
-export([start/0, stop/0]).
+-define(NODE_OPT, "-n").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
-define(ENABLED_OPT, "-E").
-define(ENABLED_ALL_OPT, "-e").
+-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
-define(ENABLED_DEF, {?ENABLED_OPT, flag}).
-define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}).
--define(GLOBAL_DEFS, []).
+-define(RPC_TIMEOUT, infinity).
+
+-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]).
-define(COMMANDS,
[{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]},
@@ -51,11 +55,10 @@
start() ->
{ok, [[PluginsFile|_]|_]} =
init:get_argument(enabled_plugins_file),
+ {ok, [[NodeStr|_]|_]} = init:get_argument(nodename),
{ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir),
{Command, Opts, Args} =
- case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS,
- init:get_plain_arguments())
- of
+ case parse_arguments(init:get_plain_arguments(), NodeStr) of
{ok, Res} -> Res;
no_command -> print_error("could not recognise command", []),
usage()
@@ -67,7 +70,8 @@ start() ->
[string:join([atom_to_list(Command) | Args], " ")])
end,
- case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
+ Node = proplists:get_value(?NODE_OPT, Opts),
+ case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
@@ -92,12 +96,25 @@ stop() ->
%%----------------------------------------------------------------------------
-action(list, [], Opts, PluginsFile, PluginsDir) ->
- action(list, [".*"], Opts, PluginsFile, PluginsDir);
-action(list, [Pat], Opts, PluginsFile, PluginsDir) ->
+parse_arguments(CmdLine, NodeStr) ->
+ case rabbit_misc:parse_arguments(
+ ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of
+ {ok, {Cmd, Opts0, Args}} ->
+ Opts = [case K of
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
+ _ -> {K, V}
+ end || {K, V} <- Opts0],
+ {ok, {Cmd, Opts, Args}};
+ E ->
+ E
+ end.
+
+action(list, Node, [], Opts, PluginsFile, PluginsDir) ->
+ action(list, Node, [".*"], Opts, PluginsFile, PluginsDir);
+action(list, _Node, [Pat], Opts, PluginsFile, PluginsDir) ->
format_plugins(Pat, Opts, PluginsFile, PluginsDir);
-action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
+action(enable, Node, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
case ToEnable0 of
[] -> throw({error_string, "Not enough arguments for 'enable'"});
_ -> ok
@@ -125,10 +142,10 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) ->
[] -> io:format("Plugin configuration unchanged.~n");
_ -> print_list("The following plugins have been enabled:",
NewImplicitlyEnabled -- ImplicitlyEnabled),
- report_change()
+ action_change(Node, enable, NewEnabled)
end;
-action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
+action(disable, Node, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
case ToDisable0 of
[] -> throw({error_string, "Not enough arguments for 'disable'"});
_ -> ok
@@ -151,10 +168,11 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) ->
NewImplicitlyEnabled =
rabbit_plugins:dependencies(false,
NewEnabled, AllPlugins),
+ Disabled = ImplicitlyEnabled -- NewImplicitlyEnabled,
print_list("The following plugins have been disabled:",
- ImplicitlyEnabled -- NewImplicitlyEnabled),
+ Disabled),
write_enabled_plugins(PluginsFile, NewEnabled),
- report_change()
+ action_change(Node, disable, Disabled)
end.
%%----------------------------------------------------------------------------
@@ -262,6 +280,16 @@ write_enabled_plugins(PluginsFile, Plugins) ->
PluginsFile, Reason}})
end.
-report_change() ->
- io:format("Plugin configuration has changed. "
- "Restart RabbitMQ for changes to take effect.~n").
+action_change(Node, Action, Targets) ->
+ rpc_call(Node, rabbit_plugins, Action, [Targets]).
+
+rpc_call(Node, Mod, Action, Args) ->
+ case rpc:call(Node, Mod, Action, Args, ?RPC_TIMEOUT) of
+ {badrpc, nodedown} -> io:format("Plugin configuration has changed.~n");
+ ok -> io:format("Plugin(s) ~pd.~n", [Action]);
+ Error -> io:format("Unable to ~p plugin(s). "
+ "Please restart the broker "
+ "to apply your changes.~nError: ~p~n",
+ [Action, Error])
+ end.
+