diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-05-22 20:34:07 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2016-05-22 20:34:07 +0300 |
| commit | fe8cecad7116259cc83f26efb3b25b3d0ec6a655 (patch) | |
| tree | cee6ef36d2bd226e00159753abc1834318e9fdfc | |
| parent | dbff4cb5ef56cbc1171ebccdaefe6be1dff65b0b (diff) | |
| parent | d963bc8eff23802fb9f7ed9b0fa0b2f2040d9c1e (diff) | |
| download | rabbitmq-server-git-fe8cecad7116259cc83f26efb3b25b3d0ec6a655.tar.gz | |
Merge pull request #683 from binarin/rabbitmq-server-parallel-listing
Listing items in parallel
| -rw-r--r-- | src/rabbit_control_main.erl | 154 |
1 files changed, 96 insertions, 58 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index bee7128907..bf50fd6c49 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -23,7 +23,7 @@ sync_queue/1, cancel_sync_queue/1, become/1, purge_queue/1]). --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). +-import(rabbit_misc, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -590,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> action(list_users, Node, [], _Opts, Inform, Timeout) -> Inform("Listing users", []), - call(Node, {rabbit_auth_backend_internal, list_users, []}, - rabbit_auth_backend_internal:user_info_keys(), true, Timeout); + call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, + rabbit_auth_backend_internal:user_info_keys(), + [{timeout, Timeout}, to_bin_utf8]); action(list_permissions, Node, [], Opts, Inform, Timeout) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost \"~s\"", [VHost]), - call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, - rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, + rabbit_auth_backend_internal:vhost_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_parameters, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), - call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, - rabbit_runtime_parameters:info_keys(), Timeout); + call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, + rabbit_runtime_parameters:info_keys(), + [{timeout, Timeout}]); action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), - call(Node, {rabbit_policy, list_formatted, [VHostArg]}, - rabbit_policy:info_keys(), Timeout); + call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), - call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); + call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, + [{timeout, Timeout}, to_bin_utf8]); action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> {error_string, "list_user_permissions expects a username argument, but none provided."}; action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> Inform("Listing permissions for user ~p", Args), - call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, - rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, + rabbit_auth_backend_internal:user_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_queues, Node, Args, Opts, Inform, Timeout) -> - [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), Inform("Listing queues", []), + %% User options + [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, messages]), - call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, - ArgAtoms, Timeout); + + %% Data for emission + Nodes = nodes_in_cluster(Node, Timeout), + OnlineChunks = if Online -> length(Nodes); true -> 0 end, + OfflineChunks = if Offline -> 1; true -> 0 end, + ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, + TimeoutOpt = {timeout, Timeout}, + EmissionRef = make_ref(), + EmissionRefOpt = {ref, EmissionRef}, + + _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, type]), - call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_bindings, Node, Args, Opts, Inform, Timeout) -> Inform("Listing bindings", []), @@ -647,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> ArgAtoms = default_if_empty(Args, [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]), - call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_connections, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_channels, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), - call(Node, {rabbit_channel, info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, + [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, - rabbit_amqqueue:consumer_info_keys(), Timeout). + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, + rabbit_amqqueue:consumer_info_keys(), + [{timeout, Timeout}, {chunks, length(Nodes)}]). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -777,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> {X, Value} -> Value end, IsEscaped) || X <- InfoItemKeys]). -display_info_message(IsEscaped) -> +display_info_message(IsEscaped, InfoItemKeys) -> fun ([], _) -> ok; - ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> + ([FirstResult|_] = List, _) when is_list(FirstResult) -> lists:foreach(fun(Result) -> display_info_message_row(IsEscaped, Result, InfoItemKeys) end, List), ok; - (Result, InfoItemKeys) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) + (Result, _) -> + display_info_message_row(IsEscaped, Result, InfoItemKeys), + ok end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -844,7 +867,10 @@ display_call_result(Node, MFA) -> end. unsafe_rpc(Node, Mod, Fun, Args) -> - case rpc_call(Node, Mod, Fun, Args) of + unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> + case rpc_call(Node, Mod, Fun, Args, Timeout) of {badrpc, _} = Res -> throw(Res); Normal -> Normal end. @@ -863,33 +889,42 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> + Ref = start_emission(Node, {Mod, Fun, Args}, Opts), + display_emission_result(Ref, InfoKeys, Opts). + +start_emission(Node, {Mod, Fun, Args}, Opts) -> + ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), + Timeout = proplists:get_value(timeout, Opts, infinity), + Ref = proplists:get_value(ref, Opts, make_ref()), + rabbit_control_misc:spawn_emitter_caller( + Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), + Ref, self(), Timeout), + Ref. + +display_emission_result(Ref, InfoKeys, Opts) -> + IsEscaped = proplists:get_value(is_escaped, Opts, false), + Chunks = proplists:get_value(chunks, Opts, 1), + Timeout = proplists:get_value(timeout, Opts, infinity), + EmissionStatus = rabbit_control_misc:wait_for_info_messages( + self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), + emission_to_action_result(EmissionStatus). + +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value +%% into form expected by rabbit_cli:main/3. +emission_to_action_result({ok, ok}) -> + ok; +emission_to_action_result({error, Error}) -> + Error. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). +prepare_call_args(Args, ToBinUtf8) -> + case ToBinUtf8 of + true -> valid_utf8_args(Args); + false -> Args + end. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> - Args0 = case ToBinUtf8 of - true -> lists:map(fun list_to_binary_utf8/1, Args); - false -> Args - end, - Ref = make_ref(), - Pid = self(), - spawn_link( - fun () -> - case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, - Ref, Pid, Timeout) of - {error, _} = Error -> - Pid ! {error, Error}; - {bad_argument, _} = Error -> - Pid ! {error, Error}; - _ -> - ok - end - end), - rabbit_control_misc:wait_for_info_messages( - Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). +valid_utf8_args(Args) -> + lists:map(fun list_to_binary_utf8/1, Args). list_to_binary_utf8(L) -> B = list_to_binary(L), @@ -939,7 +974,10 @@ split_list([_]) -> exit(even_list_needed); split_list([A, B | T]) -> [{A, B} | split_list(T)]. nodes_in_cluster(Node) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). + +nodes_in_cluster(Node, Timeout) -> + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). alarms_by_node(Name) -> Status = unsafe_rpc(Name, rabbit, status, []), |
