diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-11-15 07:15:23 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-11-15 07:15:23 +0300 |
| commit | 53ab5c17ddd11a5718fc0d229070674f8722679b (patch) | |
| tree | 41619f847d4b443209e9b2a2b9353bdf6417a4ce /src | |
| parent | 1648c5a9c9f38cf002d91366281c97394b549730 (diff) | |
| download | rabbitmq-server-git-53ab5c17ddd11a5718fc0d229070674f8722679b.tar.gz | |
Definition import: refactor for future ctl command
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_definitions.erl | 218 |
1 files changed, 136 insertions, 82 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index ffa04134ea..aba7c1ae18 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -17,8 +17,12 @@ -module(rabbit_definitions). -include_lib("rabbit_common/include/rabbit.hrl"). --export([maybe_load_definitions/0, maybe_load_definitions_from/2, - apply_defs/4, apply_defs/5]). +%% automatic import on boot +-export([maybe_load_definitions/0, maybe_load_definitions_from/2]). +%% import +-export([import_definitions/1, import_definitions/2, + apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5]). +%% export -export([all_definitions/0]). -export([decode/1, decode/2, args/1]). @@ -41,12 +45,24 @@ maybe_load_core_definitions() -> maybe_load_management_definitions() -> maybe_load_definitions(rabbitmq_management, load_definitions). -load_definitions(Body) -> - apply_defs( - Body, ?INTERNAL_USER, - fun () -> ok end, - fun (E) -> {error, E} end). +-spec import_definitions(Body :: binary() | iolist()) -> ok | {error, term()}. +import_definitions(Body) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + case decode([], Body) of + {error, E} -> {error, E}; + {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER) + end. +-spec import_definitions(Body :: binary() | iolist(), VHost :: vhost:name()) -> ok | {error, term()}. +import_definitions(Body, VHost) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + case decode([], Body) of + {error, E} -> {error, E}; + {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER, fun() -> ok end, VHost) + end. + + +-spec all_definitions() -> map(). all_definitions() -> Xs = list_exchanges(), Qs = list_queues(), @@ -75,7 +91,6 @@ all_definitions() -> queues => Qs, bindings => Bs, exchanges => Xs - }. %% @@ -117,7 +132,7 @@ load_definitions_from_file(File) -> case file:read_file(File) of {ok, Body} -> rabbit_log:info("Applying definitions from ~s", [File]), - load_definitions(Body); + import_definitions(Body); {error, E} -> rabbit_log:error("Could not read definitions from ~s, Error: ~p", [File, E]), {error, {could_not_read_defs, {File, E}}} @@ -126,14 +141,14 @@ load_definitions_from_file(File) -> decode(Keys, Body) -> case decode(Body) of {ok, J0} -> - J = maps:fold(fun(K, V, Acc) -> - Acc#{binary_to_atom(K, utf8) => V} - end, J0, J0), - Results = [get_or_missing(K, J) || K <- Keys], - case [E || E = {key_missing, _} <- Results] of - [] -> {ok, Results, J}; - Errors -> {error, Errors} - end; + J = maps:fold(fun(K, V, Acc) -> + Acc#{rabbit_data_coercion:to_atom(K, utf8) => V} + end, J0, J0), + Results = [get_or_missing(K, J) || K <- Keys], + case [E || E = {key_missing, _} <- Results] of + [] -> {ok, Results, J}; + Errors -> {error, Errors} + end; Else -> Else end. @@ -141,86 +156,125 @@ decode(<<"">>) -> {ok, #{}}; decode(Body) -> try - {ok, rabbit_json:decode(Body)} + Decoded = rabbit_json:decode(Body), + Normalised = maps:fold(fun(K, V, Acc) -> + Acc#{binary_to_atom(K, utf8) => V} + end, Decoded, Decoded), + {ok, Normalised} catch error:_ -> {error, not_json} end. -apply_defs(Body, ActingUser, SuccessFun, ErrorFun) -> - rabbit_log:info("Asked to import definitions. Acting user: ~s", [rabbit_data_coercion:to_binary(ActingUser)]), - case decode([], Body) of - {error, E} -> - ErrorFun(E); - {ok, _, All} -> - Version = maps:get(rabbit_version, All, undefined), - try - rabbit_log:info("Importing users..."), - for_all(users, ActingUser, All, - fun(User, _Username) -> - rabbit_auth_backend_internal:put_user( - User, - Version, - ActingUser) - end), - rabbit_log:info("Importing vhosts..."), - for_all(vhosts, ActingUser, All, fun add_vhost/2), - validate_limits(All), - rabbit_log:info("Importing user permissions..."), - for_all(permissions, ActingUser, All, fun add_permission/2), - rabbit_log:info("Importing topic permissions..."), - for_all(topic_permissions, ActingUser, All, fun add_topic_permission/2), - rabbit_log:info("Importing parameters..."), - for_all(parameters, ActingUser, All, fun add_parameter/2), - rabbit_log:info("Importing global parameters..."), - for_all(global_parameters, ActingUser, All, fun add_global_parameter/2), - rabbit_log:info("Importing policies..."), - for_all(policies, ActingUser, All, fun add_policy/2), - rabbit_log:info("Importing queues..."), - for_all(queues, ActingUser, All, fun add_queue/2), - rabbit_log:info("Importing exchanges..."), - for_all(exchanges, ActingUser, All, fun add_exchange/2), - rabbit_log:info("Importing bindings..."), - for_all(bindings, ActingUser, All, fun add_binding/2), - SuccessFun() - catch {error, E} -> ErrorFun(format(E)); - exit:E -> ErrorFun(format(E)) - end +-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok'. + +apply_defs(Map, ActingUser) -> + apply_defs(Map, ActingUser, fun () -> ok end). + +-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok')) -> 'ok'; + (Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> + apply_defs(Map, ActingUser, fun () -> ok end, VHost); + +apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> + rabbit_log:info("Asked to import definitions for multiple virtual hosts. Acting user: ~p", + [ActingUser]), + rabbit_log:info("Map: ~p", [Map]), + Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), + try + rabbit_log:info("Importing users..."), + for_all(users, ActingUser, Map, + fun(User, _Username) -> + rabbit_auth_backend_internal:put_user(User, Version, ActingUser) + end), + rabbit_log:info("Importing vhosts..."), + for_all(vhosts, ActingUser, Map, fun add_vhost/2), + validate_limits(Map), + rabbit_log:info("Importing user permissions..."), + for_all(permissions, ActingUser, Map, fun add_permission/2), + rabbit_log:info("Importing topic permissions..."), + for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, fun add_parameter/2), + rabbit_log:info("Importing global parameters..."), + for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, fun add_policy/2), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, fun add_queue/2), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, fun add_exchange/2), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, fun add_binding/2), + SuccessFun(), + ok + catch {error, E} -> {error, E}; + exit:E -> {error, E} end. -apply_defs(Body, ActingUser, SuccessFun, ErrorFun, VHost) -> +-spec apply_defs(Map :: #{atom() => any()}, + ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok'), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p", [VHost, ActingUser]), - case decode([], Body) of - {error, E} -> - ErrorFun(E); - {ok, _, All} -> - try - validate_limits(All, VHost), - rabbit_log:info("Importing parameters..."), - for_all(parameters, ActingUser, All, VHost, fun add_parameter/3), - rabbit_log:info("Importing policies..."), - for_all(policies, ActingUser, All, VHost, fun add_policy/3), - rabbit_log:info("Importing queues..."), - for_all(queues, ActingUser, All, VHost, fun add_queue/3), - rabbit_log:info("Importing exchanges..."), - for_all(exchanges, ActingUser, All, VHost, fun add_exchange/3), - rabbit_log:info("Importing bindings..."), - for_all(bindings, ActingUser, All, VHost, fun add_binding/3), - SuccessFun() - catch {error, E} -> ErrorFun(format(E)); - exit:E -> ErrorFun(format(E)) - end + try + validate_limits(Map, VHost), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + SuccessFun() + catch {error, E} -> {error, format(E)}; + exit:E -> {error, format(E)} + end. + +-spec apply_defs(Map :: #{atom() => any()}, + ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok'), + ErrorFun :: fun((any()) -> 'ok'), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> + rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p", + [VHost, ActingUser]), + try + validate_limits(Map, VHost), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + SuccessFun() + catch {error, E} -> ErrorFun(format(E)); + exit:E -> ErrorFun(format(E)) end. -for_all(Name, ActingUser, All, Fun) -> - case maps:get(Name, All, undefined) of +for_all(Name, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; List -> [Fun(maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), ActingUser) || M <- List, is_map(M)] end. -for_all(Name, ActingUser, All, VHost, Fun) -> - case maps:get(Name, All, undefined) of +for_all(Name, ActingUser, Definitions, VHost, Fun) -> + + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), ActingUser) || |
