diff options
| author | Tim Watson <tim@rabbitmq.com> | 2014-03-13 22:51:57 +0000 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2014-03-13 22:51:57 +0000 |
| commit | 161fd648b75a14008f2e0543eb7e7cb6e3250f54 (patch) | |
| tree | d3e75f57d110bcb487dcf7734aaa95e2a83ca331 /src | |
| parent | 442cc36f9f76d73f5af8e5c33a223fa8648521b0 (diff) | |
| parent | 4a743ceba66a1190f5bcc445615e8e3f0641f080 (diff) | |
| download | rabbitmq-server-git-161fd648b75a14008f2e0543eb7e7cb6e3250f54.tar.gz | |
Merge default into bug24926
Diffstat (limited to 'src')
| -rw-r--r-- | src/app_utils.erl | 23 | ||||
| -rw-r--r-- | src/rabbit.erl | 147 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_plugins.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_plugins_main.erl | 60 |
5 files changed, 244 insertions, 63 deletions
diff --git a/src/app_utils.erl b/src/app_utils.erl index 5ae2d2954e..4edee86f2b 100644 --- a/src/app_utils.erl +++ b/src/app_utils.erl @@ -17,11 +17,13 @@ -export([load_applications/1, start_applications/1, start_applications/2, stop_applications/1, stop_applications/2, app_dependency_order/2, - wait_for_applications/1]). + wait_for_applications/1, app_dependencies/1, app_modules/1, + which_applications/0, update_running_apps/2]). -ifdef(use_specs). -type error_handler() :: fun((atom(), any()) -> 'ok'). +-type diff() :: [atom()]. -spec load_applications([atom()]) -> 'ok'. -spec start_applications([atom()]) -> 'ok'. @@ -30,12 +32,31 @@ -spec stop_applications([atom()], error_handler()) -> 'ok'. -spec wait_for_applications([atom()]) -> 'ok'. -spec app_dependency_order([atom()], boolean()) -> [digraph:vertex()]. +-spec app_dependencies(atom()) -> [atom()]. +-spec update_running_apps(fun (() -> 'ok'), + fun ((diff()) -> 'ok')) -> 'ok'. +-spec which_applications() -> [atom()]. +-spec app_modules(atom()) -> [module()]. -endif. %%--------------------------------------------------------------------------- %% Public API +update_running_apps(MakeChanges, WithChanges) -> + Old = sets:from_list(which_applications()), + MakeChanges(), + New = sets:from_list(which_applications()), + Diff = sets:to_list(sets:subtract(New, Old)), + WithChanges(Diff). + +which_applications() -> + [App || {App, _, _} <- rabbit_misc:which_applications()]. + +app_modules(App) -> + {ok, Modules} = application:get_key(App, modules), + Modules. + load_applications(Apps) -> load_applications(queue:from_list(Apps), sets:new()), ok. diff --git a/src/rabbit.erl b/src/rabbit.erl index bd4f1dbc88..c703fedb35 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -22,9 +22,9 @@ stop_and_halt/0, await_startup/0, status/0, is_running/0, is_running/1, environment/0, rotate_logs/1, force_event_refresh/1, start_fhc/0]). - +-export([run_boot_steps/0, load_steps/1, run_step/3]). -export([start/2, stop/1]). - +-export([start_apps/1, stop_apps/1]). -export([log_location/1]). %% for testing %%--------------------------------------------------------------------------- @@ -312,8 +312,7 @@ start() -> ok = ensure_working_log_handlers(), rabbit_node_monitor:prepare_cluster_status_files(), rabbit_mnesia:check_cluster_consistency(), - ok = app_utils:start_applications( - app_startup_order(), fun handle_app_error/2), + ok = start_apps(app_startup_order()), ok = log_broker_started(rabbit_plugins:active()) end). @@ -331,19 +330,26 @@ boot() -> rabbit_mnesia:check_cluster_consistency(), Plugins = rabbit_plugins:setup(), ToBeLoaded = Plugins ++ ?APPS, - ok = app_utils:load_applications(ToBeLoaded), - StartupApps = app_utils:app_dependency_order(ToBeLoaded, - false), - ok = app_utils:start_applications( - StartupApps, fun handle_app_error/2), + ok = start_apps(ToBeLoaded), ok = log_broker_started(Plugins) end). -handle_app_error(App, {bad_return, {_MFA, {'EXIT', {Reason, _}}}}) -> - throw({could_not_start, App, Reason}); +handle_app_error(Term) -> + fun(App, {bad_return, {_MFA, {'EXIT', {ExitReason, _}}}}) -> + throw({Term, App, ExitReason}); + (App, Reason) -> + throw({Term, App, Reason}) + end. -handle_app_error(App, Reason) -> - throw({could_not_start, App, Reason}). +start_apps(Apps) -> + app_utils:load_applications(Apps), + StartupApps = app_utils:app_dependency_order(Apps, false), + case whereis(rabbit_boot) of + undefined -> run_boot_steps(Apps); + _ -> ok + end, + ok = app_utils:start_applications(StartupApps, + handle_app_error(could_not_start)). start_it(StartFun) -> Marker = spawn_link(fun() -> receive stop -> ok end end), @@ -369,12 +375,13 @@ start_it(StartFun) -> end. stop() -> + Apps = app_shutdown_order(), case whereis(rabbit_boot) of undefined -> ok; - _ -> await_startup() + _ -> app_utils:wait_for_applications(Apps) end, rabbit_log:info("Stopping RabbitMQ~n"), - ok = app_utils:stop_applications(app_shutdown_order()). + ok = app_utils:stop_applications(Apps). stop_and_halt() -> try @@ -385,6 +392,29 @@ stop_and_halt() -> end, ok. +stop_apps(Apps) -> + try + ok = app_utils:stop_applications( + Apps, handle_app_error(error_during_shutdown)) + after + run_cleanup_steps(Apps), + [begin + {ok, Mods} = application:get_key(App, modules), + [begin + code:soft_purge(Mod), + code:delete(Mod), + false = code:is_loaded(Mod) + end || Mod <- Mods], + application:unload(App) + end || App <- Apps] + end. + +run_cleanup_steps(Apps) -> + [run_step(Name, Attributes, cleanup) || + {App, Name, Attributes} <- load_steps(Apps), + lists:member(App, Apps)], + ok. + await_startup() -> app_utils:wait_for_applications(app_startup_order()). @@ -455,7 +485,7 @@ start(normal, []) -> true = register(rabbit, self()), print_banner(), log_banner(), - [ok = run_boot_step(Step) || Step <- boot_steps()], + run_boot_steps(), {ok, SupPid}; Error -> Error @@ -483,29 +513,90 @@ app_shutdown_order() -> %%--------------------------------------------------------------------------- %% boot step logic -run_boot_step({_StepName, Attributes}) -> - case [MFA || {mfa, MFA} <- Attributes] of +run_boot_steps() -> + run_boot_steps([App || {App, _, _} <- application:loaded_applications()]). + +run_boot_steps(Apps) -> + Steps = load_steps(Apps), + [ok = run_step(StepName, Attributes, mfa) || + {_, StepName, Attributes} <- Steps], + ok. + +run_step(StepName, Attributes, AttributeName) -> + case [MFA || {Key, MFA} <- Attributes, + Key =:= AttributeName] of [] -> ok; MFAs -> [try apply(M,F,A) of - ok -> ok; - {error, Reason} -> boot_error(Reason, not_available) + ok -> ok; + {error, Reason} -> boot_error({boot_step, StepName, Reason}, + not_available) catch - _:Reason -> boot_error(Reason, erlang:get_stacktrace()) + _:Reason -> boot_error({boot_step, StepName, Reason}, + erlang:get_stacktrace()) end || {M,F,A} <- MFAs], ok end. -boot_steps() -> - sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). +load_steps(BaseApps) -> + Apps = BaseApps -- app_utils:which_applications(), %% exclude running apps + StepAttrs = rabbit_misc:all_module_attributes_with_app(rabbit_boot_step), + {AllSteps, StepsDict} = + lists:foldl( + fun({AppName, Mod, Steps}, {AccSteps, AccDict}) -> + {[{Mod, {AppName, Steps}}|AccSteps], + lists:foldl( + fun({StepName, _}, Acc) -> + dict:store(StepName, AppName, Acc) + end, AccDict, Steps)} + end, {[], dict:new()}, StepAttrs), + Steps = lists:foldl(filter_steps(Apps, StepsDict), [], AllSteps), + sort_boot_steps(lists:usort(Steps)). + +filter_steps(Apps, Dict) -> + fun({Mod, {AppName, Steps}}, Acc) -> + Steps2 = [begin + Filtered = lists:foldl(filter_attrs(Apps, Dict), + [], Attrs), + {Step, Filtered} + end || {Step, Attrs} <- Steps, + filter_app(Apps, Dict, Step)], + [{Mod, {AppName, Steps2}}|Acc] + end. + +filter_app(Apps, Dict, Step) -> + case dict:find(Step, Dict) of + {ok, App} -> lists:member(App, Apps); + error -> false + end. + +filter_attrs(Apps, Dict) -> + fun(Attr={Type, Other}, AccAttrs) when Type =:= requires orelse + Type =:= enables -> + %% If we don't know about a dependency, we allow it through, + %% since we don't *know* that it should be ignored. If, on + %% the other hand, we recognise a dependency then we _only_ + %% include it (i.e., the requires/enables attribute itself) + %% if the referenced step comes from one of the Apps we're + %% actively working with at this point. + case dict:find(Other, Dict) of + error -> [Attr | AccAttrs]; + {ok, App} -> case lists:member(App, Apps) of + true -> [Attr | AccAttrs]; + false -> AccAttrs + end + end; + (Attr, AccAttrs) -> + [Attr | AccAttrs] + end. -vertices(_Module, Steps) -> - [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. +vertices(_Module, {AppName, Steps}) -> + [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. -edges(_Module, Steps) -> +edges(_Module, {_AppName, Steps}) -> [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} @@ -528,8 +619,8 @@ sort_boot_steps(UnsortedSteps) -> digraph:delete(G), %% Check that all mentioned {M,F,A} triples are exported. case [{StepName, {M,F,A}} || - {StepName, Attributes} <- SortedSteps, - {mfa, {M,F,A}} <- Attributes, + {_App, StepName, Attributes} <- SortedSteps, + {mfa, {M,F,A}} <- Attributes, not erlang:function_exported(M, F, length(A))] of [] -> SortedSteps; MissingFunctions -> basic_boot_error( diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ab1c60635d..15f6ff432a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,6 +51,7 @@ -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([parse_arguments/3]). +-export([all_module_attributes_with_app/1]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). -export([const/1]). @@ -210,6 +211,8 @@ -> {'ok', {atom(), [{string(), string()}], [string()]}} | 'no_command'). -spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(all_module_attributes_with_app/1 :: + (atom()) -> [{atom(), atom(), [term()]}]). -spec(build_acyclic_graph/3 :: (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> rabbit_types:ok_or_error2(digraph(), @@ -846,21 +849,38 @@ module_attributes(Module) -> V end. +all_module_attributes_with_app(Name) -> + find_module_attributes( + fun(App, Modules) -> + [{App, Module} || Module <- Modules] + end, + fun ({App, Module}, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{App, Module, Atts} | Acc] + end + end). + all_module_attributes(Name) -> - Modules = - lists:usort( - lists:append( - [Modules || {App, _, _} <- application:loaded_applications(), - {ok, Modules} <- [application:get_key(App, modules)]])), - lists:foldl( + find_module_attributes( + fun(_App, Modules) -> Modules end, fun (Module, Acc) -> case lists:append([Atts || {N, Atts} <- module_attributes(Module), N =:= Name]) of [] -> Acc; Atts -> [{Module, Atts} | Acc] end - end, [], Modules). + end). +find_module_attributes(Generator, Fold) -> + Targets = + lists:usort( + lists:append( + [Generator(App, Modules) || + {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl(Fold, [], Targets). build_acyclic_graph(VertexFun, EdgeFun, Graph) -> G = digraph:new([acyclic]), diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 168ced3cfe..1f36ce4ce0 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -18,6 +18,7 @@ -include("rabbit.hrl"). -export([setup/0, active/0, read_enabled/1, list/1, dependencies/3]). +-export([enable/1, disable/1]). %%---------------------------------------------------------------------------- @@ -31,17 +32,41 @@ -spec(read_enabled/1 :: (file:filename()) -> [plugin_name()]). -spec(dependencies/3 :: (boolean(), [plugin_name()], [#plugin{}]) -> [plugin_name()]). - +-spec(enable/1 :: ([plugin_name()]) -> 'ok'). +-spec(disable/1 :: ([plugin_name()]) -> 'ok'). -endif. %%---------------------------------------------------------------------------- +enable(Plugins) -> + prepare_plugins(Plugins), + app_utils:update_running_apps( + fun() -> rabbit:start_apps(Plugins) end, + fun(Diff) -> + ok = rabbit_event:notify(plugins_changed, [{enabled, Diff}]) + end). + +disable(Plugins) -> + app_utils:update_running_apps( + fun() -> rabbit:stop_apps(Plugins) end, + fun(Diff) -> + ok = rabbit_event:notify(plugins_changed, [{disabled, Diff}]) + end). + %% @doc Prepares the file system and installs all enabled plugins. setup() -> - {ok, PluginDir} = application:get_env(rabbit, plugins_dir), {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + + %% Eliminate the contents of the destination directory + case delete_recursively(ExpandDir) of + ok -> ok; + {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, + [ExpandDir, E1]}}) + end, + {ok, EnabledFile} = application:get_env(rabbit, enabled_plugins_file), - prepare_plugins(EnabledFile, PluginDir, ExpandDir). + Enabled = read_enabled(EnabledFile), + prepare_plugins(Enabled). %% @doc Lists the plugins which are currently running. active() -> @@ -104,9 +129,11 @@ dependencies(Reverse, Sources, AllPlugins) -> %%---------------------------------------------------------------------------- -prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> +prepare_plugins(Enabled) -> + {ok, PluginsDistDir} = application:get_env(rabbit, plugins_dir), + {ok, ExpandDir} = application:get_env(rabbit, plugins_expand_dir), + AllPlugins = list(PluginsDistDir), - Enabled = read_enabled(EnabledFile), ToUnpack = dependencies(false, Enabled, AllPlugins), ToUnpackPlugins = lookup_plugins(ToUnpack, AllPlugins), @@ -117,12 +144,6 @@ prepare_plugins(EnabledFile, PluginsDistDir, ExpandDir) -> [Missing]) end, - %% Eliminate the contents of the destination directory - case delete_recursively(ExpandDir) of - ok -> ok; - {error, E1} -> throw({error, {cannot_delete_plugins_expand_dir, - [ExpandDir, E1]}}) - end, case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, diff --git a/src/rabbit_plugins_main.erl b/src/rabbit_plugins_main.erl index 948d2ab000..408fc4e10b 100644 --- a/src/rabbit_plugins_main.erl +++ b/src/rabbit_plugins_main.erl @@ -19,17 +19,21 @@ -export([start/0, stop/0]). +-define(NODE_OPT, "-n"). -define(VERBOSE_OPT, "-v"). -define(MINIMAL_OPT, "-m"). -define(ENABLED_OPT, "-E"). -define(ENABLED_ALL_OPT, "-e"). +-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VERBOSE_DEF, {?VERBOSE_OPT, flag}). -define(MINIMAL_DEF, {?MINIMAL_OPT, flag}). -define(ENABLED_DEF, {?ENABLED_OPT, flag}). -define(ENABLED_ALL_DEF, {?ENABLED_ALL_OPT, flag}). --define(GLOBAL_DEFS, []). +-define(RPC_TIMEOUT, infinity). + +-define(GLOBAL_DEFS(Node), [?NODE_DEF(Node)]). -define(COMMANDS, [{list, [?VERBOSE_DEF, ?MINIMAL_DEF, ?ENABLED_DEF, ?ENABLED_ALL_DEF]}, @@ -51,11 +55,10 @@ start() -> {ok, [[PluginsFile|_]|_]} = init:get_argument(enabled_plugins_file), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), {ok, [[PluginsDir|_]|_]} = init:get_argument(plugins_dist_dir), {Command, Opts, Args} = - case rabbit_misc:parse_arguments(?COMMANDS, ?GLOBAL_DEFS, - init:get_plain_arguments()) - of + case parse_arguments(init:get_plain_arguments(), NodeStr) of {ok, Res} -> Res; no_command -> print_error("could not recognise command", []), usage() @@ -67,7 +70,8 @@ start() -> [string:join([atom_to_list(Command) | Args], " ")]) end, - case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of + Node = proplists:get_value(?NODE_OPT, Opts), + case catch action(Command, Node, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> @@ -92,12 +96,25 @@ stop() -> %%---------------------------------------------------------------------------- -action(list, [], Opts, PluginsFile, PluginsDir) -> - action(list, [".*"], Opts, PluginsFile, PluginsDir); -action(list, [Pat], Opts, PluginsFile, PluginsDir) -> +parse_arguments(CmdLine, NodeStr) -> + case rabbit_misc:parse_arguments( + ?COMMANDS, ?GLOBAL_DEFS(NodeStr), CmdLine) of + {ok, {Cmd, Opts0, Args}} -> + Opts = [case K of + ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)}; + _ -> {K, V} + end || {K, V} <- Opts0], + {ok, {Cmd, Opts, Args}}; + E -> + E + end. + +action(list, Node, [], Opts, PluginsFile, PluginsDir) -> + action(list, Node, [".*"], Opts, PluginsFile, PluginsDir); +action(list, _Node, [Pat], Opts, PluginsFile, PluginsDir) -> format_plugins(Pat, Opts, PluginsFile, PluginsDir); -action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> +action(enable, Node, ToEnable0, _Opts, PluginsFile, PluginsDir) -> case ToEnable0 of [] -> throw({error_string, "Not enough arguments for 'enable'"}); _ -> ok @@ -125,10 +142,10 @@ action(enable, ToEnable0, _Opts, PluginsFile, PluginsDir) -> [] -> io:format("Plugin configuration unchanged.~n"); _ -> print_list("The following plugins have been enabled:", NewImplicitlyEnabled -- ImplicitlyEnabled), - report_change() + action_change(Node, enable, NewEnabled) end; -action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> +action(disable, Node, ToDisable0, _Opts, PluginsFile, PluginsDir) -> case ToDisable0 of [] -> throw({error_string, "Not enough arguments for 'disable'"}); _ -> ok @@ -151,10 +168,11 @@ action(disable, ToDisable0, _Opts, PluginsFile, PluginsDir) -> NewImplicitlyEnabled = rabbit_plugins:dependencies(false, NewEnabled, AllPlugins), + Disabled = ImplicitlyEnabled -- NewImplicitlyEnabled, print_list("The following plugins have been disabled:", - ImplicitlyEnabled -- NewImplicitlyEnabled), + Disabled), write_enabled_plugins(PluginsFile, NewEnabled), - report_change() + action_change(Node, disable, Disabled) end. %%---------------------------------------------------------------------------- @@ -262,6 +280,16 @@ write_enabled_plugins(PluginsFile, Plugins) -> PluginsFile, Reason}}) end. -report_change() -> - io:format("Plugin configuration has changed. " - "Restart RabbitMQ for changes to take effect.~n"). +action_change(Node, Action, Targets) -> + rpc_call(Node, rabbit_plugins, Action, [Targets]). + +rpc_call(Node, Mod, Action, Args) -> + case rpc:call(Node, Mod, Action, Args, ?RPC_TIMEOUT) of + {badrpc, nodedown} -> io:format("Plugin configuration has changed.~n"); + ok -> io:format("Plugin(s) ~pd.~n", [Action]); + Error -> io:format("Unable to ~p plugin(s). " + "Please restart the broker " + "to apply your changes.~nError: ~p~n", + [Action, Error]) + end. + |
