diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-02-19 14:53:53 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-02-19 14:54:53 +0300 |
| commit | 2406e12e00ae10eef7f5176d9517bb8eabeb14c6 (patch) | |
| tree | 9869121e4b06ef3d8111e63446edada81ae45c81 /src | |
| parent | 8ad0eb0e5f70b09236e2ec0da91132762070930d (diff) | |
| download | rabbitmq-server-git-2406e12e00ae10eef7f5176d9517bb8eabeb14c6.tar.gz | |
Catch and report errors when importing definitions concurrently
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_definitions.erl | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index b9058f6588..08d5dc3064 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -334,12 +334,21 @@ concurrent_for_all(Category, ActingUser, Definitions, Fun) -> worker_pool:submit_async( ?IMPORT_WORK_POOL, fun() -> - Fun(atomize_keys(M), ActingUser), + try + Fun(atomize_keys(M), ActingUser) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, gatherer:finish(Gatherer) end) end || M <- List, is_map(M)], - gatherer:out(Gatherer), - gatherer:stop(Gatherer) + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end end. concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> @@ -353,12 +362,21 @@ concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> worker_pool:submit_async( ?IMPORT_WORK_POOL, fun() -> - Fun(VHost, atomize_keys(M), ActingUser), + try + Fun(VHost, atomize_keys(M), ActingUser) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, gatherer:finish(Gatherer) end) end || M <- List, is_map(M)], - gatherer:out(Gatherer), - gatherer:stop(Gatherer) + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) + end end. -spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. |
