diff options
| -rw-r--r-- | src/rabbit_definitions.erl | 77 |
1 files changed, 32 insertions, 45 deletions
diff --git a/src/rabbit_definitions.erl b/src/rabbit_definitions.erl index 08d5dc3064..6336fe4f31 100644 --- a/src/rabbit_definitions.erl +++ b/src/rabbit_definitions.erl @@ -313,7 +313,6 @@ sequential_for_all(Category, ActingUser, Definitions, Fun) -> end. sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) -> - case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; List -> [Fun(VHost, atomize_keys(M), ActingUser) || M <- List, is_map(M)] @@ -327,56 +326,44 @@ concurrent_for_all(Category, ActingUser, Definitions, Fun) -> 0 -> ok; N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)]) end, - {ok, Gatherer} = gatherer:start_link(), - [begin - %% keys are expected to be atoms - ok = gatherer:fork(Gatherer), - worker_pool:submit_async( - ?IMPORT_WORK_POOL, - fun() -> - try - Fun(atomize_keys(M), ActingUser) - catch _:E -> gatherer:in(Gatherer, {error, E}); - {error, E} -> gatherer:in(Gatherer, {error, E}) - end, - gatherer:finish(Gatherer) - end) - end || M <- List, is_map(M)], - case gatherer:out(Gatherer) of - empty -> - ok = gatherer:stop(Gatherer); - {value, {error, E}} -> - ok = gatherer:stop(Gatherer), - throw({error, E}) - end + WorkPoolFun = fun(M) -> + Fun(atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) end. concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) -> case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of undefined -> ok; List -> - {ok, Gatherer} = gatherer:start_link(), - [begin - %% keys are expected to be atoms - ok = gatherer:fork(Gatherer), - worker_pool:submit_async( - ?IMPORT_WORK_POOL, - fun() -> - try - Fun(VHost, atomize_keys(M), ActingUser) - catch _:E -> gatherer:in(Gatherer, {error, E}); - {error, E} -> gatherer:in(Gatherer, {error, E}) - end, - gatherer:finish(Gatherer) - end) - end || M <- List, is_map(M)], - case gatherer:out(Gatherer) of - empty -> - ok = gatherer:stop(Gatherer); - {value, {error, E}} -> - ok = gatherer:stop(Gatherer), - throw({error, E}) - end + WorkPoolFun = fun(M) -> + Fun(VHost, atomize_keys(M), ActingUser) + end, + do_concurrent_for_all(List, WorkPoolFun) + end. + +do_concurrent_for_all(List, WorkPoolFun) -> + {ok, Gatherer} = gatherer:start_link(), + [begin + %% keys are expected to be atoms + ok = gatherer:fork(Gatherer), + worker_pool:submit_async( + ?IMPORT_WORK_POOL, + fun() -> + try + WorkPoolFun(M) + catch _:E -> gatherer:in(Gatherer, {error, E}); + {error, E} -> gatherer:in(Gatherer, {error, E}) + end, + gatherer:finish(Gatherer) + end) + end || M <- List, is_map(M)], + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer); + {value, {error, E}} -> + ok = gatherer:stop(Gatherer), + throw({error, E}) end. -spec atomize_keys(#{any() => any()}) -> #{atom() => any()}. |
