summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAyanda Dube <ayanda.dube@erlang-solutions.com>2015-09-08 16:44:33 +0100
committerAyanda Dube <ayanda.dube@erlang-solutions.com>2015-10-07 08:46:40 +0100
commitae305dc2ad51be7d8aa2ef803edbfe49e7334231 (patch)
treee4db74584c7ccc7e79508b54413f1cf6c2c85cf6 /src
parent383defe127d93b13fc7b1ec10a977a5e8f43f189 (diff)
downloadrabbitmq-server-git-ae305dc2ad51be7d8aa2ef803edbfe49e7334231.tar.gz
Updates list_* operations to dynamically display info.
Adds notify_if_timeout/3 for terminating display of info process on timeout. References #62
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_control_main.erl122
1 files changed, 83 insertions, 39 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index fe0563bbc7..0b23291c6a 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -17,12 +17,12 @@
-module(rabbit_control_main).
-include("rabbit.hrl").
-include("rabbit_cli.hrl").
-
+-compile(export_all).
-export([start/0, stop/0, parse_arguments/2, action/5,
sync_queue/1, cancel_sync_queue/1, become/1,
purge_queue/1]).
--import(rabbit_cli, [rpc_call/4, rpc_call/5]).
+-import(rabbit_cli, [rpc_call/4, rpc_call/5, rpc_call/7]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -499,62 +499,71 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
action(list_users, Node, [], _Opts, Inform, Timeout) ->
Inform("Listing users", []),
- display_info_list(
- call(Node, {rabbit_auth_backend_internal, list_users, []}, Timeout),
- rabbit_auth_backend_internal:user_info_keys());
+ call(Node, {rabbit_auth_backend_internal, list_users, []}, Ref=make_ref(),
+ Timeout),
+ wait_info_messages(self(), Ref,
+ rabbit_auth_backend_internal:user_info_keys(),
+ Timeout);
action(list_permissions, Node, [], Opts, Inform, Timeout) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost \"~s\"", [VHost]),
- display_info_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]}, Timeout),
- rabbit_auth_backend_internal:vhost_perms_info_keys());
+ call(Node,{rabbit_auth_backend_internal, list_vhost_permissions,
+ [VHost]}, Ref=make_ref(), Timeout),
+ wait_info_messages(self(), Ref,
+ rabbit_auth_backend_internal:vhost_perms_info_keys(),
+ Timeout);
action(list_parameters, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
- display_info_list(
- rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg],
- Timeout),
- rabbit_runtime_parameters:info_keys());
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_runtime_parameters,
+ list_formatted, [VHostArg], Ref=make_ref(),
+ Pid=self(), Timeout]),
+ wait_info_messages(Pid, Ref, rabbit_runtime_parameters:info_keys(), Timeout);
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
- display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg],
- Timeout),
- rabbit_policy:info_keys());
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_policy, list_formatted,
+ [VHostArg], Ref=make_ref(),
+ Pid=self(), Timeout]),
+ wait_info_messages(Pid, Ref, rabbit_policy:info_keys(), Timeout);
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
- display_info_list(call(Node, {rabbit_vhost, info_all, []}, Timeout),
- ArgAtoms);
+ call(Node, {rabbit_vhost, info_all, []}, Ref=make_ref(), Timeout),
+ wait_info_messages(self(), Ref, ArgAtoms, Timeout);
action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
{error_string,
"list_user_permissions expects a username argument, but none provided."};
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
Inform("Listing permissions for user ~p", Args),
- display_info_list(call(Node, {rabbit_auth_backend_internal,
- list_user_permissions, Args}, Timeout),
- rabbit_auth_backend_internal:user_perms_info_keys());
+ call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+ Ref=make_ref(), Timeout),
+ wait_info_messages(self(), Ref,
+ rabbit_auth_backend_internal:user_perms_info_keys(),
+ Timeout);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing queues", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
- [VHostArg, ArgAtoms], Timeout),
- ArgAtoms);
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_amqqueue, info_all,
+ [VHostArg, ArgAtoms], Ref=make_ref(),
+ Pid=self(), Timeout]),
+ wait_info_messages(Pid, Ref, ArgAtoms, Timeout);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, type]),
- display_info_list(rpc_call(Node, rabbit_exchange, info_all,
- [VHostArg, ArgAtoms], Timeout),
- ArgAtoms);
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_exchange, info_all,
+ [VHostArg, ArgAtoms], Ref=make_ref(),
+ Pid=self(), Timeout]),
+ wait_info_messages(Pid, Ref, ArgAtoms, Timeout);
action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing bindings", []),
@@ -562,32 +571,37 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
destination_name, destination_kind,
routing_key, arguments]),
- display_info_list(rpc_call(Node, rabbit_binding, info_all,
- [VHostArg, ArgAtoms], Timeout),
- ArgAtoms);
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms], Ref=make_ref(),
+ Pid=self(), Timeout]),
+ wait_info_messages(Pid, Ref, ArgAtoms, Timeout);
action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
- [ArgAtoms], Timeout),
- ArgAtoms);
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_networking,
+ connection_info_all,
+ [ArgAtoms], Ref=make_ref(), Pid=self(),
+ Timeout]),
+ wait_info_messages(Pid, Ref, ArgAtoms, Timeout);
action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
- display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms],
- Timeout),
- ArgAtoms);
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_channel, info_all,
+ [ArgAtoms], Ref=make_ref(), Pid=self(),
+ Timeout]),
+ wait_info_messages(Pid, Ref, ArgAtoms, Timeout);
action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg],
- Timeout),
- rabbit_amqqueue:consumer_info_keys()).
-
+ spawn_link(rabbit_cli, rpc_call, [Node, rabbit_amqqueue, consumers_all,
+ [VHostArg], Ref=make_ref(), Pid=self(),
+ Timeout]),
+ wait_info_messages(Pid, Ref, rabbit_amqqueue:consumer_info_keys(),
+ Timeout).
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
@@ -679,6 +693,29 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
+wait_info_messages(Pid, Ref, ArgAtoms, Timeout) ->
+ notify_if_timeout(Pid, Ref, Timeout),
+ wait_info_messages(Ref, ArgAtoms).
+
+wait_info_messages(Ref, InfoItemKeys) when is_reference(Ref) ->
+ receive
+ {Ref, finished} -> ok;
+ {Ref, {timeout, T}} -> exit({error, {timeout, (T / 1000)}});
+ {_Ref, []} -> wait_info_messages(Ref, InfoItemKeys);
+ {_Ref, Result} -> display_info_message(Result, InfoItemKeys),
+ wait_info_messages(Ref, InfoItemKeys);
+ _ -> wait_info_messages(Ref, InfoItemKeys)
+ end.
+
+display_info_message(Result, InfoItemKeys) ->
+ display_row([format_info_item(
+ case proplists:get_value(X, Result) of
+ undefined when is_list(Result), length(Result) > 0 ->
+ exit({error, {bad_info_key, X}});
+ undefined -> Result;
+ Value -> Value
+ end) || X <- InfoItemKeys]).
+
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
fun (Result) -> display_row(
@@ -748,12 +785,19 @@ ensure_app_running(Node) ->
Other -> Other
end.
+notify_if_timeout(Pid, Ref, Timeout) ->
+ timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}).
+
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
call(Node, {Mod, Fun, Args}, Timeout) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args), Timeout).
+call(Node, {Mod, Fun, Args}, Ref, Timeout) when is_reference(Ref) ->
+ Args0 = lists:map(fun list_to_binary_utf8/1, Args),
+ spawn_link(rabbit_cli, rpc_call, [Node, Mod, Fun, Args0, Ref, self(), Timeout]).
+
list_to_binary_utf8(L) ->
B = list_to_binary(L),
case rabbit_binary_parser:validate_utf8(B) of