diff options
| -rw-r--r-- | src/gatherer.erl | 1 | ||||
| -rw-r--r-- | src/rabbit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_definitions.erl | 127 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | test/definition_import_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/definition_import_SUITE_data/failing_case12.json | 24 |
8 files changed, 158 insertions, 37 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index 52efef2bbb..99a487ed12 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -56,6 +56,7 @@ start_link() -> -spec stop(pid()) -> 'ok'. stop(Pid) -> + unlink(Pid), gen_server2:call(Pid, stop, infinity). -spec fork(pid()) -> 'ok'. diff --git a/src/rabbit.erl b/src/rabbit.erl index 0f98e50504..b1aee4248c 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -88,7 +88,7 @@ {enables, worker_pool}]}). -rabbit_boot_step({worker_pool, - [{description, "worker pool"}, + [{description, "default worker pool"}, {mfa, {rabbit_sup, start_supervisor_child, [worker_pool_sup]}}, {requires, pre_boot}, @@ -229,6 +229,11 @@ [{description, "ready to communicate with peers and clients"}, {requires, [core_initialized, recovery, routing_ready]}]}). +-rabbit_boot_step({definition_import_worker_pool, + [{description, "dedicated worker pool for definition import"}, + {mfa, {rabbit_definitions, boot, []}}, + {requires, pre_flight}]}). + -rabbit_boot_step({cluster_name, [{description, "sets cluster name if configured"}, {mfa, {rabbit_nodes, boot, []}}, diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index 4d666b88fc..6336fe4f31 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -17,6 +17,7 @@ -module(rabbit_definitions). -include_lib("rabbit_common/include/rabbit.hrl"). +-export([boot/0]). %% automatic import on boot -export([maybe_load_definitions/0, maybe_load_definitions_from/2]). %% import @@ -58,6 +59,12 @@ -export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]). +-define(IMPORT_WORK_POOL, definition_import_pool). + +boot() -> + PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()), + rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]). + maybe_load_definitions() -> %% this feature was a part of rabbitmq-management for a long time, %% so we check rabbit_management.load_definitions for backward compatibility. @@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) -> apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) -> Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)), try - for_all(users, ActingUser, Map, + concurrent_for_all(users, ActingUser, Map, fun(User, _Username) -> rabbit_auth_backend_internal:put_user(User, Version, ActingUser) end), - for_all(vhosts, ActingUser, Map, fun add_vhost/2), + concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2), validate_limits(Map), - for_all(permissions, ActingUser, Map, fun add_permission/2), - for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), - for_all(parameters, ActingUser, Map, fun add_parameter/2), - for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), - for_all(policies, ActingUser, Map, fun add_policy/2), - for_all(queues, ActingUser, Map, fun add_queue/2), - for_all(exchanges, ActingUser, Map, fun add_exchange/2), - for_all(bindings, ActingUser, Map, fun add_binding/2), + concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2), + concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2), + sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2), + sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, fun add_policy/2), + concurrent_for_all(queues, ActingUser, Map, fun add_queue/2), + concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2), + concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2), SuccessFun(), ok catch {error, E} -> {error, E}; @@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> {error, format(E)}; exit:E -> {error, format(E)} @@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) -> [VHost, ActingUser]), try validate_limits(Map, VHost), - for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), - for_all(policies, ActingUser, Map, VHost, fun add_policy/3), - for_all(queues, ActingUser, Map, VHost, fun add_queue/3), - for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), - for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), + sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3), + %% importing policies concurrently can be unsafe as queues will be getting + %% potentially out of order notifications of applicable policy changes + sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3), + concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3), + concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3), + concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3), SuccessFun() catch {error, E} -> ErrorFun(format(E)); exit:E -> ErrorFun(format(E)) end. -for_all(Category, ActingUser, Definitions, Fun) -> +sequential_for_all(Category, ActingUser, Definitions, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of undefined -> ok; List -> @@ -295,22 +308,71 @@ for_all(Category, ActingUser, Definitions, Fun) -> end, [begin %% keys are expected to be atoms - Atomized = maps:fold(fun (K, V, Acc) -> - maps:put(rabbit_data_coercion:to_atom(K), V, Acc) - end, #{}, M), - Fun(Atomized, ActingUser) + Fun(atomize_keys(M), ActingUser) end || M <- List, is_map(M)] end. -for_all(Name, ActingUser, Definitions, VHost, Fun) -> +sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of + undefined -> ok; + List -> [Fun(VHost, atomize_keys(M), ActingUser) || M <- List, is_map(M)] + end. + +concurrent_for_all(Category, ActingUser, Definitions, Fun) -> + case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of + undefined -> ok; + List -> + case length(List) of + 0 -> ok; + N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)]) + end, + WorkPoolFun = fun(M) -> + Fun(atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) + end. +concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; - List -> [Fun(VHost, maps:from_list([{atomise_name(K), V} || {K, V} <- maps:to_list(M)]), - ActingUser) || - M <- List, is_map(M)] + List -> + WorkPoolFun = fun(M) -> + Fun(VHost, atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) end. +do_concurrent_for_all(List, WorkPoolFun) -> + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + try + WorkPoolFun(M) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end. + +-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. + +atomize_keys(M) -> + maps:fold(fun(K, V, Acc) -> + maps:put(rabbit_data_coercion:to_atom(K), V, Acc) + end, #{}, M). + -spec human_readable_category_name(definition_category()) -> string(). human_readable_category_name(topic_permissions) -> "topic permissions"; @@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) -> exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S))) end. +-spec add_vhost(map(), rabbit_types:username()) -> ok. + add_vhost(VHost, ActingUser) -> VHostName = maps:get(name, VHost, undefined), VHostTrace = maps:get(tracing, VHost, undefined), @@ -588,7 +652,8 @@ list_exchanges() -> %% exclude internal exchanges, they are not meant to be declared or used by %% applications [exchange_definition(X) || X <- lists:filter(fun(#exchange{internal = true}) -> false; - (#exchange{}) -> true + (#exchange{name = #resource{name = <<>>}}) -> false; + (X) -> not rabbit_exchange:is_amq_prefixed(X) end, rabbit_exchange:list())]. @@ -664,7 +729,7 @@ list_users() -> end || U <- rabbit_auth_backend_internal:list_users()]. list_runtime_parameters() -> - [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list()]. + [runtime_parameter_definition(P) || P <- rabbit_runtime_parameters:list(), is_list(P)]. runtime_parameter_definition(Param) -> #{ diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index df0138d165..fd9ff0c05b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -24,7 +24,7 @@ update_scratch/3, update_decorators/1, immutable/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4, route/2, delete/3, validate_binding/2, count/0]). --export([list_names/0]). +-export([list_names/0, is_amq_prefixed/1]). %% these must be run inside a mnesia tx -export([maybe_auto_delete/2, serial/1, peek_serial/1, update/2]). @@ -102,6 +102,18 @@ serial(#exchange{name = XName} = X) -> (false) -> none end. +-spec is_amq_prefixed(rabbit_types:exchange() | binary()) -> boolean(). + +is_amq_prefixed(Name) when is_binary(Name) -> + case re:run(Name, <<"^amq\.">>) of + nomatch -> false; + {match, _} -> true + end; +is_amq_prefixed(#exchange{name = #resource{name = <<>>}}) -> + false; +is_amq_prefixed(#exchange{name = #resource{name = Name}}) -> + is_amq_prefixed(Name). + -spec declare (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:username()) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index c845575f5c..fdef5c3606 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1751,7 +1751,6 @@ build_index(Gatherer, Left, [], sum_file_size = SumFileSize }) -> case gatherer:out(Gatherer) of empty -> - unlink(Gatherer), ok = gatherer:stop(Gatherer), ok = index_clean_up_temporary_reference_count_entries(State), Offset = case ets:lookup(FileSummaryEts, Left) of diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 42995685f6..dd7e973905 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -717,7 +717,6 @@ queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> case gatherer:out(Gatherer) of empty -> - unlink(Gatherer), ok = gatherer:stop(Gatherer), finished; {value, {MsgId, Count}} -> @@ -1432,7 +1431,6 @@ foreach_queue_index(Funs) -> end) end || QueueDirName <- QueueDirNames], empty = gatherer:out(Gatherer), - unlink(Gatherer), ok = gatherer:stop(Gatherer). transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> diff --git a/test/definition_import_SUITE.erl b/test/definition_import_SUITE.erl index ef827a25ec..7cc91a7a51 100644 --- a/test/definition_import_SUITE.erl +++ b/test/definition_import_SUITE.erl @@ -45,7 +45,8 @@ groups() -> import_case8, import_case9, import_case10, - import_case11 + import_case11, + import_case12 ]} ]. @@ -105,12 +106,18 @@ import_case5(Config) -> {<<"1884">>,<<"vhost2">>}]). import_case11(Config) -> import_file_case(Config, "case11"). +import_case12(Config) -> import_invalid_file_case(Config, "failing_case12"). import_file_case(Config, CaseName) -> CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_import_case, [CasePath]), ok. +import_invalid_file_case(Config, CaseName) -> + CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath]), + ok. + import_from_directory_case(Config, CaseName) -> import_from_directory_case_expect(Config, CaseName, ok). @@ -144,3 +151,13 @@ run_import_case(Path) -> ct:pal("Import case ~p failed: ~p~n", [Path, E]), ct:fail({failure, Path, E}) end. + +run_invalid_import_case(Path) -> + {ok, Body} = file:read_file(Path), + ct:pal("Successfully loaded a definition to import from ~p~n", [Path]), + case rabbit_definitions:import_raw(Body) of + ok -> + ct:pal("Expected import case ~p to fail~n", [Path]), + ct:fail({failure, Path}); + {error, _E} -> ok + end. diff --git a/test/definition_import_SUITE_data/failing_case12.json b/test/definition_import_SUITE_data/failing_case12.json new file mode 100644 index 0000000000..6ce0366a70 --- /dev/null +++ b/test/definition_import_SUITE_data/failing_case12.json @@ -0,0 +1,24 @@ +{ + "rabbit_version": "3.8.0+rc.1.5.g9148053", + "rabbitmq_version": "3.8.0+rc.1.5.g9148053", + "queues": [ + { + "name": "amq.queuebar", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": {} + } + ], + "exchanges": [ + { + "name": "invalid_type", + "vhost": "/", + "type": "definitly not direct", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ] +} |
