summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl7
-rw-r--r--src/rabbit_definitions.erl114
2 files changed, 95 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 0f98e50504..b1aee4248c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -88,7 +88,7 @@
{enables, worker_pool}]}).
-rabbit_boot_step({worker_pool,
- [{description, "worker pool"},
+ [{description, "default worker pool"},
{mfa, {rabbit_sup, start_supervisor_child,
[worker_pool_sup]}},
{requires, pre_boot},
@@ -229,6 +229,11 @@
[{description, "ready to communicate with peers and clients"},
{requires, [core_initialized, recovery, routing_ready]}]}).
+-rabbit_boot_step({definition_import_worker_pool,
+ [{description, "dedicated worker pool for definition import"},
+ {mfa, {rabbit_definitions, boot, []}},
+ {requires, pre_flight}]}).
+
-rabbit_boot_step({cluster_name,
[{description, "sets cluster name if configured"},
{mfa, {rabbit_nodes, boot, []}},
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index 1d0a09f682..14e991cac5 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -17,6 +17,7 @@
-module(rabbit_definitions).
-include_lib("rabbit_common/include/rabbit.hrl").
+-export([boot/0]).
%% automatic import on boot
-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
%% import
@@ -58,6 +59,12 @@
-export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]).
+-define(IMPORT_WORK_POOL, definition_import_pool).
+
+boot() ->
+ PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()),
+ rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]).
+
maybe_load_definitions() ->
%% this feature was a part of rabbitmq-management for a long time,
%% so we check rabbit_management.load_definitions for backward compatibility.
@@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
try
- for_all(users, ActingUser, Map,
+ concurrent_for_all(users, ActingUser, Map,
fun(User, _Username) ->
rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
end),
- for_all(vhosts, ActingUser, Map, fun add_vhost/2),
+ concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2),
validate_limits(Map),
- for_all(permissions, ActingUser, Map, fun add_permission/2),
- for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
- for_all(parameters, ActingUser, Map, fun add_parameter/2),
- for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
- for_all(policies, ActingUser, Map, fun add_policy/2),
- for_all(queues, ActingUser, Map, fun add_queue/2),
- for_all(exchanges, ActingUser, Map, fun add_exchange/2),
- for_all(bindings, ActingUser, Map, fun add_binding/2),
+ concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
+ concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
+ sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
+ sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
+ concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
+ concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
+ concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
SuccessFun(),
ok
catch {error, E} -> {error, E};
@@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> {error, format(E)};
exit:E -> {error, format(E)}
@@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
[VHost, ActingUser]),
try
validate_limits(Map, VHost),
- for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
- for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
- for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
- for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
- for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
+ sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
+ %% importing policies concurrently can be unsafe as queues will be getting
+ %% potentially out of order notifications of applicable policy changes
+ sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
+ concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
+ concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
+ concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
SuccessFun()
catch {error, E} -> ErrorFun(format(E));
exit:E -> ErrorFun(format(E))
end.
-for_all(Category, ActingUser, Definitions, Fun) ->
+sequential_for_all(Category, ActingUser, Definitions, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
undefined -> ok;
List ->
@@ -295,14 +308,12 @@ for_all(Category, ActingUser, Definitions, Fun) ->
end,
[begin
%% keys are expected to be atoms
- Atomized = maps:fold(fun (K, V, Acc) ->
- maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
- end, #{}, M),
+ Atomized = atomize_keys(M),
Fun(Atomized, ActingUser)
end || M <- List, is_map(M)]
end.
-for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
@@ -311,6 +322,57 @@ for_all(Name, ActingUser, Definitions, VHost, Fun) ->
M <- List, is_map(M)]
end.
+concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
+ undefined -> ok;
+ List ->
+ case length(List) of
+ 0 -> ok;
+ N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)])
+ end,
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ Atomized = atomize_keys(M),
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ Fun(Atomized, ActingUser),
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ gatherer:out(Gatherer),
+ gatherer:stop(Gatherer)
+ end.
+
+concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
+ case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
+ undefined -> ok;
+ List ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ Atomized = M = atomize_keys(M),
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ Fun(VHost, Atomized, ActingUser),
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ gatherer:out(Gatherer),
+ gatherer:stop(Gatherer)
+ end.
+
+-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.
+
+atomize_keys(M) ->
+ maps:fold(fun(K, V, Acc) ->
+ maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
+ end, #{}, M).
+
-spec human_readable_category_name(definition_category()) -> string().
human_readable_category_name(topic_permissions) -> "topic permissions";
@@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) ->
exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
end.
+-spec add_vhost(map(), rabbit_types:username()) -> ok.
+
add_vhost(VHost, ActingUser) ->
VHostName = maps:get(name, VHost, undefined),
VHostTrace = maps:get(tracing, VHost, undefined),