diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-02-19 14:53:53 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-02-19 14:54:53 +0300 |
| commit | 2406e12e00ae10eef7f5176d9517bb8eabeb14c6 (patch) | |
| tree | 9869121e4b06ef3d8111e63446edada81ae45c81 | |
| parent | 8ad0eb0e5f70b09236e2ec0da91132762070930d (diff) | |
| download | rabbitmq-server-git-2406e12e00ae10eef7f5176d9517bb8eabeb14c6.tar.gz | |
Catch and report errors when importing definitions concurrently
| -rw-r--r-- | src/rabbit_definitions.erl | 30 | ||||
| -rw-r--r-- | test/definition_import_SUITE.erl | 19 | ||||
| -rw-r--r-- | test/definition_import_SUITE_data/failing_case12.json | 24 |
3 files changed, 66 insertions, 7 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index b9058f6588..08d5dc3064 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -334,12 +334,21 @@ concurrent_for_all(Category, ActingUser, Definitions, Fun) -> worker_pool:submit_async( ?IMPORT_WORK_POOL, fun() -> - Fun(atomize_keys(M), ActingUser), + try + Fun(atomize_keys(M), ActingUser) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, gatherer:finish(Gatherer) end) end || M <- List, is_map(M)], - gatherer:out(Gatherer), - gatherer:stop(Gatherer) + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end end. concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> @@ -353,12 +362,21 @@ concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> worker_pool:submit_async( ?IMPORT_WORK_POOL, fun() -> - Fun(VHost, atomize_keys(M), ActingUser), + try + Fun(VHost, atomize_keys(M), ActingUser) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, gatherer:finish(Gatherer) end) end || M <- List, is_map(M)], - gatherer:out(Gatherer), - gatherer:stop(Gatherer) + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end end. -spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. diff --git a/test/definition_import_SUITE.erl b/test/definition_import_SUITE.erl index ef827a25ec..7cc91a7a51 100644 --- a/test/definition_import_SUITE.erl +++ b/test/definition_import_SUITE.erl @@ -45,7 +45,8 @@ groups() -> import_case8, import_case9, import_case10, - import_case11 + import_case11, + import_case12 ]} ]. @@ -105,12 +106,18 @@ import_case5(Config) -> {<<"1884">>,<<"vhost2">>}]). import_case11(Config) -> import_file_case(Config, "case11"). +import_case12(Config) -> import_invalid_file_case(Config, "failing_case12"). import_file_case(Config, CaseName) -> CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_import_case, [CasePath]), ok. +import_invalid_file_case(Config, CaseName) -> + CasePath = filename:join(?config(data_dir, Config), CaseName ++ ".json"), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, run_invalid_import_case, [CasePath]), + ok. + import_from_directory_case(Config, CaseName) -> import_from_directory_case_expect(Config, CaseName, ok). @@ -144,3 +151,13 @@ run_import_case(Path) -> ct:pal("Import case ~p failed: ~p~n", [Path, E]), ct:fail({failure, Path, E}) end. + +run_invalid_import_case(Path) -> + {ok, Body} = file:read_file(Path), + ct:pal("Successfully loaded a definition to import from ~p~n", [Path]), + case rabbit_definitions:import_raw(Body) of + ok -> + ct:pal("Expected import case ~p to fail~n", [Path]), + ct:fail({failure, Path}); + {error, _E} -> ok + end. diff --git a/test/definition_import_SUITE_data/failing_case12.json b/test/definition_import_SUITE_data/failing_case12.json new file mode 100644 index 0000000000..6ce0366a70 --- /dev/null +++ b/test/definition_import_SUITE_data/failing_case12.json @@ -0,0 +1,24 @@ +{ + "rabbit_version": "3.8.0+rc.1.5.g9148053", + "rabbitmq_version": "3.8.0+rc.1.5.g9148053", + "queues": [ + { + "name": "amq.queuebar", + "vhost": "/", + "durable": true, + "auto_delete": false, + "arguments": {} + } + ], + "exchanges": [ + { + "name": "invalid_type", + "vhost": "/", + "type": "definitly not direct", + "durable": true, + "auto_delete": false, + "internal": false, + "arguments": {} + } + ] +} |
