diff options
| -rw-r--r-- | src/rabbit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_definitions.erl | 114 |
2 files changed, 95 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 0f98e50504..b1aee4248c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -88,7 +88,7 @@ {enables, worker_pool}]}). -rabbit_boot_step({worker_pool, - [{description, "worker pool"}, + [{description, "default worker pool"}, {mfa, {rabbit_sup, start_supervisor_child, [worker_pool_sup]}}, {requires, pre_boot}, @@ -229,6 +229,11 @@ [{description, "ready to communicate with peers and clients"}, {requires, [core_initialized, recovery, routing_ready]}]}). +-rabbit_boot_step({definition_import_worker_pool, + [{description, "dedicated worker pool for definition import"}, + {mfa, {rabbit_definitions, boot, []}}, + {requires, pre_flight}]}). + -rabbit_boot_step({cluster_name, [{description, "sets cluster name if configured"}, {mfa, {rabbit_nodes, boot, []}}, diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index 1d0a09f682..14e991cac5 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -17,6 +17,7 @@ -module(rabbit_definitions). -include_lib("rabbit_common/include/rabbit.hrl"). +-export([boot/0]). %% automatic import on boot -export([maybe_load_definitions/0, maybe_load_definitions_from/2]). %% import @@ -58,6 +59,12 @@ -export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]). +-define(IMPORT_WORK_POOL, definition_import_pool). + +boot() -> + PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()), + rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]). + maybe_load_definitions() -> %% this feature was a part of rabbitmq-management for a long time, %% so we check rabbit_management.load_definitions for backward compatibility. @@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), try - for_all(users, ActingUser, Map, + concurrent_for_all(users, ActingUser, Map, fun(User, _Username) -> rabbit_auth_backend_internal:put_user(User, Version, ActingUser) end), - for_all(vhosts, ActingUser, Map, fun add_vhost/2), + concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2), validate_limits(Map), - for_all(permissions, ActingUser, Map, fun add_permission/2), - for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - for_all(parameters, ActingUser, Map, fun add_parameter/2), - for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), - for_all(policies, ActingUser, Map, fun add_policy/2), - for_all(queues, ActingUser, Map, fun add_queue/2), - for_all(exchanges, ActingUser, Map, fun add_exchange/2), - for_all(bindings, ActingUser, Map, fun add_binding/2), + concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), + concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, fun add_policy/2), + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), SuccessFun(), ok catch {error, E} -> {error, E}; @@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) end. -for_all(Category, ActingUser, Definitions, Fun) -> +sequential_for_all(Category, ActingUser, Definitions, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of undefined -> ok; List -> @@ -295,14 +308,12 @@ for_all(Category, ActingUser, Definitions, Fun) -> end, [begin %% keys are expected to be atoms - Atomized = maps:fold(fun (K, V, Acc) -> - maps:put(rabbit_data_coercion:to_atom(K), V, Acc) - end, #{}, M), + Atomized = atomize_keys(M), Fun(Atomized, ActingUser) end || M <- List, is_map(M)] end. -for_all(Name, ActingUser, Definitions, VHost, Fun) -> +sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; @@ -311,6 +322,57 @@ for_all(Name, ActingUser, Definitions, VHost, Fun) -> M <- List, is_map(M)] end. +concurrent_for_all(Category, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of + undefined -> ok; + List -> + case length(List) of + 0 -> ok; + N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)]) + end, + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + Atomized = atomize_keys(M), + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + Fun(Atomized, ActingUser), + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + gatherer:out(Gatherer), + gatherer:stop(Gatherer) + end. + +concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + Atomized = M = atomize_keys(M), + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + Fun(VHost, Atomized, ActingUser), + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + gatherer:out(Gatherer), + gatherer:stop(Gatherer) + end. + +-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. + +atomize_keys(M) -> + maps:fold(fun(K, V, Acc) -> + maps:put(rabbit_data_coercion:to_atom(K), V, Acc) + end, #{}, M). + -spec human_readable_category_name(definition_category()) -> string(). human_readable_category_name(topic_permissions) -> "topic permissions"; @@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) -> exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) end. +-spec add_vhost(map(), rabbit_types:username()) -> ok. + add_vhost(VHost, ActingUser) -> VHostName = maps:get(name, VHost, undefined), VHostTrace = maps:get(tracing, VHost, undefined), |
