diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-06-09 20:51:39 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-06-09 20:51:39 +0300 |
| commit | e758072e3d28581999b6a0b8aa7b91c3e1b538cb (patch) | |
| tree | 212b709843657f26eb914541455c3452824eaee3 /src | |
| parent | 0cc6b93d3b6773e429f1b3209c4d1df4c20243a1 (diff) | |
| parent | 2bcddc175519d8edd379c8bbecaf9c918d794361 (diff) | |
| download | rabbitmq-server-git-e758072e3d28581999b6a0b8aa7b91c3e1b538cb.tar.gz | |
Merge branch 'stable'
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_cli.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 264 |
2 files changed, 185 insertions, 97 deletions
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl index 58724af850..33098ce16b 100644 --- a/src/rabbit_cli.erl +++ b/src/rabbit_cli.erl @@ -18,7 +18,7 @@ -include("rabbit_cli.hrl"). -export([main/3, start_distribution/0, start_distribution/1, - parse_arguments/4, rpc_call/4]). + parse_arguments/4, rpc_call/4, rpc_call/5]). %%---------------------------------------------------------------------------- @@ -94,8 +94,13 @@ main(ParseFun, DoFun, UsageMod) -> print_error("~p", [Reason]), rabbit_misc:quit(2); {badrpc, Reason} -> - print_error("unable to connect to node ~w: ~w", [Node, Reason]), - print_badrpc_diagnostics([Node]), + case Reason of + timeout -> + print_error("operation ~w on node ~w timed out", [Command, Node]); + _ -> + print_error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics([Node]) + end, rabbit_misc:quit(2); {badrpc_multi, Reason, Nodes} -> print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]), @@ -210,8 +215,11 @@ print_badrpc_diagnostics(Nodes) -> %% a timeout unless we set our ticktime to be the same. So let's do %% that. rpc_call(Node, Mod, Fun, Args) -> - case rpc:call(Node, net_kernel, get_net_ticktime, [], ?RPC_TIMEOUT) of + rpc_call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of {badrpc, _} = E -> E; Time -> net_kernel:set_net_ticktime(Time, 0), - rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT) + rpc:call(Node, Mod, Fun, Args, Timeout) end. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 25a0fbf9a6..82c8d80a98 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -21,11 +21,11 @@ -export([start/0, stop/0, parse_arguments/2, action/5, sync_queue/1, cancel_sync_queue/1, become/1]). --import(rabbit_cli, [rpc_call/4]). +-import(rabbit_cli, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). --define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). +-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node), ?TIMEOUT_DEF]). -define(COMMANDS, [stop, @@ -108,6 +108,11 @@ forget_cluster_node, rename_cluster_node, cluster_status, status, environment, eval, force_boot]). +-define(COMMANDS_WITH_TIMEOUT, + [list_user_permissions, list_policies, list_queues, list_exchanges, + list_bindings, list_connections, list_channels, list_consumers, + list_vhosts, list_parameters]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -119,6 +124,11 @@ fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(action/6 :: + (atom(), node(), [string()], [{string(), any()}], + fun ((string(), [any()]) -> 'ok'), timeout()) + -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -136,7 +146,19 @@ start() -> io:format(Format ++ " ...~n", Args1) end end, - do_action(Command, Node, Args, Opts, Inform) + try + T = case get_timeout(Opts) of + {ok, Timeout} -> + Timeout; + {error, _} -> + %% since this is an error with user input, ignore the quiet + %% setting + io:format("Failed to parse provided timeout value, using ~s~n", [?RPC_TIMEOUT]), + ?RPC_TIMEOUT + end, + do_action(Command, Node, Args, Opts, Inform, T) + catch _:E -> E + end end, rabbit_ctl_usage). parse_arguments(CmdLine, NodeStr) -> @@ -160,18 +182,67 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) -> end, io:nl(). +get_timeout(Opts) -> + parse_timeout(proplists:get_value(?TIMEOUT_OPT, Opts, ?RPC_TIMEOUT)). + +parse_number(N) when is_list(N) -> + try list_to_integer(N) of + Val -> Val + catch error:badarg -> + %% could have been a float, give it + %% another shot + list_to_float(N) + end. + +parse_timeout("infinity") -> + {ok, infinity}; +parse_timeout(infinity) -> + {ok, infinity}; +parse_timeout(N) when is_list(N) -> + try parse_number(N) of + M -> + Y = case M >= 0 of + true -> round(M) * 1000; + false -> ?RPC_TIMEOUT + end, + {ok, Y} + catch error:badarg -> + {error, infinity} + end; +parse_timeout(N) -> + {ok, N}. + +format_timeout(N) when is_number(N) -> + float_to_list(N / 1000, [{decimals, 0}]). + +announce_timeout(infinity, _Inform) -> + %% no-op + ok; +announce_timeout(Timeout, Inform) when is_number(Timeout) -> + Inform("Timeout: ~s seconds", [format_timeout(Timeout)]), + ok. + stop() -> ok. %%---------------------------------------------------------------------------- -do_action(Command, Node, Args, Opts, Inform) -> +do_action(Command, Node, Args, Opts, Inform, Timeout) -> case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of - false -> case ensure_app_running(Node) of - ok -> action(Command, Node, Args, Opts, Inform); - E -> E - end; - true -> action(Command, Node, Args, Opts, Inform) + false -> + case ensure_app_running(Node) of + ok -> + case lists:member(Command, ?COMMANDS_WITH_TIMEOUT) of + true -> + announce_timeout(Timeout, Inform), + action(Command, Node, Args, Opts, Inform, Timeout); + false -> + action(Command, Node, Args, Opts, Inform) + end; + E -> E + end; + true -> + action(Command, Node, Args, Opts, Inform) end. action(stop, Node, Args, _Opts, Inform) -> @@ -313,12 +384,6 @@ action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> rpc_call(Node, rabbit_auth_backend_internal, set_tags, [list_to_binary(Username), Tags]); -action(list_users, Node, [], _Opts, Inform) -> - Inform("Listing users", []), - display_info_list( - call(Node, {rabbit_auth_backend_internal, list_users, []}), - rabbit_auth_backend_internal:user_info_keys()); - action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Creating vhost \"~s\"", Args), call(Node, {rabbit_vhost, add, Args}); @@ -327,63 +392,6 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost \"~s\"", Args), call(Node, {rabbit_vhost, delete, Args}); -action(list_vhosts, Node, Args, _Opts, Inform) -> - Inform("Listing vhosts", []), - ArgAtoms = default_if_empty(Args, [name]), - display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms); - -action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> - Inform("Listing permissions for user ~p", Args), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_user_permissions, Args}), - rabbit_auth_backend_internal:user_perms_info_keys()); - -action(list_queues, Node, Args, Opts, Inform) -> - Inform("Listing queues", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, messages]), - display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_exchanges, Node, Args, Opts, Inform) -> - Inform("Listing exchanges", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [name, type]), - display_info_list(rpc_call(Node, rabbit_exchange, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_bindings, Node, Args, Opts, Inform) -> - Inform("Listing bindings", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - ArgAtoms = default_if_empty(Args, [source_name, source_kind, - destination_name, destination_kind, - routing_key, arguments]), - display_info_list(rpc_call(Node, rabbit_binding, info_all, - [VHostArg, ArgAtoms]), - ArgAtoms); - -action(list_connections, Node, Args, _Opts, Inform) -> - Inform("Listing connections", []), - ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, - [ArgAtoms]), - ArgAtoms); - -action(list_channels, Node, Args, _Opts, Inform) -> - Inform("Listing channels", []), - ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, - messages_unacknowledged]), - display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), - ArgAtoms); - -action(list_consumers, Node, _Args, Opts, Inform) -> - Inform("Listing consumers", []), - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]), - rabbit_amqqueue:consumer_info_keys()); - action(trace_on, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Starting tracing for vhost \"~s\"", [VHost]), @@ -416,13 +424,6 @@ action(clear_permissions, Node, [Username], Opts, Inform) -> call(Node, {rabbit_auth_backend_internal, clear_permissions, [Username, VHost]}); -action(list_permissions, Node, [], Opts, Inform) -> - VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost \"~s\"", [VHost]), - display_info_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]}), - rabbit_auth_backend_internal:vhost_perms_info_keys()); - action(set_parameter, Node, [Component, Key, Value], Opts, Inform) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Setting runtime parameter ~p for component ~p to ~p", @@ -438,13 +439,6 @@ action(clear_parameter, Node, [Component, Key], Opts, Inform) -> list_to_binary(Component), list_to_binary(Key)]); -action(list_parameters, Node, [], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing runtime parameters", []), - display_info_list( - rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]), - rabbit_runtime_parameters:info_keys()); - action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) -> Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p", VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), @@ -460,12 +454,6 @@ action(clear_policy, Node, [Key], Opts, Inform) -> Inform("Clearing policy ~p", [Key]), rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]); -action(list_policies, Node, [], Opts, Inform) -> - VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - Inform("Listing policies", []), - display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]), - rabbit_policy:info_keys()); - action(report, Node, _Args, _Opts, Inform) -> Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]), [begin ok = action(Action, N, [], [], Inform), io:nl() end || @@ -495,6 +483,95 @@ action(eval, Node, [Expr], _Opts, _Inform) -> {error_string, format_parse_error(E)} end. +action(list_users, Node, [], _Opts, Inform, Timeout) -> + Inform("Listing users", []), + display_info_list( + call(Node, {rabbit_auth_backend_internal, list_users, []}, Timeout), + rabbit_auth_backend_internal:user_info_keys()); + +action(list_permissions, Node, [], Opts, Inform, Timeout) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Listing permissions in vhost \"~s\"", [VHost]), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_vhost_permissions, [VHost]}, Timeout), + rabbit_auth_backend_internal:vhost_perms_info_keys()); + +action(list_parameters, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing runtime parameters", []), + display_info_list( + rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg], + Timeout), + rabbit_runtime_parameters:info_keys()); + +action(list_policies, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), + display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg], + Timeout), + rabbit_policy:info_keys()); + +action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing vhosts", []), + ArgAtoms = default_if_empty(Args, [name]), + display_info_list(call(Node, {rabbit_vhost, info_all, []}, Timeout), + ArgAtoms); + +action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> + Inform("Listing permissions for user ~p", Args), + display_info_list(call(Node, {rabbit_auth_backend_internal, + list_user_permissions, Args}, Timeout), + rabbit_auth_backend_internal:user_perms_info_keys()); + +action(list_queues, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing queues", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), + display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing exchanges", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), + display_info_list(rpc_call(Node, rabbit_exchange, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_bindings, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing bindings", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]), + display_info_list(rpc_call(Node, rabbit_binding, info_all, + [VHostArg, ArgAtoms], Timeout), + ArgAtoms); + +action(list_connections, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing connections", []), + ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), + display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, + [ArgAtoms], Timeout), + ArgAtoms); + +action(list_channels, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, + messages_unacknowledged]), + display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms], + Timeout), + ArgAtoms); + +action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> + Inform("Listing consumers", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg], + Timeout), + rabbit_amqqueue:consumer_info_keys()). + + format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). sync_queue(Q) -> @@ -650,6 +727,9 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). +call(Node, {Mod, Fun, Args}, Timeout) -> + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args), Timeout). + list_to_binary_utf8(L) -> B = list_to_binary(L), case rabbit_binary_parser:validate_utf8(B) of |
