diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 152 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_definitions.erl | 674 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 50 |
7 files changed, 919 insertions, 24 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 87ceb5d701..e4d2ee9808 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -187,6 +187,15 @@ {requires, [core_initialized]}, {enables, routing_ready}]}). +%% We want to A) make sure we apply definitions before the node begins serving +%% traffic and B) in fact do it before empty_db_check (so the defaults will not +%% get created if we don't need 'em). +-rabbit_boot_step({load_core_definitions, + [{description, "imports definitions"}, + {mfa, {rabbit_definitions, maybe_load_definitions, []}}, + {requires, recovery}, + {enables, empty_db_check}]}). + -rabbit_boot_step({empty_db_check, [{description, "empty DB check"}, {mfa, {?MODULE, maybe_insert_default_data, []}}, diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index e675ad188b..09392bf7b8 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -28,7 +28,7 @@ hash_password/2, change_password_hash/2, change_password_hash/3, set_tags/3, set_permissions/6, clear_permissions/3, set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4, - add_user_sans_validation/3]). + add_user_sans_validation/3, put_user/2, put_user/3]). -export([user_info_keys/0, perms_info_keys/0, user_perms_info_keys/0, vhost_perms_info_keys/0, @@ -471,6 +471,138 @@ clear_topic_permissions(Username, VHostPath, Exchange, ActingUser) -> {user_who_performed_action, ActingUser}]), R. +put_user(User, ActingUser) -> put_user(User, undefined, ActingUser). + +put_user(User, Version, ActingUser) -> + Username = maps:get(name, User), + HasPassword = maps:is_key(password, User), + HasPasswordHash = maps:is_key(password_hash, User), + Password = maps:get(password, User, undefined), + PasswordHash = maps:get(password_hash, User, undefined), + + Tags = case {maps:get(tags, User, undefined), maps:get(administrator, User, undefined)} of + {undefined, undefined} -> + throw({error, tags_not_present}); + {undefined, AdminS} -> + case rabbit_mgmt_util:parse_bool(AdminS) of + true -> [administrator]; + false -> [] + end; + {TagsS, _} -> + [list_to_atom(string:strip(T)) || + T <- string:tokens(binary_to_list(TagsS), ",")] + end, + + UserExists = case rabbit_auth_backend_internal:lookup_user(Username) of + %% expected + {error, not_found} -> false; + %% shouldn't normally happen but worth guarding + %% against + {error, _} -> false; + _ -> true + end, + + %% pre-configured, only applies to newly created users + Permissions = maps:get(permissions, User, undefined), + + PassedCredentialValidation = + case {HasPassword, HasPasswordHash} of + {true, false} -> + rabbit_credential_validation:validate(Username, Password) =:= ok; + {false, true} -> true; + _ -> + rabbit_credential_validation:validate(Username, Password) =:= ok + end, + + case UserExists of + true -> + case {HasPassword, HasPasswordHash} of + {true, false} -> + update_user_password(PassedCredentialValidation, Username, Password, Tags, ActingUser); + {false, true} -> + update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser); + {true, true} -> + throw({error, both_password_and_password_hash_are_provided}); + %% clear password, update tags if needed + _ -> + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser), + rabbit_auth_backend_internal:clear_password(Username, ActingUser) + end; + false -> + case {HasPassword, HasPasswordHash} of + {true, false} -> + create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, ActingUser); + {false, true} -> + create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, ActingUser); + {true, true} -> + throw({error, both_password_and_password_hash_are_provided}); + {false, false} -> + %% this user won't be able to sign in using + %% a username/password pair but can be used for x509 certificate authentication, + %% with authn backends such as HTTP or LDAP and so on. + create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, ActingUser) + end + end. + +update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, ActingUser) -> + rabbit_auth_backend_internal:change_password(Username, Password, ActingUser), + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser); +update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _ActingUser) -> + %% we don't log here because + %% rabbit_auth_backend_internal will do it + throw({error, credential_validation_failed}). + +update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser) -> + %% when a hash this provided, credential validation + %% is not applied + HashingAlgorithm = hashing_algorithm(User, Version), + + Hash = rabbit_misc:b64decode_or_throw(PasswordHash), + rabbit_auth_backend_internal:change_password_hash( + Username, Hash, HashingAlgorithm), + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser). + +create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, ActingUser) -> + rabbit_auth_backend_internal:add_user(Username, Password, ActingUser), + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser); +create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, ActingUser) -> + rabbit_auth_backend_internal:add_user(Username, Password, ActingUser), + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser), + preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser); +create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _) -> + %% we don't log here because + %% rabbit_auth_backend_internal will do it + throw({error, credential_validation_failed}). + +create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, ActingUser) -> + %% when a hash this provided, credential validation + %% is not applied + HashingAlgorithm = hashing_algorithm(User, Version), + Hash = rabbit_misc:b64decode_or_throw(PasswordHash), + + %% first we create a user with dummy credentials and no + %% validation applied, then we update password hash + TmpPassword = rabbit_guid:binary(rabbit_guid:gen_secure(), "tmp"), + rabbit_auth_backend_internal:add_user_sans_validation(Username, TmpPassword, ActingUser), + + rabbit_auth_backend_internal:change_password_hash( + Username, Hash, HashingAlgorithm), + rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser), + preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser). + +preconfigure_permissions(_Username, undefined, _ActingUser) -> + ok; +preconfigure_permissions(Username, Map, ActingUser) when is_map(Map) -> + maps:map(fun(VHost, M) -> + rabbit_auth_backend_internal:set_permissions(Username, VHost, + maps:get(<<"configure">>, M), + maps:get(<<"write">>, M), + maps:get(<<"read">>, M), + ActingUser) + end, + Map), + ok. + %%---------------------------------------------------------------------------- %% Listing @@ -649,3 +781,21 @@ extract_topic_permission_params(Keys, #topic_permission{ {exchange, Exchange}, {write, WritePerm}, {read, ReadPerm}]). + +hashing_algorithm(User, Version) -> + case maps:get(hashing_algorithm, User, undefined) of + undefined -> + case Version of + %% 3.6.1 and later versions are supposed to have + %% the algorithm exported and thus not need a default + <<"3.6.0">> -> rabbit_password_hashing_sha256; + <<"3.5.", _/binary>> -> rabbit_password_hashing_md5; + <<"3.4.", _/binary>> -> rabbit_password_hashing_md5; + <<"3.3.", _/binary>> -> rabbit_password_hashing_md5; + <<"3.2.", _/binary>> -> rabbit_password_hashing_md5; + <<"3.1.", _/binary>> -> rabbit_password_hashing_md5; + <<"3.0.", _/binary>> -> rabbit_password_hashing_md5; + _ -> rabbit_password:hashing_mod() + end; + Alg -> rabbit_data_coercion:to_atom(Alg, utf8) + end. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 94b8870192..881374a4aa 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -18,9 +18,9 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("amqqueue.hrl"). --export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]). --export([list_for_source/1, list_for_destination/1, - list_for_source_and_destination/2]). +-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3]). +-export([list/1, list_for_source/1, list_for_destination/1, + list_for_source_and_destination/2, list_explicit/0]). -export([new_deletions/0, combine_deletions/2, add_deletion/3, process_deletions/2]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]). @@ -236,6 +236,20 @@ remove_default_exchange_binding_rows_of(Dst = #resource{}) -> end, ok. +-spec list_explicit() -> bindings(). + +list_explicit() -> + mnesia:async_dirty( + fun () -> + AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}), + %% if there are any default exchange bindings left after an upgrade + %% of a pre-3.8 database, filter them out + AllBindings = [B || #route{binding = B} <- AllRoutes], + lists:filter(fun(#binding{source = S}) -> + not (S#resource.kind =:= exchange andalso S#resource.name =:= <<>>) + end, AllBindings) + end). + -spec list(rabbit_types:vhost()) -> bindings(). list(VHostPath) -> diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl new file mode 100644 index 0000000000..260284d19c --- /dev/null +++ b/src/rabbit_definitions.erl @@ -0,0 +1,674 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_definitions). +-include_lib("rabbit_common/include/rabbit.hrl"). + +%% automatic import on boot +-export([maybe_load_definitions/0, maybe_load_definitions_from/2]). +%% import +-export([import_raw/1, import_raw/2, import_parsed/1, import_parsed/2, + apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5]). +%% export +-export([all_definitions/0]). +-export([decode/1, decode/2, args/1]). + +-import(rabbit_misc, [pget/2]). + +%% +%% API +%% + +maybe_load_definitions() -> + %% this feature was a part of rabbitmq-management for a long time, + %% so we check rabbit_management.load_definitions for backward compatibility. + maybe_load_management_definitions(), + %% this backs "core" load_definitions + maybe_load_core_definitions(). + +maybe_load_core_definitions() -> + maybe_load_definitions(rabbit, load_definitions). + +maybe_load_management_definitions() -> + maybe_load_definitions(rabbitmq_management, load_definitions). + +-spec import_raw(Body :: binary() | iolist()) -> ok | {error, term()}. +import_raw(Body) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + case decode([], Body) of + {error, E} -> {error, E}; + {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER) + end. + +-spec import_raw(Body :: binary() | iolist(), VHost :: vhost:name()) -> ok | {error, term()}. +import_raw(Body, VHost) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + case decode([], Body) of + {error, E} -> {error, E}; + {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER, fun() -> ok end, VHost) + end. + +-spec import_parsed(Defs :: #{any() => any()} | list()) -> ok | {error, term()}. +import_parsed(Body0) when is_list(Body0) -> + import_parsed(maps:from_list(Body0)); +import_parsed(Body0) when is_map(Body0) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + Body = atomise_map_keys(Body0), + apply_defs(Body, ?INTERNAL_USER). + +-spec import_parsed(Defs :: #{any() => any() | list()}, VHost :: vhost:name()) -> ok | {error, term()}. +import_parsed(Body0, VHost) when is_list(Body0) -> + import_parsed(maps:from_list(Body0), VHost); +import_parsed(Body0, VHost) -> + rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]), + Body = atomise_map_keys(Body0), + apply_defs(Body, ?INTERNAL_USER, fun() -> ok end, VHost). + +-spec all_definitions() -> map(). +all_definitions() -> + Xs = list_exchanges(), + Qs = list_queues(), + Bs = list_bindings(), + + Users = list_users(), + VHosts = list_vhosts(), + Params = list_runtime_parameters(), + GParams = list_global_runtime_parameters(), + Pols = list_policies(), + + Perms = list_permissions(), + TPerms = list_topic_permissions(), + + {ok, Vsn} = application:get_key(rabbit, vsn), + #{ + rabbit_version => rabbit_data_coercion:to_binary(Vsn), + rabbitmq_version => rabbit_data_coercion:to_binary(Vsn), + users => Users, + vhosts => VHosts, + permissions => Perms, + topic_permissions => TPerms, + parameters => Params, + global_parameters => GParams, + policies => Pols, + queues => Qs, + bindings => Bs, + exchanges => Xs + }. + +%% +%% Implementation +%% + +maybe_load_definitions(App, Key) -> + case application:get_env(App, Key) of + undefined -> ok; + {ok, none} -> ok; + {ok, FileOrDir} -> + IsDir = filelib:is_dir(FileOrDir), + maybe_load_definitions_from(IsDir, FileOrDir) + end. + +maybe_load_definitions_from(true, Dir) -> + rabbit_log:info("Applying definitions from directory ~s", [Dir]), + load_definitions_from_files(file:list_dir(Dir), Dir); +maybe_load_definitions_from(false, File) -> + load_definitions_from_file(File). + +load_definitions_from_files({ok, Filenames0}, Dir) -> + Filenames1 = lists:sort(Filenames0), + Filenames2 = [filename:join(Dir, F) || F <- Filenames1], + load_definitions_from_filenames(Filenames2); +load_definitions_from_files({error, E}, Dir) -> + rabbit_log:error("Could not read definitions from directory ~s, Error: ~p", [Dir, E]), + {error, {could_not_read_defs, E}}. + +load_definitions_from_filenames([]) -> + ok; +load_definitions_from_filenames([File|Rest]) -> + case load_definitions_from_file(File) of + ok -> load_definitions_from_filenames(Rest); + {error, E} -> {error, {failed_to_import_definitions, File, E}} + end. + +load_definitions_from_file(File) -> + case file:read_file(File) of + {ok, Body} -> + rabbit_log:info("Applying definitions from ~s", [File]), + import_raw(Body); + {error, E} -> + rabbit_log:error("Could not read definitions from ~s, Error: ~p", [File, E]), + {error, {could_not_read_defs, {File, E}}} + end. + +decode(Keys, Body) -> + case decode(Body) of + {ok, J0} -> + J = maps:fold(fun(K, V, Acc) -> + Acc#{rabbit_data_coercion:to_atom(K, utf8) => V} + end, J0, J0), + Results = [get_or_missing(K, J) || K <- Keys], + case [E || E = {key_missing, _} <- Results] of + [] -> {ok, Results, J}; + Errors -> {error, Errors} + end; + Else -> Else + end. + +decode(<<"">>) -> + {ok, #{}}; +decode(Body) -> + try + Decoded = rabbit_json:decode(Body), + Normalised = atomise_map_keys(Decoded), + {ok, Normalised} + catch error:_ -> {error, not_json} + end. + +atomise_map_keys(Decoded) -> + maps:fold(fun(K, V, Acc) -> + Acc#{rabbit_data_coercion:to_atom(K, utf8) => V} + end, Decoded, Decoded). + +-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok'. + +apply_defs(Map, ActingUser) -> + apply_defs(Map, ActingUser, fun () -> ok end). + +-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok')) -> 'ok'; + (Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> + apply_defs(Map, ActingUser, fun () -> ok end, VHost); + +apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> + Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), + try + rabbit_log:info("Importing users..."), + for_all(users, ActingUser, Map, + fun(User, _Username) -> + rabbit_auth_backend_internal:put_user(User, Version, ActingUser) + end), + rabbit_log:info("Importing vhosts..."), + for_all(vhosts, ActingUser, Map, fun add_vhost/2), + validate_limits(Map), + rabbit_log:info("Importing user permissions..."), + for_all(permissions, ActingUser, Map, fun add_permission/2), + rabbit_log:info("Importing topic permissions..."), + for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, fun add_parameter/2), + rabbit_log:info("Importing global parameters..."), + for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, fun add_policy/2), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, fun add_queue/2), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, fun add_exchange/2), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, fun add_binding/2), + SuccessFun(), + ok + catch {error, E} -> {error, E}; + exit:E -> {error, E} + end. + +-spec apply_defs(Map :: #{atom() => any()}, + ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok'), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> + rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p", + [VHost, ActingUser]), + try + validate_limits(Map, VHost), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + SuccessFun() + catch {error, E} -> {error, format(E)}; + exit:E -> {error, format(E)} + end. + +-spec apply_defs(Map :: #{atom() => any()}, + ActingUser :: rabbit_types:username(), + SuccessFun :: fun(() -> 'ok'), + ErrorFun :: fun((any()) -> 'ok'), + VHost :: vhost:name()) -> 'ok'. + +apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> + rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p", + [VHost, ActingUser]), + try + validate_limits(Map, VHost), + rabbit_log:info("Importing parameters..."), + for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + rabbit_log:info("Importing policies..."), + for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + rabbit_log:info("Importing queues..."), + for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + rabbit_log:info("Importing exchanges..."), + for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + rabbit_log:info("Importing bindings..."), + for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + SuccessFun() + catch {error, E} -> ErrorFun(format(E)); + exit:E -> ErrorFun(format(E)) + end. + +for_all(Name, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> [Fun(maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), + ActingUser) || + M <- List, is_map(M)] + end. + +for_all(Name, ActingUser, Definitions, VHost, Fun) -> + + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), + ActingUser) || + M <- List, is_map(M)] + end. + +format(#amqp_error{name = Name, explanation = Explanation}) -> + rabbit_data_coercion:to_binary(rabbit_misc:format("~s: ~s", [Name, Explanation])); +format({no_such_vhost, undefined}) -> + rabbit_data_coercion:to_binary( + "Virtual host does not exist and is not specified in definitions file."); +format({no_such_vhost, VHost}) -> + rabbit_data_coercion:to_binary( + rabbit_misc:format("Please create virtual host \"~s\" prior to importing definitions.", + [VHost])); +format({vhost_limit_exceeded, ErrMsg}) -> + rabbit_data_coercion:to_binary(ErrMsg); +format(E) -> + rabbit_data_coercion:to_binary(rabbit_misc:format("~p", [E])). + +add_parameter(Param, Username) -> + VHost = maps:get(vhost, Param, undefined), + add_parameter(VHost, Param, Username). + +add_parameter(VHost, Param, Username) -> + Comp = maps:get(component, Param, undefined), + Key = maps:get(name, Param, undefined), + Term = maps:get(value, Param, undefined), + Result = case is_map(Term) of + true -> + %% coerce maps to proplists for backwards compatibility. + %% See rabbitmq-management#528. + TermProplist = rabbit_data_coercion:to_proplist(Term), + rabbit_runtime_parameters:set(VHost, Comp, Key, TermProplist, Username); + _ -> + rabbit_runtime_parameters:set(VHost, Comp, Key, Term, Username) + end, + case Result of + ok -> ok; + {error_string, E} -> + S = rabbit_misc:format(" (~s/~s/~s)", [VHost, Comp, Key]), + exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) + end. + +add_global_parameter(Param, Username) -> + Key = maps:get(name, Param, undefined), + Term = maps:get(value, Param, undefined), + case is_map(Term) of + true -> + %% coerce maps to proplists for backwards compatibility. + %% See rabbitmq-management#528. + TermProplist = rabbit_data_coercion:to_proplist(Term), + rabbit_runtime_parameters:set_global(Key, TermProplist, Username); + _ -> + rabbit_runtime_parameters:set_global(Key, Term, Username) + end. + +add_policy(Param, Username) -> + VHost = maps:get(vhost, Param, undefined), + add_policy(VHost, Param, Username). + +add_policy(VHost, Param, Username) -> + Key = maps:get(name, Param, undefined), + case rabbit_policy:set( + VHost, Key, maps:get(pattern, Param, undefined), + case maps:get(definition, Param, undefined) of + undefined -> undefined; + Def -> rabbit_data_coercion:to_proplist(Def) + end, + maps:get(priority, Param, undefined), + maps:get('apply-to', Param, <<"all">>), + Username) of + ok -> ok; + {error_string, E} -> S = rabbit_misc:format(" (~s/~s)", [VHost, Key]), + exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) + end. + +add_vhost(VHost, ActingUser) -> + VHostName = maps:get(name, VHost, undefined), + VHostTrace = maps:get(tracing, VHost, undefined), + VHostDefinition = maps:get(definition, VHost, undefined), + VHostTags = maps:get(tags, VHost, undefined), + rabbit_vhost:put_vhost(VHostName, VHostDefinition, VHostTags, VHostTrace, ActingUser). + +add_permission(Permission, ActingUser) -> + rabbit_auth_backend_internal:set_permissions(maps:get(user, Permission, undefined), + maps:get(vhost, Permission, undefined), + maps:get(configure, Permission, undefined), + maps:get(write, Permission, undefined), + maps:get(read, Permission, undefined), + ActingUser). + +add_topic_permission(TopicPermission, ActingUser) -> + rabbit_auth_backend_internal:set_topic_permissions( + maps:get(user, TopicPermission, undefined), + maps:get(vhost, TopicPermission, undefined), + maps:get(exchange, TopicPermission, undefined), + maps:get(write, TopicPermission, undefined), + maps:get(read, TopicPermission, undefined), + ActingUser). + +add_queue(Queue, ActingUser) -> + add_queue_int(Queue, r(queue, Queue), ActingUser). + +add_queue(VHost, Queue, ActingUser) -> + add_queue_int(Queue, rv(VHost, queue, Queue), ActingUser). + +add_queue_int(Queue, Name, ActingUser) -> + rabbit_amqqueue:declare(Name, + maps:get(durable, Queue, undefined), + maps:get(auto_delete, Queue, undefined), + args(maps:get(arguments, Queue, undefined)), + none, + ActingUser). + +add_exchange(Exchange, ActingUser) -> + add_exchange_int(Exchange, r(exchange, Exchange), ActingUser). + +add_exchange(VHost, Exchange, ActingUser) -> + add_exchange_int(Exchange, rv(VHost, exchange, Exchange), ActingUser). + +add_exchange_int(Exchange, Name, ActingUser) -> + Internal = case maps:get(internal, Exchange, undefined) of + undefined -> false; %% =< 2.2.0 + I -> I + end, + rabbit_exchange:declare(Name, + rabbit_exchange:check_type(maps:get(type, Exchange, undefined)), + maps:get(durable, Exchange, undefined), + maps:get(auto_delete, Exchange, undefined), + Internal, + args(maps:get(arguments, Exchange, undefined)), + ActingUser). + +add_binding(Binding, ActingUser) -> + DestType = dest_type(Binding), + add_binding_int(Binding, r(exchange, source, Binding), + r(DestType, destination, Binding), ActingUser). + +add_binding(VHost, Binding, ActingUser) -> + DestType = dest_type(Binding), + add_binding_int(Binding, rv(VHost, exchange, source, Binding), + rv(VHost, DestType, destination, Binding), ActingUser). + +add_binding_int(Binding, Source, Destination, ActingUser) -> + rabbit_binding:add( + #binding{source = Source, + destination = Destination, + key = maps:get(routing_key, Binding, undefined), + args = args(maps:get(arguments, Binding, undefined))}, + ActingUser). + +dest_type(Binding) -> + rabbit_data_coercion:to_atom(maps:get(destination_type, Binding, undefined)). + +r(Type, Props) -> r(Type, name, Props). + +r(Type, Name, Props) -> + rabbit_misc:r(maps:get(vhost, Props, undefined), Type, maps:get(Name, Props, undefined)). + +rv(VHost, Type, Props) -> rv(VHost, Type, name, Props). + +rv(VHost, Type, Name, Props) -> + rabbit_misc:r(VHost, Type, maps:get(Name, Props, undefined)). + +%%-------------------------------------------------------------------- + +validate_limits(All) -> + case maps:get(queues, All, undefined) of + undefined -> ok; + Queues0 -> + {ok, VHostMap} = filter_out_existing_queues(Queues0), + maps:fold(fun validate_vhost_limit/3, ok, VHostMap) + end. + +validate_limits(All, VHost) -> + case maps:get(queues, All, undefined) of + undefined -> ok; + Queues0 -> + Queues1 = filter_out_existing_queues(VHost, Queues0), + AddCount = length(Queues1), + validate_vhost_limit(VHost, AddCount, ok) + end. + +filter_out_existing_queues(Queues) -> + build_filtered_map(Queues, maps:new()). + +filter_out_existing_queues(VHost, Queues) -> + Pred = fun(Queue) -> + Rec = rv(VHost, queue, <<"name">>, Queue), + case rabbit_amqqueue:lookup(Rec) of + {ok, _} -> false; + {error, not_found} -> true + end + end, + lists:filter(Pred, Queues). + +build_queue_data(Queue) -> + VHost = maps:get(<<"vhost">>, Queue, undefined), + Rec = rv(VHost, queue, <<"name">>, Queue), + {Rec, VHost}. + +build_filtered_map([], AccMap) -> + {ok, AccMap}; +build_filtered_map([Queue|Rest], AccMap0) -> + {Rec, VHost} = build_queue_data(Queue), + case rabbit_amqqueue:lookup(Rec) of + {error, not_found} -> + AccMap1 = maps_update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0), + build_filtered_map(Rest, AccMap1); + {ok, _} -> + build_filtered_map(Rest, AccMap0) + end. + +%% Copy of maps:with_util/3 from Erlang 20.0.1. +maps_update_with(Key,Fun,Init,Map) when is_function(Fun,1), is_map(Map) -> + case maps:find(Key,Map) of + {ok,Val} -> maps:update(Key,Fun(Val),Map); + error -> maps:put(Key,Init,Map) + end; +maps_update_with(Key,Fun,Init,Map) -> + erlang:error(maps_error_type(Map),[Key,Fun,Init,Map]). + +%% Copy of maps:error_type/1 from Erlang 20.0.1. +maps_error_type(M) when is_map(M) -> badarg; +maps_error_type(V) -> {badmap, V}. + +validate_vhost_limit(VHost, AddCount, ok) -> + WouldExceed = rabbit_vhost_limit:would_exceed_queue_limit(AddCount, VHost), + validate_vhost_queue_limit(VHost, AddCount, WouldExceed). + +validate_vhost_queue_limit(_VHost, 0, _) -> + % Note: not adding any new queues so the upload + % must be update-only + ok; +validate_vhost_queue_limit(_VHost, _AddCount, false) -> + % Note: would not exceed queue limit + ok; +validate_vhost_queue_limit(VHost, AddCount, {true, Limit, QueueCount}) -> + ErrFmt = "Adding ~B queue(s) to virtual host \"~s\" would exceed the limit of ~B queue(s).~n~nThis virtual host currently has ~B queue(s) defined.~n~nImport aborted!", + ErrInfo = [AddCount, VHost, Limit, QueueCount], + ErrMsg = rabbit_misc:format(ErrFmt, ErrInfo), + exit({vhost_limit_exceeded, ErrMsg}). + +atomise_name(N) -> rabbit_data_coercion:to_atom(N). + +get_or_missing(K, L) -> + case maps:get(K, L, undefined) of + undefined -> {key_missing, K}; + V -> V + end. + +args([]) -> args(#{}); +args(L) -> rabbit_misc:to_amqp_table(L). + +%% +%% Export +%% + +list_exchanges() -> + %% exclude internal exchanges, they are not meant to be declared or used by + %% applications + [exchange_definition(X) || X <- lists:filter(fun(#exchange{internal = true}) -> false; + (#exchange{}) -> true + end, + rabbit_exchange:list())]. + +exchange_definition(#exchange{name = #resource{virtual_host = VHost, name = Name}, + type = Type, + durable = Durable, auto_delete = AD, arguments = Args}) -> + #{<<"vhost">> => VHost, + <<"name">> => Name, + <<"type">> => Type, + <<"durable">> => Durable, + <<"auto_delete">> => AD, + <<"arguments">> => Args}. + +list_queues() -> + %% exclude exclusive queues, they cannot be restored + [queue_definition(Q) || Q <- lists:filter(fun(Q0) -> + amqqueue:get_exclusive_owner(Q0) =:= none + end, + rabbit_amqqueue:list())]. + +queue_definition(Q) -> + #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q), + Type = case amqqueue:get_type(Q) of + rabbit_classic_queue -> classic; + rabbit_quorum_queue -> quorum; + T -> T + end, + Arguments = [{Key, Value} || {Key, _Type, Value} <- amqqueue:get_arguments(Q)], + #{ + <<"vhost">> => VHost, + <<"name">> => Name, + <<"type">> => Type, + <<"durable">> => amqqueue:is_durable(Q), + <<"auto_delete">> => amqqueue:is_auto_delete(Q), + <<"arguments">> => maps:from_list(Arguments) + }. + +list_bindings() -> + [binding_definition(B) || B <- rabbit_binding:list_explicit()]. + +binding_definition(#binding{source = S, + key = RoutingKey, + destination = D, + args = Args}) -> + Arguments = [{Key, Value} || {Key, _Type, Value} <- Args], + #{ + <<"source">> => S#resource.name, + <<"vhost">> => S#resource.virtual_host, + <<"destination">> => D#resource.name, + <<"destination_type">> => D#resource.kind, + <<"routing_key">> => RoutingKey, + <<"arguments">> => maps:from_list(Arguments) + }. + +list_vhosts() -> + [vhost_definition(V) || V <- rabbit_vhost:all()]. + +vhost_definition(VHost) -> + #{ + <<"name">> => vhost:get_name(VHost), + <<"limits">> => vhost:get_limits(VHost), + <<"metadata">> => vhost:get_metadata(VHost) + }. + +list_users() -> + [begin + {ok, User} = rabbit_auth_backend_internal:lookup_user(pget(user, U)), + #{name => User#internal_user.username, + password_hash => base64:encode(User#internal_user.password_hash), + hashing_algorithm => rabbit_auth_backend_internal:hashing_module_for_user(User), + tags => tags_as_binaries(User#internal_user.tags) + } + end || U <- rabbit_auth_backend_internal:list_users()]. + +list_runtime_parameters() -> + [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list()]. + +runtime_parameter_definition(Param) -> + #{ + <<"vhost">> => pget(vhost, Param), + <<"component">> => pget(component, Param), + <<"name">> => pget(name, Param), + <<"value">> => maps:from_list(pget(value, Param)) + }. + +list_global_runtime_parameters() -> + [global_runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list_global()]. + +global_runtime_parameter_definition(Param) -> + maps:from_list(Param). + +list_policies() -> + [policy_definition(P) || P <- rabbit_policy:list()]. + +policy_definition(Policy) -> + #{ + <<"vhost">> => pget(vhost, Policy), + <<"name">> => pget(name, Policy), + <<"pattern">> => pget(pattern, Policy), + <<"apply-to">> => pget('apply-to', Policy), + <<"priority">> => pget(priority, Policy), + <<"definition">> => maps:from_list(pget(definition, Policy)) + }. + +list_permissions() -> + [permission_definition(P) || P <- rabbit_auth_backend_internal:list_permissions()]. + +permission_definition(P) -> + maps:from_list(P). + +list_topic_permissions() -> + [topic_permission_definition(P) || P <- rabbit_auth_backend_internal:list_topic_permissions()]. + +topic_permission_definition(P) -> + maps:from_list(P). + +tags_as_binaries(Tags) -> + list_to_binary(string:join([atom_to_list(T) || T <- Tags], ",")). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 86c11c4ad2..0d945929af 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -174,7 +174,7 @@ store_ram(X) -> (binary()) -> atom() | rabbit_types:connection_exit(). check_type(TypeBin) -> - case rabbit_registry:binary_to_type(TypeBin) of + case rabbit_registry:binary_to_type(rabbit_data_coercion:to_binary(TypeBin)) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index ff07bfa8ee..78157072e3 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -43,7 +43,7 @@ -export([register/0]). -export([invalidate/0, recover/0]). --export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]). +-export([name/1, name_op/1, effective_definition/1, merge_operator_definitions/2, get/2, get_arg/3, set/1]). -export([validate/5, notify/5, notify_clear/4]). -export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1, list_formatted/1, list_formatted/3, info_keys/0]). @@ -76,23 +76,23 @@ name0(Policy) -> pget(name, Policy). effective_definition(Q) when ?is_amqqueue(Q) -> Policy = amqqueue:get_policy(Q), OpPolicy = amqqueue:get_operator_policy(Q), - effective_definition0(Policy, OpPolicy); + merge_operator_definitions(Policy, OpPolicy); effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) -> - effective_definition0(Policy, OpPolicy). - -effective_definition0(undefined, undefined) -> undefined; -effective_definition0(Policy, undefined) -> pget(definition, Policy); -effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy); -effective_definition0(Policy, OpPolicy) -> - OpDefinition = pget(definition, OpPolicy, []), - Definition = pget(definition, Policy, []), - {Keys, _} = lists:unzip(Definition), - {OpKeys, _} = lists:unzip(OpDefinition), + merge_operator_definitions(Policy, OpPolicy). + +merge_operator_definitions(undefined, undefined) -> undefined; +merge_operator_definitions(Policy, undefined) -> pget(definition, Policy); +merge_operator_definitions(undefined, OpPolicy) -> pget(definition, OpPolicy); +merge_operator_definitions(Policy, OpPolicy) -> + OpDefinition = rabbit_data_coercion:to_map(pget(definition, OpPolicy, [])), + Definition = rabbit_data_coercion:to_map(pget(definition, Policy, [])), + Keys = maps:keys(Definition), + OpKeys = maps:keys(OpDefinition), lists:map(fun(Key) -> - case {pget(Key, Definition), pget(Key, OpDefinition)} of - {Val, undefined} -> {Key, Val}; - {undefined, Val} -> {Key, Val}; - {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)} + case {maps:get(Key, Definition, undefined), maps:get(Key, OpDefinition, undefined)} of + {Val, undefined} -> {Key, Val}; + {undefined, OpVal} -> {Key, OpVal}; + {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)} end end, lists:umerge(Keys, OpKeys)). @@ -142,11 +142,11 @@ get0(Name, Policy, OpPolicy) -> merge_policy_value(Name, PolicyVal, OpVal) -> case policy_merge_strategy(Name) of {ok, Module} -> Module:merge_policy_value(Name, PolicyVal, OpVal); - {error, not_found} -> PolicyVal + {error, not_found} -> rabbit_policies:merge_policy_value(Name, PolicyVal, OpVal) end. policy_merge_strategy(Name) -> - case rabbit_registry:binary_to_type(Name) of + case rabbit_registry:binary_to_type(rabbit_data_coercion:to_binary(Name)) of {error, not_found} -> {error, not_found}; T -> diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 95758cec7f..dbc3d6344a 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -27,6 +27,7 @@ -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). -export([vhost_down/1]). +-export([put_vhost/5]). %% %% API @@ -96,7 +97,12 @@ add(Name, Description, Tags, ActingUser) -> end. do_add(Name, Description, Tags, ActingUser) -> - rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Description]), + case Description of + undefined -> + rabbit_log:info("Adding vhost '~s' without a description", [Name]); + Value -> + rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Value]) + end, VHost = rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_vhost, Name}) of @@ -171,6 +177,48 @@ delete(VHost, ActingUser) -> rabbit_vhost_sup_sup:delete_on_all_nodes(VHost), ok. +put_vhost(Name, Description, Tags0, Trace, Username) -> + Tags = case Tags0 of + undefined -> <<"">>; + null -> <<"">>; + "undefined" -> <<"">>; + "null" -> <<"">>; + Other -> Other + end, + Result = case exists(Name) of + true -> ok; + false -> add(Name, Description, parse_tags(Tags), Username), + %% wait for up to 45 seconds for the vhost to initialise + %% on all nodes + case await_running_on_all_nodes(Name, 45000) of + ok -> + maybe_grant_full_permissions(Name, Username); + {error, timeout} -> + {error, timeout} + end + end, + case Trace of + true -> rabbit_trace:start(Name); + false -> rabbit_trace:stop(Name); + undefined -> ok + end, + Result. + +%% when definitions are loaded on boot, Username here will be ?INTERNAL_USER, +%% which does not actually exist +maybe_grant_full_permissions(_Name, ?INTERNAL_USER) -> + ok; +maybe_grant_full_permissions(Name, Username) -> + U = rabbit_auth_backend_internal:lookup_user(Username), + maybe_grant_full_permissions(U, Name, Username). + +maybe_grant_full_permissions({ok, _}, Name, Username) -> + rabbit_auth_backend_internal:set_permissions( + Username, Name, <<".*">>, <<".*">>, <<".*">>, Username); +maybe_grant_full_permissions(_, _Name, _Username) -> + ok. + + %% 50 ms -define(AWAIT_SAMPLE_INTERVAL, 50). |
