diff options
| author | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-09-08 16:44:33 +0100 |
|---|---|---|
| committer | Ayanda Dube <ayanda.dube@erlang-solutions.com> | 2015-10-07 08:46:40 +0100 |
| commit | ae305dc2ad51be7d8aa2ef803edbfe49e7334231 (patch) | |
| tree | e4db74584c7ccc7e79508b54413f1cf6c2c85cf6 /src | |
| parent | 383defe127d93b13fc7b1ec10a977a5e8f43f189 (diff) | |
| download | rabbitmq-server-git-ae305dc2ad51be7d8aa2ef803edbfe49e7334231.tar.gz | |
Updates list_* operations to dynamically display info.
Adds notify_if_timeout/3 for terminating display of info process on timeout.
References #62
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_control_main.erl | 122 |
1 files changed, 83 insertions, 39 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index fe0563bbc7..0b23291c6a 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -17,12 +17,12 @@ -module(rabbit_control_main). -include("rabbit.hrl"). -include("rabbit_cli.hrl"). - +-compile(export_all). -export([start/0, stop/0, parse_arguments/2, action/5, sync_queue/1, cancel_sync_queue/1, become/1, purge_queue/1]). --import(rabbit_cli, [rpc_call/4, rpc_call/5]). +-import(rabbit_cli, [rpc_call/4, rpc_call/5, rpc_call/7]). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -499,62 +499,71 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> action(list_users, Node, [], _Opts, Inform, Timeout) -> Inform("Listing users", []), - display_info_list( - call(Node, {rabbit_auth_backend_internal, list_users, []}, Timeout), - rabbit_auth_backend_internal:user_info_keys()); + call(Node, {rabbit_auth_backend_internal, list_users, []}, Ref=make_ref(), + Timeout), + wait_info_messages(self(), Ref, + rabbit_auth_backend_internal:user_info_keys(), + Timeout); action(list_permissions, Node, [], Opts, Inform, Timeout) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost \"~s\"", [VHost]), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]}, Timeout), - rabbit_auth_backend_internal:vhost_perms_info_keys()); + call(Node,{rabbit_auth_backend_internal, list_vhost_permissions, + [VHost]}, Ref=make_ref(), Timeout), + wait_info_messages(self(), Ref, + rabbit_auth_backend_internal:vhost_perms_info_keys(), + Timeout); action(list_parameters, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), - display_info_list( - rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg], - Timeout), - rabbit_runtime_parameters:info_keys()); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_runtime_parameters, + list_formatted, [VHostArg], Ref=make_ref(), + Pid=self(), Timeout]), + wait_info_messages(Pid, Ref, rabbit_runtime_parameters:info_keys(), Timeout); action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), - display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg], - Timeout), - rabbit_policy:info_keys()); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_policy, list_formatted, + [VHostArg], Ref=make_ref(), + Pid=self(), Timeout]), + wait_info_messages(Pid, Ref, rabbit_policy:info_keys(), Timeout); action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), - display_info_list(call(Node, {rabbit_vhost, info_all, []}, Timeout), - ArgAtoms); + call(Node, {rabbit_vhost, info_all, []}, Ref=make_ref(), Timeout), + wait_info_messages(self(), Ref, ArgAtoms, Timeout); action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> {error_string, "list_user_permissions expects a username argument, but none provided."}; action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> Inform("Listing permissions for user ~p", Args), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_user_permissions, Args}, Timeout), - rabbit_auth_backend_internal:user_perms_info_keys()); + call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, + Ref=make_ref(), Timeout), + wait_info_messages(self(), Ref, + rabbit_auth_backend_internal:user_perms_info_keys(), + Timeout); action(list_queues, Node, Args, Opts, Inform, Timeout) -> Inform("Listing queues", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, messages]), - display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, - [VHostArg, ArgAtoms], Timeout), - ArgAtoms); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_amqqueue, info_all, + [VHostArg, ArgAtoms], Ref=make_ref(), + Pid=self(), Timeout]), + wait_info_messages(Pid, Ref, ArgAtoms, Timeout); action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, type]), - display_info_list(rpc_call(Node, rabbit_exchange, info_all, - [VHostArg, ArgAtoms], Timeout), - ArgAtoms); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_exchange, info_all, + [VHostArg, ArgAtoms], Ref=make_ref(), + Pid=self(), Timeout]), + wait_info_messages(Pid, Ref, ArgAtoms, Timeout); action(list_bindings, Node, Args, Opts, Inform, Timeout) -> Inform("Listing bindings", []), @@ -562,32 +571,37 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> ArgAtoms = default_if_empty(Args, [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]), - display_info_list(rpc_call(Node, rabbit_binding, info_all, - [VHostArg, ArgAtoms], Timeout), - ArgAtoms); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms], Ref=make_ref(), + Pid=self(), Timeout]), + wait_info_messages(Pid, Ref, ArgAtoms, Timeout); action(list_connections, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, - [ArgAtoms], Timeout), - ArgAtoms); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_networking, + connection_info_all, + [ArgAtoms], Ref=make_ref(), Pid=self(), + Timeout]), + wait_info_messages(Pid, Ref, ArgAtoms, Timeout); action(list_channels, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), - display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms], - Timeout), - ArgAtoms); + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_channel, info_all, + [ArgAtoms], Ref=make_ref(), Pid=self(), + Timeout]), + wait_info_messages(Pid, Ref, ArgAtoms, Timeout); action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg], - Timeout), - rabbit_amqqueue:consumer_info_keys()). - + spawn_link(rabbit_cli, rpc_call, [Node, rabbit_amqqueue, consumers_all, + [VHostArg], Ref=make_ref(), Pid=self(), + Timeout]), + wait_info_messages(Pid, Ref, rabbit_amqqueue:consumer_info_keys(), + Timeout). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -679,6 +693,29 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. +wait_info_messages(Pid, Ref, ArgAtoms, Timeout) -> + notify_if_timeout(Pid, Ref, Timeout), + wait_info_messages(Ref, ArgAtoms). + +wait_info_messages(Ref, InfoItemKeys) when is_reference(Ref) -> + receive + {Ref, finished} -> ok; + {Ref, {timeout, T}} -> exit({error, {timeout, (T / 1000)}}); + {_Ref, []} -> wait_info_messages(Ref, InfoItemKeys); + {_Ref, Result} -> display_info_message(Result, InfoItemKeys), + wait_info_messages(Ref, InfoItemKeys); + _ -> wait_info_messages(Ref, InfoItemKeys) + end. + +display_info_message(Result, InfoItemKeys) -> + display_row([format_info_item( + case proplists:get_value(X, Result) of + undefined when is_list(Result), length(Result) > 0 -> + exit({error, {bad_info_key, X}}); + undefined -> Result; + Value -> Value + end) || X <- InfoItemKeys]). + display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( @@ -748,12 +785,19 @@ ensure_app_running(Node) -> Other -> Other end. +notify_if_timeout(Pid, Ref, Timeout) -> + timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}). + call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). call(Node, {Mod, Fun, Args}, Timeout) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args), Timeout). +call(Node, {Mod, Fun, Args}, Ref, Timeout) when is_reference(Ref) -> + Args0 = lists:map(fun list_to_binary_utf8/1, Args), + spawn_link(rabbit_cli, rpc_call, [Node, Mod, Fun, Args0, Ref, self(), Timeout]). + list_to_binary_utf8(L) -> B = list_to_binary(L), case rabbit_binary_parser:validate_utf8(B) of |
