summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl9
-rw-r--r--src/rabbit_auth_backend_internal.erl152
-rw-r--r--src/rabbit_binding.erl20
-rw-r--r--src/rabbit_definitions.erl674
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_policy.erl36
-rw-r--r--src/rabbit_vhost.erl50
7 files changed, 919 insertions, 24 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 87ceb5d701..e4d2ee9808 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -187,6 +187,15 @@
{requires, [core_initialized]},
{enables, routing_ready}]}).
+%% We want to A) make sure we apply definitions before the node begins serving
+%% traffic and B) in fact do it before empty_db_check (so the defaults will not
+%% get created if we don't need 'em).
+-rabbit_boot_step({load_core_definitions,
+ [{description, "imports definitions"},
+ {mfa, {rabbit_definitions, maybe_load_definitions, []}},
+ {requires, recovery},
+ {enables, empty_db_check}]}).
+
-rabbit_boot_step({empty_db_check,
[{description, "empty DB check"},
{mfa, {?MODULE, maybe_insert_default_data, []}},
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index e675ad188b..09392bf7b8 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -28,7 +28,7 @@
hash_password/2, change_password_hash/2, change_password_hash/3,
set_tags/3, set_permissions/6, clear_permissions/3,
set_topic_permissions/6, clear_topic_permissions/3, clear_topic_permissions/4,
- add_user_sans_validation/3]).
+ add_user_sans_validation/3, put_user/2, put_user/3]).
-export([user_info_keys/0, perms_info_keys/0,
user_perms_info_keys/0, vhost_perms_info_keys/0,
@@ -471,6 +471,138 @@ clear_topic_permissions(Username, VHostPath, Exchange, ActingUser) ->
{user_who_performed_action, ActingUser}]),
R.
+put_user(User, ActingUser) -> put_user(User, undefined, ActingUser).
+
+put_user(User, Version, ActingUser) ->
+ Username = maps:get(name, User),
+ HasPassword = maps:is_key(password, User),
+ HasPasswordHash = maps:is_key(password_hash, User),
+ Password = maps:get(password, User, undefined),
+ PasswordHash = maps:get(password_hash, User, undefined),
+
+ Tags = case {maps:get(tags, User, undefined), maps:get(administrator, User, undefined)} of
+ {undefined, undefined} ->
+ throw({error, tags_not_present});
+ {undefined, AdminS} ->
+ case rabbit_mgmt_util:parse_bool(AdminS) of
+ true -> [administrator];
+ false -> []
+ end;
+ {TagsS, _} ->
+ [list_to_atom(string:strip(T)) ||
+ T <- string:tokens(binary_to_list(TagsS), ",")]
+ end,
+
+ UserExists = case rabbit_auth_backend_internal:lookup_user(Username) of
+ %% expected
+ {error, not_found} -> false;
+ %% shouldn't normally happen but worth guarding
+ %% against
+ {error, _} -> false;
+ _ -> true
+ end,
+
+ %% pre-configured, only applies to newly created users
+ Permissions = maps:get(permissions, User, undefined),
+
+ PassedCredentialValidation =
+ case {HasPassword, HasPasswordHash} of
+ {true, false} ->
+ rabbit_credential_validation:validate(Username, Password) =:= ok;
+ {false, true} -> true;
+ _ ->
+ rabbit_credential_validation:validate(Username, Password) =:= ok
+ end,
+
+ case UserExists of
+ true ->
+ case {HasPassword, HasPasswordHash} of
+ {true, false} ->
+ update_user_password(PassedCredentialValidation, Username, Password, Tags, ActingUser);
+ {false, true} ->
+ update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser);
+ {true, true} ->
+ throw({error, both_password_and_password_hash_are_provided});
+ %% clear password, update tags if needed
+ _ ->
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
+ rabbit_auth_backend_internal:clear_password(Username, ActingUser)
+ end;
+ false ->
+ case {HasPassword, HasPasswordHash} of
+ {true, false} ->
+ create_user_with_password(PassedCredentialValidation, Username, Password, Tags, Permissions, ActingUser);
+ {false, true} ->
+ create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, Permissions, ActingUser);
+ {true, true} ->
+ throw({error, both_password_and_password_hash_are_provided});
+ {false, false} ->
+ %% this user won't be able to sign in using
+ %% a username/password pair but can be used for x509 certificate authentication,
+ %% with authn backends such as HTTP or LDAP and so on.
+ create_user_with_password(PassedCredentialValidation, Username, <<"">>, Tags, Permissions, ActingUser)
+ end
+ end.
+
+update_user_password(_PassedCredentialValidation = true, Username, Password, Tags, ActingUser) ->
+ rabbit_auth_backend_internal:change_password(Username, Password, ActingUser),
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
+update_user_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _ActingUser) ->
+ %% we don't log here because
+ %% rabbit_auth_backend_internal will do it
+ throw({error, credential_validation_failed}).
+
+update_user_password_hash(Username, PasswordHash, Tags, User, Version, ActingUser) ->
+ %% when a hash this provided, credential validation
+ %% is not applied
+ HashingAlgorithm = hashing_algorithm(User, Version),
+
+ Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
+ rabbit_auth_backend_internal:change_password_hash(
+ Username, Hash, HashingAlgorithm),
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser).
+
+create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, undefined, ActingUser) ->
+ rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser);
+create_user_with_password(_PassedCredentialValidation = true, Username, Password, Tags, PreconfiguredPermissions, ActingUser) ->
+ rabbit_auth_backend_internal:add_user(Username, Password, ActingUser),
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
+ preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser);
+create_user_with_password(_PassedCredentialValidation = false, _Username, _Password, _Tags, _, _) ->
+ %% we don't log here because
+ %% rabbit_auth_backend_internal will do it
+ throw({error, credential_validation_failed}).
+
+create_user_with_password_hash(Username, PasswordHash, Tags, User, Version, PreconfiguredPermissions, ActingUser) ->
+ %% when a hash this provided, credential validation
+ %% is not applied
+ HashingAlgorithm = hashing_algorithm(User, Version),
+ Hash = rabbit_misc:b64decode_or_throw(PasswordHash),
+
+ %% first we create a user with dummy credentials and no
+ %% validation applied, then we update password hash
+ TmpPassword = rabbit_guid:binary(rabbit_guid:gen_secure(), "tmp"),
+ rabbit_auth_backend_internal:add_user_sans_validation(Username, TmpPassword, ActingUser),
+
+ rabbit_auth_backend_internal:change_password_hash(
+ Username, Hash, HashingAlgorithm),
+ rabbit_auth_backend_internal:set_tags(Username, Tags, ActingUser),
+ preconfigure_permissions(Username, PreconfiguredPermissions, ActingUser).
+
+preconfigure_permissions(_Username, undefined, _ActingUser) ->
+ ok;
+preconfigure_permissions(Username, Map, ActingUser) when is_map(Map) ->
+ maps:map(fun(VHost, M) ->
+ rabbit_auth_backend_internal:set_permissions(Username, VHost,
+ maps:get(<<"configure">>, M),
+ maps:get(<<"write">>, M),
+ maps:get(<<"read">>, M),
+ ActingUser)
+ end,
+ Map),
+ ok.
+
%%----------------------------------------------------------------------------
%% Listing
@@ -649,3 +781,21 @@ extract_topic_permission_params(Keys, #topic_permission{
{exchange, Exchange},
{write, WritePerm},
{read, ReadPerm}]).
+
+hashing_algorithm(User, Version) ->
+ case maps:get(hashing_algorithm, User, undefined) of
+ undefined ->
+ case Version of
+ %% 3.6.1 and later versions are supposed to have
+ %% the algorithm exported and thus not need a default
+ <<"3.6.0">> -> rabbit_password_hashing_sha256;
+ <<"3.5.", _/binary>> -> rabbit_password_hashing_md5;
+ <<"3.4.", _/binary>> -> rabbit_password_hashing_md5;
+ <<"3.3.", _/binary>> -> rabbit_password_hashing_md5;
+ <<"3.2.", _/binary>> -> rabbit_password_hashing_md5;
+ <<"3.1.", _/binary>> -> rabbit_password_hashing_md5;
+ <<"3.0.", _/binary>> -> rabbit_password_hashing_md5;
+ _ -> rabbit_password:hashing_mod()
+ end;
+ Alg -> rabbit_data_coercion:to_atom(Alg, utf8)
+ end.
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 94b8870192..881374a4aa 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -18,9 +18,9 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("amqqueue.hrl").
--export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3, list/1]).
--export([list_for_source/1, list_for_destination/1,
- list_for_source_and_destination/2]).
+-export([recover/0, recover/2, exists/1, add/2, add/3, remove/1, remove/3]).
+-export([list/1, list_for_source/1, list_for_destination/1,
+ list_for_source_and_destination/2, list_explicit/0]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
process_deletions/2]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
@@ -236,6 +236,20 @@ remove_default_exchange_binding_rows_of(Dst = #resource{}) ->
end,
ok.
+-spec list_explicit() -> bindings().
+
+list_explicit() ->
+ mnesia:async_dirty(
+ fun () ->
+ AllRoutes = mnesia:dirty_match_object(rabbit_route, #route{_ = '_'}),
+ %% if there are any default exchange bindings left after an upgrade
+ %% of a pre-3.8 database, filter them out
+ AllBindings = [B || #route{binding = B} <- AllRoutes],
+ lists:filter(fun(#binding{source = S}) ->
+ not (S#resource.kind =:= exchange andalso S#resource.name =:= <<>>)
+ end, AllBindings)
+ end).
+
-spec list(rabbit_types:vhost()) -> bindings().
list(VHostPath) ->
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
new file mode 100644
index 0000000000..260284d19c
--- /dev/null
+++ b/src/rabbit_definitions.erl
@@ -0,0 +1,674 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_definitions).
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%% automatic import on boot
+-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
+%% import
+-export([import_raw/1, import_raw/2, import_parsed/1, import_parsed/2,
+ apply_defs/2, apply_defs/3, apply_defs/4, apply_defs/5]).
+%% export
+-export([all_definitions/0]).
+-export([decode/1, decode/2, args/1]).
+
+-import(rabbit_misc, [pget/2]).
+
+%%
+%% API
+%%
+
+maybe_load_definitions() ->
+ %% this feature was a part of rabbitmq-management for a long time,
+ %% so we check rabbit_management.load_definitions for backward compatibility.
+ maybe_load_management_definitions(),
+ %% this backs "core" load_definitions
+ maybe_load_core_definitions().
+
+maybe_load_core_definitions() ->
+ maybe_load_definitions(rabbit, load_definitions).
+
+maybe_load_management_definitions() ->
+ maybe_load_definitions(rabbitmq_management, load_definitions).
+
+-spec import_raw(Body :: binary() | iolist()) -> ok | {error, term()}.
+import_raw(Body) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ case decode([], Body) of
+ {error, E} -> {error, E};
+ {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER)
+ end.
+
+-spec import_raw(Body :: binary() | iolist(), VHost :: vhost:name()) -> ok | {error, term()}.
+import_raw(Body, VHost) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ case decode([], Body) of
+ {error, E} -> {error, E};
+ {ok, _, Map} -> apply_defs(Map, ?INTERNAL_USER, fun() -> ok end, VHost)
+ end.
+
+-spec import_parsed(Defs :: #{any() => any()} | list()) -> ok | {error, term()}.
+import_parsed(Body0) when is_list(Body0) ->
+ import_parsed(maps:from_list(Body0));
+import_parsed(Body0) when is_map(Body0) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ Body = atomise_map_keys(Body0),
+ apply_defs(Body, ?INTERNAL_USER).
+
+-spec import_parsed(Defs :: #{any() => any() | list()}, VHost :: vhost:name()) -> ok | {error, term()}.
+import_parsed(Body0, VHost) when is_list(Body0) ->
+ import_parsed(maps:from_list(Body0), VHost);
+import_parsed(Body0, VHost) ->
+ rabbit_log:info("Asked to import definitions. Acting user: ~s", [?INTERNAL_USER]),
+ Body = atomise_map_keys(Body0),
+ apply_defs(Body, ?INTERNAL_USER, fun() -> ok end, VHost).
+
+-spec all_definitions() -> map().
+all_definitions() ->
+ Xs = list_exchanges(),
+ Qs = list_queues(),
+ Bs = list_bindings(),
+
+ Users = list_users(),
+ VHosts = list_vhosts(),
+ Params = list_runtime_parameters(),
+ GParams = list_global_runtime_parameters(),
+ Pols = list_policies(),
+
+ Perms = list_permissions(),
+ TPerms = list_topic_permissions(),
+
+ {ok, Vsn} = application:get_key(rabbit, vsn),
+ #{
+ rabbit_version => rabbit_data_coercion:to_binary(Vsn),
+ rabbitmq_version => rabbit_data_coercion:to_binary(Vsn),
+ users => Users,
+ vhosts => VHosts,
+ permissions => Perms,
+ topic_permissions => TPerms,
+ parameters => Params,
+ global_parameters => GParams,
+ policies => Pols,
+ queues => Qs,
+ bindings => Bs,
+ exchanges => Xs
+ }.
+
+%%
+%% Implementation
+%%
+
+maybe_load_definitions(App, Key) ->
+ case application:get_env(App, Key) of
+ undefined -> ok;
+ {ok, none} -> ok;
+ {ok, FileOrDir} ->
+ IsDir = filelib:is_dir(FileOrDir),
+ maybe_load_definitions_from(IsDir, FileOrDir)
+ end.
+
+maybe_load_definitions_from(true, Dir) ->
+ rabbit_log:info("Applying definitions from directory ~s", [Dir]),
+ load_definitions_from_files(file:list_dir(Dir), Dir);
+maybe_load_definitions_from(false, File) ->
+ load_definitions_from_file(File).
+
+load_definitions_from_files({ok, Filenames0}, Dir) ->
+ Filenames1 = lists:sort(Filenames0),
+ Filenames2 = [filename:join(Dir, F) || F <- Filenames1],
+ load_definitions_from_filenames(Filenames2);
+load_definitions_from_files({error, E}, Dir) ->
+ rabbit_log:error("Could not read definitions from directory ~s, Error: ~p", [Dir, E]),
+ {error, {could_not_read_defs, E}}.
+
+load_definitions_from_filenames([]) ->
+ ok;
+load_definitions_from_filenames([File|Rest]) ->
+ case load_definitions_from_file(File) of
+ ok -> load_definitions_from_filenames(Rest);
+ {error, E} -> {error, {failed_to_import_definitions, File, E}}
+ end.
+
+load_definitions_from_file(File) ->
+ case file:read_file(File) of
+ {ok, Body} ->
+ rabbit_log:info("Applying definitions from ~s", [File]),
+ import_raw(Body);
+ {error, E} ->
+ rabbit_log:error("Could not read definitions from ~s, Error: ~p", [File, E]),
+ {error, {could_not_read_defs, {File, E}}}
+ end.
+
+decode(Keys, Body) ->
+ case decode(Body) of
+ {ok, J0} ->
+ J = maps:fold(fun(K, V, Acc) ->
+ Acc#{rabbit_data_coercion:to_atom(K, utf8) => V}
+ end, J0, J0),
+ Results = [get_or_missing(K, J) || K <- Keys],
+ case [E || E = {key_missing, _} <- Results] of
+ [] -> {ok, Results, J};
+ Errors -> {error, Errors}
+ end;
+ Else -> Else
+ end.
+
+decode(<<"">>) ->
+ {ok, #{}};
+decode(Body) ->
+ try
+ Decoded = rabbit_json:decode(Body),
+ Normalised = atomise_map_keys(Decoded),
+ {ok, Normalised}
+ catch error:_ -> {error, not_json}
+ end.
+
+atomise_map_keys(Decoded) ->
+ maps:fold(fun(K, V, Acc) ->
+ Acc#{rabbit_data_coercion:to_atom(K, utf8) => V}
+ end, Decoded, Decoded).
+
+-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username()) -> 'ok'.
+
+apply_defs(Map, ActingUser) ->
+ apply_defs(Map, ActingUser, fun () -> ok end).
+
+-spec apply_defs(Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok')) -> 'ok';
+ (Map :: #{atom() => any()}, ActingUser :: rabbit_types:username(),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
+ apply_defs(Map, ActingUser, fun () -> ok end, VHost);
+
+apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
+ Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
+ try
+ rabbit_log:info("Importing users..."),
+ for_all(users, ActingUser, Map,
+ fun(User, _Username) ->
+ rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
+ end),
+ rabbit_log:info("Importing vhosts..."),
+ for_all(vhosts, ActingUser, Map, fun add_vhost/2),
+ validate_limits(Map),
+ rabbit_log:info("Importing user permissions..."),
+ for_all(permissions, ActingUser, Map, fun add_permission/2),
+ rabbit_log:info("Importing topic permissions..."),
+ for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ rabbit_log:info("Importing global parameters..."),
+ for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, fun add_policy/2),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, fun add_queue/2),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, fun add_exchange/2),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, fun add_binding/2),
+ SuccessFun(),
+ ok
+ catch {error, E} -> {error, E};
+ exit:E -> {error, E}
+ end.
+
+-spec apply_defs(Map :: #{atom() => any()},
+ ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok'),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
+ rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p",
+ [VHost, ActingUser]),
+ try
+ validate_limits(Map, VHost),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ SuccessFun()
+ catch {error, E} -> {error, format(E)};
+ exit:E -> {error, format(E)}
+ end.
+
+-spec apply_defs(Map :: #{atom() => any()},
+ ActingUser :: rabbit_types:username(),
+ SuccessFun :: fun(() -> 'ok'),
+ ErrorFun :: fun((any()) -> 'ok'),
+ VHost :: vhost:name()) -> 'ok'.
+
+apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
+ rabbit_log:info("Asked to import definitions for a virtual host. Virtual host: ~p, acting user: ~p",
+ [VHost, ActingUser]),
+ try
+ validate_limits(Map, VHost),
+ rabbit_log:info("Importing parameters..."),
+ for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ rabbit_log:info("Importing policies..."),
+ for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ rabbit_log:info("Importing queues..."),
+ for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ rabbit_log:info("Importing exchanges..."),
+ for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ rabbit_log:info("Importing bindings..."),
+ for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ SuccessFun()
+ catch {error, E} -> ErrorFun(format(E));
+ exit:E -> ErrorFun(format(E))
+ end.
+
+for_all(Name, ActingUser, Definitions, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
+ undefined -> ok;
+ List -> [Fun(maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]),
+ ActingUser) ||
+ M <- List, is_map(M)]
+ end.
+
+for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
+ undefined -> ok;
+ List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]),
+ ActingUser) ||
+ M <- List, is_map(M)]
+ end.
+
+format(#amqp_error{name = Name, explanation = Explanation}) ->
+ rabbit_data_coercion:to_binary(rabbit_misc:format("~s: ~s", [Name, Explanation]));
+format({no_such_vhost, undefined}) ->
+ rabbit_data_coercion:to_binary(
+ "Virtual host does not exist and is not specified in definitions file.");
+format({no_such_vhost, VHost}) ->
+ rabbit_data_coercion:to_binary(
+ rabbit_misc:format("Please create virtual host \"~s\" prior to importing definitions.",
+ [VHost]));
+format({vhost_limit_exceeded, ErrMsg}) ->
+ rabbit_data_coercion:to_binary(ErrMsg);
+format(E) ->
+ rabbit_data_coercion:to_binary(rabbit_misc:format("~p", [E])).
+
+add_parameter(Param, Username) ->
+ VHost = maps:get(vhost, Param, undefined),
+ add_parameter(VHost, Param, Username).
+
+add_parameter(VHost, Param, Username) ->
+ Comp = maps:get(component, Param, undefined),
+ Key = maps:get(name, Param, undefined),
+ Term = maps:get(value, Param, undefined),
+ Result = case is_map(Term) of
+ true ->
+ %% coerce maps to proplists for backwards compatibility.
+ %% See rabbitmq-management#528.
+ TermProplist = rabbit_data_coercion:to_proplist(Term),
+ rabbit_runtime_parameters:set(VHost, Comp, Key, TermProplist, Username);
+ _ ->
+ rabbit_runtime_parameters:set(VHost, Comp, Key, Term, Username)
+ end,
+ case Result of
+ ok -> ok;
+ {error_string, E} ->
+ S = rabbit_misc:format(" (~s/~s/~s)", [VHost, Comp, Key]),
+ exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
+ end.
+
+add_global_parameter(Param, Username) ->
+ Key = maps:get(name, Param, undefined),
+ Term = maps:get(value, Param, undefined),
+ case is_map(Term) of
+ true ->
+ %% coerce maps to proplists for backwards compatibility.
+ %% See rabbitmq-management#528.
+ TermProplist = rabbit_data_coercion:to_proplist(Term),
+ rabbit_runtime_parameters:set_global(Key, TermProplist, Username);
+ _ ->
+ rabbit_runtime_parameters:set_global(Key, Term, Username)
+ end.
+
+add_policy(Param, Username) ->
+ VHost = maps:get(vhost, Param, undefined),
+ add_policy(VHost, Param, Username).
+
+add_policy(VHost, Param, Username) ->
+ Key = maps:get(name, Param, undefined),
+ case rabbit_policy:set(
+ VHost, Key, maps:get(pattern, Param, undefined),
+ case maps:get(definition, Param, undefined) of
+ undefined -> undefined;
+ Def -> rabbit_data_coercion:to_proplist(Def)
+ end,
+ maps:get(priority, Param, undefined),
+ maps:get('apply-to', Param, <<"all">>),
+ Username) of
+ ok -> ok;
+ {error_string, E} -> S = rabbit_misc:format(" (~s/~s)", [VHost, Key]),
+ exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
+ end.
+
+add_vhost(VHost, ActingUser) ->
+ VHostName = maps:get(name, VHost, undefined),
+ VHostTrace = maps:get(tracing, VHost, undefined),
+ VHostDefinition = maps:get(definition, VHost, undefined),
+ VHostTags = maps:get(tags, VHost, undefined),
+ rabbit_vhost:put_vhost(VHostName, VHostDefinition, VHostTags, VHostTrace, ActingUser).
+
+add_permission(Permission, ActingUser) ->
+ rabbit_auth_backend_internal:set_permissions(maps:get(user, Permission, undefined),
+ maps:get(vhost, Permission, undefined),
+ maps:get(configure, Permission, undefined),
+ maps:get(write, Permission, undefined),
+ maps:get(read, Permission, undefined),
+ ActingUser).
+
+add_topic_permission(TopicPermission, ActingUser) ->
+ rabbit_auth_backend_internal:set_topic_permissions(
+ maps:get(user, TopicPermission, undefined),
+ maps:get(vhost, TopicPermission, undefined),
+ maps:get(exchange, TopicPermission, undefined),
+ maps:get(write, TopicPermission, undefined),
+ maps:get(read, TopicPermission, undefined),
+ ActingUser).
+
+add_queue(Queue, ActingUser) ->
+ add_queue_int(Queue, r(queue, Queue), ActingUser).
+
+add_queue(VHost, Queue, ActingUser) ->
+ add_queue_int(Queue, rv(VHost, queue, Queue), ActingUser).
+
+add_queue_int(Queue, Name, ActingUser) ->
+ rabbit_amqqueue:declare(Name,
+ maps:get(durable, Queue, undefined),
+ maps:get(auto_delete, Queue, undefined),
+ args(maps:get(arguments, Queue, undefined)),
+ none,
+ ActingUser).
+
+add_exchange(Exchange, ActingUser) ->
+ add_exchange_int(Exchange, r(exchange, Exchange), ActingUser).
+
+add_exchange(VHost, Exchange, ActingUser) ->
+ add_exchange_int(Exchange, rv(VHost, exchange, Exchange), ActingUser).
+
+add_exchange_int(Exchange, Name, ActingUser) ->
+ Internal = case maps:get(internal, Exchange, undefined) of
+ undefined -> false; %% =< 2.2.0
+ I -> I
+ end,
+ rabbit_exchange:declare(Name,
+ rabbit_exchange:check_type(maps:get(type, Exchange, undefined)),
+ maps:get(durable, Exchange, undefined),
+ maps:get(auto_delete, Exchange, undefined),
+ Internal,
+ args(maps:get(arguments, Exchange, undefined)),
+ ActingUser).
+
+add_binding(Binding, ActingUser) ->
+ DestType = dest_type(Binding),
+ add_binding_int(Binding, r(exchange, source, Binding),
+ r(DestType, destination, Binding), ActingUser).
+
+add_binding(VHost, Binding, ActingUser) ->
+ DestType = dest_type(Binding),
+ add_binding_int(Binding, rv(VHost, exchange, source, Binding),
+ rv(VHost, DestType, destination, Binding), ActingUser).
+
+add_binding_int(Binding, Source, Destination, ActingUser) ->
+ rabbit_binding:add(
+ #binding{source = Source,
+ destination = Destination,
+ key = maps:get(routing_key, Binding, undefined),
+ args = args(maps:get(arguments, Binding, undefined))},
+ ActingUser).
+
+dest_type(Binding) ->
+ rabbit_data_coercion:to_atom(maps:get(destination_type, Binding, undefined)).
+
+r(Type, Props) -> r(Type, name, Props).
+
+r(Type, Name, Props) ->
+ rabbit_misc:r(maps:get(vhost, Props, undefined), Type, maps:get(Name, Props, undefined)).
+
+rv(VHost, Type, Props) -> rv(VHost, Type, name, Props).
+
+rv(VHost, Type, Name, Props) ->
+ rabbit_misc:r(VHost, Type, maps:get(Name, Props, undefined)).
+
+%%--------------------------------------------------------------------
+
+validate_limits(All) ->
+ case maps:get(queues, All, undefined) of
+ undefined -> ok;
+ Queues0 ->
+ {ok, VHostMap} = filter_out_existing_queues(Queues0),
+ maps:fold(fun validate_vhost_limit/3, ok, VHostMap)
+ end.
+
+validate_limits(All, VHost) ->
+ case maps:get(queues, All, undefined) of
+ undefined -> ok;
+ Queues0 ->
+ Queues1 = filter_out_existing_queues(VHost, Queues0),
+ AddCount = length(Queues1),
+ validate_vhost_limit(VHost, AddCount, ok)
+ end.
+
+filter_out_existing_queues(Queues) ->
+ build_filtered_map(Queues, maps:new()).
+
+filter_out_existing_queues(VHost, Queues) ->
+ Pred = fun(Queue) ->
+ Rec = rv(VHost, queue, <<"name">>, Queue),
+ case rabbit_amqqueue:lookup(Rec) of
+ {ok, _} -> false;
+ {error, not_found} -> true
+ end
+ end,
+ lists:filter(Pred, Queues).
+
+build_queue_data(Queue) ->
+ VHost = maps:get(<<"vhost">>, Queue, undefined),
+ Rec = rv(VHost, queue, <<"name">>, Queue),
+ {Rec, VHost}.
+
+build_filtered_map([], AccMap) ->
+ {ok, AccMap};
+build_filtered_map([Queue|Rest], AccMap0) ->
+ {Rec, VHost} = build_queue_data(Queue),
+ case rabbit_amqqueue:lookup(Rec) of
+ {error, not_found} ->
+ AccMap1 = maps_update_with(VHost, fun(V) -> V + 1 end, 1, AccMap0),
+ build_filtered_map(Rest, AccMap1);
+ {ok, _} ->
+ build_filtered_map(Rest, AccMap0)
+ end.
+
+%% Copy of maps:with_util/3 from Erlang 20.0.1.
+maps_update_with(Key,Fun,Init,Map) when is_function(Fun,1), is_map(Map) ->
+ case maps:find(Key,Map) of
+ {ok,Val} -> maps:update(Key,Fun(Val),Map);
+ error -> maps:put(Key,Init,Map)
+ end;
+maps_update_with(Key,Fun,Init,Map) ->
+ erlang:error(maps_error_type(Map),[Key,Fun,Init,Map]).
+
+%% Copy of maps:error_type/1 from Erlang 20.0.1.
+maps_error_type(M) when is_map(M) -> badarg;
+maps_error_type(V) -> {badmap, V}.
+
+validate_vhost_limit(VHost, AddCount, ok) ->
+ WouldExceed = rabbit_vhost_limit:would_exceed_queue_limit(AddCount, VHost),
+ validate_vhost_queue_limit(VHost, AddCount, WouldExceed).
+
+validate_vhost_queue_limit(_VHost, 0, _) ->
+ % Note: not adding any new queues so the upload
+ % must be update-only
+ ok;
+validate_vhost_queue_limit(_VHost, _AddCount, false) ->
+ % Note: would not exceed queue limit
+ ok;
+validate_vhost_queue_limit(VHost, AddCount, {true, Limit, QueueCount}) ->
+ ErrFmt = "Adding ~B queue(s) to virtual host \"~s\" would exceed the limit of ~B queue(s).~n~nThis virtual host currently has ~B queue(s) defined.~n~nImport aborted!",
+ ErrInfo = [AddCount, VHost, Limit, QueueCount],
+ ErrMsg = rabbit_misc:format(ErrFmt, ErrInfo),
+ exit({vhost_limit_exceeded, ErrMsg}).
+
+atomise_name(N) -> rabbit_data_coercion:to_atom(N).
+
+get_or_missing(K, L) ->
+ case maps:get(K, L, undefined) of
+ undefined -> {key_missing, K};
+ V -> V
+ end.
+
+args([]) -> args(#{});
+args(L) -> rabbit_misc:to_amqp_table(L).
+
+%%
+%% Export
+%%
+
+list_exchanges() ->
+ %% exclude internal exchanges, they are not meant to be declared or used by
+ %% applications
+ [exchange_definition(X) || X <- lists:filter(fun(#exchange{internal = true}) -> false;
+ (#exchange{}) -> true
+ end,
+ rabbit_exchange:list())].
+
+exchange_definition(#exchange{name = #resource{virtual_host = VHost, name = Name},
+ type = Type,
+ durable = Durable, auto_delete = AD, arguments = Args}) ->
+ #{<<"vhost">> => VHost,
+ <<"name">> => Name,
+ <<"type">> => Type,
+ <<"durable">> => Durable,
+ <<"auto_delete">> => AD,
+ <<"arguments">> => Args}.
+
+list_queues() ->
+ %% exclude exclusive queues, they cannot be restored
+ [queue_definition(Q) || Q <- lists:filter(fun(Q0) ->
+ amqqueue:get_exclusive_owner(Q0) =:= none
+ end,
+ rabbit_amqqueue:list())].
+
+queue_definition(Q) ->
+ #resource{virtual_host = VHost, name = Name} = amqqueue:get_name(Q),
+ Type = case amqqueue:get_type(Q) of
+ rabbit_classic_queue -> classic;
+ rabbit_quorum_queue -> quorum;
+ T -> T
+ end,
+ Arguments = [{Key, Value} || {Key, _Type, Value} <- amqqueue:get_arguments(Q)],
+ #{
+ <<"vhost">> => VHost,
+ <<"name">> => Name,
+ <<"type">> => Type,
+ <<"durable">> => amqqueue:is_durable(Q),
+ <<"auto_delete">> => amqqueue:is_auto_delete(Q),
+ <<"arguments">> => maps:from_list(Arguments)
+ }.
+
+list_bindings() ->
+ [binding_definition(B) || B <- rabbit_binding:list_explicit()].
+
+binding_definition(#binding{source = S,
+ key = RoutingKey,
+ destination = D,
+ args = Args}) ->
+ Arguments = [{Key, Value} || {Key, _Type, Value} <- Args],
+ #{
+ <<"source">> => S#resource.name,
+ <<"vhost">> => S#resource.virtual_host,
+ <<"destination">> => D#resource.name,
+ <<"destination_type">> => D#resource.kind,
+ <<"routing_key">> => RoutingKey,
+ <<"arguments">> => maps:from_list(Arguments)
+ }.
+
+list_vhosts() ->
+ [vhost_definition(V) || V <- rabbit_vhost:all()].
+
+vhost_definition(VHost) ->
+ #{
+ <<"name">> => vhost:get_name(VHost),
+ <<"limits">> => vhost:get_limits(VHost),
+ <<"metadata">> => vhost:get_metadata(VHost)
+ }.
+
+list_users() ->
+ [begin
+ {ok, User} = rabbit_auth_backend_internal:lookup_user(pget(user, U)),
+ #{name => User#internal_user.username,
+ password_hash => base64:encode(User#internal_user.password_hash),
+ hashing_algorithm => rabbit_auth_backend_internal:hashing_module_for_user(User),
+ tags => tags_as_binaries(User#internal_user.tags)
+ }
+ end || U <- rabbit_auth_backend_internal:list_users()].
+
+list_runtime_parameters() ->
+ [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list()].
+
+runtime_parameter_definition(Param) ->
+ #{
+ <<"vhost">> => pget(vhost, Param),
+ <<"component">> => pget(component, Param),
+ <<"name">> => pget(name, Param),
+ <<"value">> => maps:from_list(pget(value, Param))
+ }.
+
+list_global_runtime_parameters() ->
+ [global_runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list_global()].
+
+global_runtime_parameter_definition(Param) ->
+ maps:from_list(Param).
+
+list_policies() ->
+ [policy_definition(P) || P <- rabbit_policy:list()].
+
+policy_definition(Policy) ->
+ #{
+ <<"vhost">> => pget(vhost, Policy),
+ <<"name">> => pget(name, Policy),
+ <<"pattern">> => pget(pattern, Policy),
+ <<"apply-to">> => pget('apply-to', Policy),
+ <<"priority">> => pget(priority, Policy),
+ <<"definition">> => maps:from_list(pget(definition, Policy))
+ }.
+
+list_permissions() ->
+ [permission_definition(P) || P <- rabbit_auth_backend_internal:list_permissions()].
+
+permission_definition(P) ->
+ maps:from_list(P).
+
+list_topic_permissions() ->
+ [topic_permission_definition(P) || P <- rabbit_auth_backend_internal:list_topic_permissions()].
+
+topic_permission_definition(P) ->
+ maps:from_list(P).
+
+tags_as_binaries(Tags) ->
+ list_to_binary(string:join([atom_to_list(T) || T <- Tags], ",")).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 86c11c4ad2..0d945929af 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -174,7 +174,7 @@ store_ram(X) ->
(binary()) -> atom() | rabbit_types:connection_exit().
check_type(TypeBin) ->
- case rabbit_registry:binary_to_type(TypeBin) of
+ case rabbit_registry:binary_to_type(rabbit_data_coercion:to_binary(TypeBin)) of
{error, not_found} ->
rabbit_misc:protocol_error(
command_invalid, "unknown exchange type '~s'", [TypeBin]);
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index ff07bfa8ee..78157072e3 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -43,7 +43,7 @@
-export([register/0]).
-export([invalidate/0, recover/0]).
--export([name/1, name_op/1, effective_definition/1, get/2, get_arg/3, set/1]).
+-export([name/1, name_op/1, effective_definition/1, merge_operator_definitions/2, get/2, get_arg/3, set/1]).
-export([validate/5, notify/5, notify_clear/4]).
-export([parse_set/7, set/7, delete/3, lookup/2, list/0, list/1,
list_formatted/1, list_formatted/3, info_keys/0]).
@@ -76,23 +76,23 @@ name0(Policy) -> pget(name, Policy).
effective_definition(Q) when ?is_amqqueue(Q) ->
Policy = amqqueue:get_policy(Q),
OpPolicy = amqqueue:get_operator_policy(Q),
- effective_definition0(Policy, OpPolicy);
+ merge_operator_definitions(Policy, OpPolicy);
effective_definition(#exchange{policy = Policy, operator_policy = OpPolicy}) ->
- effective_definition0(Policy, OpPolicy).
-
-effective_definition0(undefined, undefined) -> undefined;
-effective_definition0(Policy, undefined) -> pget(definition, Policy);
-effective_definition0(undefined, OpPolicy) -> pget(definition, OpPolicy);
-effective_definition0(Policy, OpPolicy) ->
- OpDefinition = pget(definition, OpPolicy, []),
- Definition = pget(definition, Policy, []),
- {Keys, _} = lists:unzip(Definition),
- {OpKeys, _} = lists:unzip(OpDefinition),
+ merge_operator_definitions(Policy, OpPolicy).
+
+merge_operator_definitions(undefined, undefined) -> undefined;
+merge_operator_definitions(Policy, undefined) -> pget(definition, Policy);
+merge_operator_definitions(undefined, OpPolicy) -> pget(definition, OpPolicy);
+merge_operator_definitions(Policy, OpPolicy) ->
+ OpDefinition = rabbit_data_coercion:to_map(pget(definition, OpPolicy, [])),
+ Definition = rabbit_data_coercion:to_map(pget(definition, Policy, [])),
+ Keys = maps:keys(Definition),
+ OpKeys = maps:keys(OpDefinition),
lists:map(fun(Key) ->
- case {pget(Key, Definition), pget(Key, OpDefinition)} of
- {Val, undefined} -> {Key, Val};
- {undefined, Val} -> {Key, Val};
- {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)}
+ case {maps:get(Key, Definition, undefined), maps:get(Key, OpDefinition, undefined)} of
+ {Val, undefined} -> {Key, Val};
+ {undefined, OpVal} -> {Key, OpVal};
+ {Val, OpVal} -> {Key, merge_policy_value(Key, Val, OpVal)}
end
end,
lists:umerge(Keys, OpKeys)).
@@ -142,11 +142,11 @@ get0(Name, Policy, OpPolicy) ->
merge_policy_value(Name, PolicyVal, OpVal) ->
case policy_merge_strategy(Name) of
{ok, Module} -> Module:merge_policy_value(Name, PolicyVal, OpVal);
- {error, not_found} -> PolicyVal
+ {error, not_found} -> rabbit_policies:merge_policy_value(Name, PolicyVal, OpVal)
end.
policy_merge_strategy(Name) ->
- case rabbit_registry:binary_to_type(Name) of
+ case rabbit_registry:binary_to_type(rabbit_data_coercion:to_binary(Name)) of
{error, not_found} ->
{error, not_found};
T ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 95758cec7f..dbc3d6344a 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -27,6 +27,7 @@
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
-export([vhost_down/1]).
+-export([put_vhost/5]).
%%
%% API
@@ -96,7 +97,12 @@ add(Name, Description, Tags, ActingUser) ->
end.
do_add(Name, Description, Tags, ActingUser) ->
- rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Description]),
+ case Description of
+ undefined ->
+ rabbit_log:info("Adding vhost '~s' without a description", [Name]);
+ Value ->
+ rabbit_log:info("Adding vhost '~s' (description: '~s')", [Name, Value])
+ end,
VHost = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, Name}) of
@@ -171,6 +177,48 @@ delete(VHost, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHost),
ok.
+put_vhost(Name, Description, Tags0, Trace, Username) ->
+ Tags = case Tags0 of
+ undefined -> <<"">>;
+ null -> <<"">>;
+ "undefined" -> <<"">>;
+ "null" -> <<"">>;
+ Other -> Other
+ end,
+ Result = case exists(Name) of
+ true -> ok;
+ false -> add(Name, Description, parse_tags(Tags), Username),
+ %% wait for up to 45 seconds for the vhost to initialise
+ %% on all nodes
+ case await_running_on_all_nodes(Name, 45000) of
+ ok ->
+ maybe_grant_full_permissions(Name, Username);
+ {error, timeout} ->
+ {error, timeout}
+ end
+ end,
+ case Trace of
+ true -> rabbit_trace:start(Name);
+ false -> rabbit_trace:stop(Name);
+ undefined -> ok
+ end,
+ Result.
+
+%% when definitions are loaded on boot, Username here will be ?INTERNAL_USER,
+%% which does not actually exist
+maybe_grant_full_permissions(_Name, ?INTERNAL_USER) ->
+ ok;
+maybe_grant_full_permissions(Name, Username) ->
+ U = rabbit_auth_backend_internal:lookup_user(Username),
+ maybe_grant_full_permissions(U, Name, Username).
+
+maybe_grant_full_permissions({ok, _}, Name, Username) ->
+ rabbit_auth_backend_internal:set_permissions(
+ Username, Name, <<".*">>, <<".*">>, <<".*">>, Username);
+maybe_grant_full_permissions(_, _Name, _Username) ->
+ ok.
+
+
%% 50 ms
-define(AWAIT_SAMPLE_INTERVAL, 50).