summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-02-19 14:53:53 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-02-19 14:54:53 +0300
commit2406e12e00ae10eef7f5176d9517bb8eabeb14c6 (patch)
tree9869121e4b06ef3d8111e63446edada81ae45c81 /src
parent8ad0eb0e5f70b09236e2ec0da91132762070930d (diff)
downloadrabbitmq-server-git-2406e12e00ae10eef7f5176d9517bb8eabeb14c6.tar.gz
Catch and report errors when importing definitions concurrently
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_definitions.erl30
1 files changed, 24 insertions, 6 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index b9058f6588..08d5dc3064 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -334,12 +334,21 @@ concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
worker_pool:submit_async(
?IMPORT_WORK_POOL,
fun() ->
- Fun(atomize_keys(M), ActingUser),
+ try
+ Fun(atomize_keys(M), ActingUser)
+ catch _:E -> gatherer:in(Gatherer, {error, E});
+ {error, E} -> gatherer:in(Gatherer, {error, E})
+ end,
gatherer:finish(Gatherer)
end)
end || M <- List, is_map(M)],
- gatherer:out(Gatherer),
- gatherer:stop(Gatherer)
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer);
+ {value, {error, E}} ->
+ ok = gatherer:stop(Gatherer),
+ throw({error, E})
+ end
end.
concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
@@ -353,12 +362,21 @@ concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
worker_pool:submit_async(
?IMPORT_WORK_POOL,
fun() ->
- Fun(VHost, atomize_keys(M), ActingUser),
+ try
+ Fun(VHost, atomize_keys(M), ActingUser)
+ catch _:E -> gatherer:in(Gatherer, {error, E});
+ {error, E} -> gatherer:in(Gatherer, {error, E})
+ end,
gatherer:finish(Gatherer)
end)
end || M <- List, is_map(M)],
- gatherer:out(Gatherer),
- gatherer:stop(Gatherer)
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer);
+ {value, {error, E}} ->
+ ok = gatherer:stop(Gatherer),
+ throw({error, E})
+ end
end.
-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.