summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-11-15 07:15:23 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-11-15 07:15:23 +0300
commit53ab5c17ddd11a5718fc0d229070674f8722679b (patch)
tree41619f847d4b443209e9b2a2b9353bdf6417a4ce /src
parent1648c5a9c9f38cf002d91366281c97394b549730 (diff)
downloadrabbitmq-server-git-53ab5c17ddd11a5718fc0d229070674f8722679b.tar.gz
Definition import: refactor for future ctl command
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_definitions.erl218
1 files changed, 136 insertions, 82 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index ffa04134ea..aba7c1ae18 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -17,8 +17,12 @@
-module(rabbit_definitions).
-include_lib("rabbit_common/include/rabbit.hrl").
--export([maybe_load_definitions/0, maybe_load_definitions_from/2,
- apply_defs/4, apply_defs/5]).
+%% automatic import on boot
+-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
+%% import
+-export([import_definitions/1, import_definitions/2,
+ apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5]).
+%% export
-export([all_definitions/0]).
-export([decode/1, decode/2, args/1]).
@@ -41,12 +45,24 @@ maybe_load_core_definitions() ->
maybe_load_management_definitions() ->
maybe_load_definitions(rabbitmq_management, load_definitions).
-load_definitions(Body) ->
- apply_defs(
- Body, ?INTERNAL_USER,
- fun () -> ok end,
- fun (E) -> {error, E} end).
+-spec import_definitions(Body :: binary() | iolist()) -> ok | {error, term()}.
+import_definitions(Body) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ case decode([], Body) of
+ {error, E} -> {error, E};
+ {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER)
+ end.
+-spec import_definitions(Body :: binary() | iolist(), VHost :: vhost:name()) -> ok | {error, term()}.
+import_definitions(Body, VHost) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ case decode([], Body) of
+ {error, E} -> {error, E};
+ {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER, fun() -> ok end, VHost)
+ end.
+
+
+-spec all_definitions() -> map().
all_definitions() ->
Xs = list_exchanges(),
Qs = list_queues(),
@@ -75,7 +91,6 @@ all_definitions() ->
queues => Qs,
bindings => Bs,
exchanges => Xs
-
}.
%%
@@ -117,7 +132,7 @@ load_definitions_from_file(File) ->
case file:read_file(File) of
{ok, Body} ->
rabbit_log:info("Applying definitions from ~s", [File]),
- load_definitions(Body);
+ import_definitions(Body);
{error, E} ->
rabbit_log:error("Could not read definitions from ~s, Error: ~p", [File, E]),
{error, {could_not_read_defs, {File, E}}}
@@ -126,14 +141,14 @@ load_definitions_from_file(File) ->
decode(Keys, Body) ->
case decode(Body) of
{ok, J0} ->
- J = maps:fold(fun(K, V, Acc) ->
- Acc#{binary_to_atom(K, utf8) => V}
- end, J0, J0),
- Results = [get_or_missing(K, J) || K <- Keys],
- case [E || E = {key_missing, _} <- Results] of
- [] -> {ok, Results, J};
- Errors -> {error, Errors}
- end;
+ J = maps:fold(fun(K, V, Acc) ->
+ Acc#{rabbit_data_coercion:to_atom(K, utf8) => V}
+ end, J0, J0),
+ Results = [get_or_missing(K, J) || K <- Keys],
+ case [E || E = {key_missing, _} <- Results] of
+ [] -> {ok, Results, J};
+ Errors -> {error, Errors}
+ end;
Else -> Else
end.
@@ -141,86 +156,125 @@ decode(<<"">>) ->
{ok, #{}};
decode(Body) ->
try
- {ok, rabbit_json:decode(Body)}
+ Decoded = rabbit_json:decode(Body),
+ Normalised = maps:fold(fun(K, V, Acc) ->
+ Acc#{binary_to_atom(K, utf8) => V}
+ end, Decoded, Decoded),
+ {ok, Normalised}
catch error:_ -> {error, not_json}
end.
-apply_defs(Body, ActingUser, SuccessFun, ErrorFun) ->
- rabbit_log:info("Asked to import definitions. Acting user: ~s", [rabbit_data_coercion:to_binary(ActingUser)]),
- case decode([], Body) of
- {error, E} ->
- ErrorFun(E);
- {ok, _, All} ->
- Version = maps:get(rabbit_version, All, undefined),
- try
- rabbit_log:info("Importing users..."),
- for_all(users, ActingUser, All,
- fun(User, _Username) ->
- rabbit_auth_backend_internal:put_user(
- User,
- Version,
- ActingUser)
- end),
- rabbit_log:info("Importing vhosts..."),
- for_all(vhosts, ActingUser, All, fun add_vhost/2),
- validate_limits(All),
- rabbit_log:info("Importing user permissions..."),
- for_all(permissions, ActingUser, All, fun add_permission/2),
- rabbit_log:info("Importing topic permissions..."),
- for_all(topic_permissions, ActingUser, All, fun add_topic_permission/2),
- rabbit_log:info("Importing parameters..."),
- for_all(parameters, ActingUser, All, fun add_parameter/2),
- rabbit_log:info("Importing global parameters..."),
- for_all(global_parameters, ActingUser, All, fun add_global_parameter/2),
- rabbit_log:info("Importing policies..."),
- for_all(policies, ActingUser, All, fun add_policy/2),
- rabbit_log:info("Importing queues..."),
- for_all(queues, ActingUser, All, fun add_queue/2),
- rabbit_log:info("Importing exchanges..."),
- for_all(exchanges, ActingUser, All, fun add_exchange/2),
- rabbit_log:info("Importing bindings..."),
- for_all(bindings, ActingUser, All, fun add_binding/2),
- SuccessFun()
- catch {error, E} -> ErrorFun(format(E));
- exit:E -> ErrorFun(format(E))
- end
+-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok'.
+
+apply_defs(Map, ActingUser) ->
+ apply_defs(Map, ActingUser, fun () -> ok end).
+
+-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok')) -> 'ok';
+ (Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
+ apply_defs(Map, ActingUser, fun () -> ok end, VHost);
+
+apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
+ rabbit_log:info("Asked to import definitions for multiple virtual hosts. Acting user: ~p",
+ [ActingUser]),
+ rabbit_log:info("Map: ~p", [Map]),
+ Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
+ try
+ rabbit_log:info("Importing users..."),
+ for_all(users, ActingUser, Map,
+ fun(User, _Username) ->
+ rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
+ end),
+ rabbit_log:info("Importing vhosts..."),
+ for_all(vhosts, ActingUser, Map, fun add_vhost/2),
+ validate_limits(Map),
+ rabbit_log:info("Importing user permissions..."),
+ for_all(permissions, ActingUser, Map, fun add_permission/2),
+ rabbit_log:info("Importing topic permissions..."),
+ for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ rabbit_log:info("Importing global parameters..."),
+ for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, fun add_policy/2),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, fun add_queue/2),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, fun add_exchange/2),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, fun add_binding/2),
+ SuccessFun(),
+ ok
+ catch {error, E} -> {error, E};
+ exit:E -> {error, E}
end.
-apply_defs(Body, ActingUser, SuccessFun, ErrorFun, VHost) ->
+-spec apply_defs(Map :: #{atom() => any()},
+ ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok'),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p",
[VHost, ActingUser]),
- case decode([], Body) of
- {error, E} ->
- ErrorFun(E);
- {ok, _, All} ->
- try
- validate_limits(All, VHost),
- rabbit_log:info("Importing parameters..."),
- for_all(parameters, ActingUser, All, VHost, fun add_parameter/3),
- rabbit_log:info("Importing policies..."),
- for_all(policies, ActingUser, All, VHost, fun add_policy/3),
- rabbit_log:info("Importing queues..."),
- for_all(queues, ActingUser, All, VHost, fun add_queue/3),
- rabbit_log:info("Importing exchanges..."),
- for_all(exchanges, ActingUser, All, VHost, fun add_exchange/3),
- rabbit_log:info("Importing bindings..."),
- for_all(bindings, ActingUser, All, VHost, fun add_binding/3),
- SuccessFun()
- catch {error, E} -> ErrorFun(format(E));
- exit:E -> ErrorFun(format(E))
- end
+ try
+ validate_limits(Map, VHost),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ SuccessFun()
+ catch {error, E} -> {error, format(E)};
+ exit:E -> {error, format(E)}
+ end.
+
+-spec apply_defs(Map :: #{atom() => any()},
+ ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok'),
+ ErrorFun :: fun((any()) -> 'ok'),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
+ rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p",
+ [VHost, ActingUser]),
+ try
+ validate_limits(Map, VHost),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ SuccessFun()
+ catch {error, E} -> ErrorFun(format(E));
+ exit:E -> ErrorFun(format(E))
end.
-for_all(Name, ActingUser, All, Fun) ->
- case maps:get(Name, All, undefined) of
+for_all(Name, ActingUser, Definitions, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
List -> [Fun(maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]),
ActingUser) ||
M <- List, is_map(M)]
end.
-for_all(Name, ActingUser, All, VHost, Fun) ->
- case maps:get(Name, All, undefined) of
+for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]),
ActingUser) ||