summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_definitions.erl77
1 files changed, 32 insertions, 45 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl
index 08d5dc3064..6336fe4f31 100644
--- a/src/rabbit_definitions.erl
+++ b/src/rabbit_definitions.erl
@@ -313,7 +313,6 @@ sequential_for_all(Category, ActingUser, Definitions, Fun) ->
end.
sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
-
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
List -> [Fun(VHost, atomize_keys(M), ActingUser) || M <- List, is_map(M)]
@@ -327,56 +326,44 @@ concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
0 -> ok;
N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)])
end,
- {ok, Gatherer} = gatherer:start_link(),
- [begin
- %% keys are expected to be atoms
- ok = gatherer:fork(Gatherer),
- worker_pool:submit_async(
- ?IMPORT_WORK_POOL,
- fun() ->
- try
- Fun(atomize_keys(M), ActingUser)
- catch _:E -> gatherer:in(Gatherer, {error, E});
- {error, E} -> gatherer:in(Gatherer, {error, E})
- end,
- gatherer:finish(Gatherer)
- end)
- end || M <- List, is_map(M)],
- case gatherer:out(Gatherer) of
- empty ->
- ok = gatherer:stop(Gatherer);
- {value, {error, E}} ->
- ok = gatherer:stop(Gatherer),
- throw({error, E})
- end
+ WorkPoolFun = fun(M) ->
+ Fun(atomize_keys(M), ActingUser)
+ end,
+ do_concurrent_for_all(List, WorkPoolFun)
end.
concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
undefined -> ok;
List ->
- {ok, Gatherer} = gatherer:start_link(),
- [begin
- %% keys are expected to be atoms
- ok = gatherer:fork(Gatherer),
- worker_pool:submit_async(
- ?IMPORT_WORK_POOL,
- fun() ->
- try
- Fun(VHost, atomize_keys(M), ActingUser)
- catch _:E -> gatherer:in(Gatherer, {error, E});
- {error, E} -> gatherer:in(Gatherer, {error, E})
- end,
- gatherer:finish(Gatherer)
- end)
- end || M <- List, is_map(M)],
- case gatherer:out(Gatherer) of
- empty ->
- ok = gatherer:stop(Gatherer);
- {value, {error, E}} ->
- ok = gatherer:stop(Gatherer),
- throw({error, E})
- end
+ WorkPoolFun = fun(M) ->
+ Fun(VHost, atomize_keys(M), ActingUser)
+ end,
+ do_concurrent_for_all(List, WorkPoolFun)
+ end.
+
+do_concurrent_for_all(List, WorkPoolFun) ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ %% keys are expected to be atoms
+ ok = gatherer:fork(Gatherer),
+ worker_pool:submit_async(
+ ?IMPORT_WORK_POOL,
+ fun() ->
+ try
+ WorkPoolFun(M)
+ catch _:E -> gatherer:in(Gatherer, {error, E});
+ {error, E} -> gatherer:in(Gatherer, {error, E})
+ end,
+ gatherer:finish(Gatherer)
+ end)
+ end || M <- List, is_map(M)],
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer);
+ {value, {error, E}} ->
+ ok = gatherer:stop(Gatherer),
+ throw({error, E})
end.
-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.