summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-06-09 18:32:02 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-06-09 18:32:02 +0200
commit2bcddc175519d8edd379c8bbecaf9c918d794361 (patch)
tree6ad6a33b7a6b19cbf5d8b953496c92b70f952737
parent73f47630aa566d6ae7f4849aa36d0243195d7713 (diff)
parent54ec4d41bb85db2afb8806032cbf93fa0c45fbf7 (diff)
downloadrabbitmq-server-git-2bcddc175519d8edd379c8bbecaf9c918d794361.tar.gz
Merge pull request #181 from rabbitmq/rabbitmq-server-62-part1
Introduce a timeout flag to rabbitmqctl (for some commands)
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--include/rabbit_cli.hrl2
-rw-r--r--src/rabbit_cli.erl18
-rw-r--r--src/rabbit_control_main.erl264
4 files changed, 197 insertions, 97 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 40d8978e9b..854dd277b4 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -41,6 +41,7 @@
<cmdsynopsis>
<command>rabbitmqctl</command>
<arg choice="opt">-n <replaceable>node</replaceable></arg>
+ <arg choice="opt">-t <replaceable>timeout</replaceable></arg>
<arg choice="opt">-q</arg>
<arg choice="req"><replaceable>command</replaceable></arg>
<arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg>
@@ -92,6 +93,15 @@
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">-t <replaceable>timeout</replaceable></arg></cmdsynopsis></term>
+ <listitem>
+ <para role="usage">
+ Operation timeout in seconds. Only applicable to "list" commands.
+ Default is "infinity".
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</refsect1>
diff --git a/include/rabbit_cli.hrl b/include/rabbit_cli.hrl
index 58c5a3a99e..1bffc9a604 100644
--- a/include/rabbit_cli.hrl
+++ b/include/rabbit_cli.hrl
@@ -17,6 +17,7 @@
-define(NODE_OPT, "-n").
-define(QUIET_OPT, "-q").
-define(VHOST_OPT, "-p").
+-define(TIMEOUT_OPT, "-t").
-define(VERBOSE_OPT, "-v").
-define(MINIMAL_OPT, "-m").
@@ -33,6 +34,7 @@
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(TIMEOUT_DEF, {?TIMEOUT_OPT, {option, "infinity"}}).
-define(VERBOSE_DEF, {?VERBOSE_OPT, flag}).
-define(MINIMAL_DEF, {?MINIMAL_OPT, flag}).
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 58724af850..33098ce16b 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -18,7 +18,7 @@
-include("rabbit_cli.hrl").
-export([main/3, start_distribution/0, start_distribution/1,
- parse_arguments/4, rpc_call/4]).
+ parse_arguments/4, rpc_call/4, rpc_call/5]).
%%----------------------------------------------------------------------------
@@ -94,8 +94,13 @@ main(ParseFun, DoFun, UsageMod) ->
print_error("~p", [Reason]),
rabbit_misc:quit(2);
{badrpc, Reason} ->
- print_error("unable to connect to node ~w: ~w", [Node, Reason]),
- print_badrpc_diagnostics([Node]),
+ case Reason of
+ timeout ->
+ print_error("operation ~w on node ~w timed out", [Command, Node]);
+ _ ->
+ print_error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics([Node])
+ end,
rabbit_misc:quit(2);
{badrpc_multi, Reason, Nodes} ->
print_error("unable to connect to nodes ~p: ~w", [Nodes, Reason]),
@@ -210,8 +215,11 @@ print_badrpc_diagnostics(Nodes) ->
%% a timeout unless we set our ticktime to be the same. So let's do
%% that.
rpc_call(Node, Mod, Fun, Args) ->
- case rpc:call(Node, net_kernel, get_net_ticktime, [], ?RPC_TIMEOUT) of
+ rpc_call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+rpc_call(Node, Mod, Fun, Args, Timeout) ->
+ case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of
{badrpc, _} = E -> E;
Time -> net_kernel:set_net_ticktime(Time, 0),
- rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT)
+ rpc:call(Node, Mod, Fun, Args, Timeout)
end.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 25a0fbf9a6..82c8d80a98 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -21,11 +21,11 @@
-export([start/0, stop/0, parse_arguments/2, action/5,
sync_queue/1, cancel_sync_queue/1, become/1]).
--import(rabbit_cli, [rpc_call/4]).
+-import(rabbit_cli, [rpc_call/4, rpc_call/5]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
--define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
+-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node), ?TIMEOUT_DEF]).
-define(COMMANDS,
[stop,
@@ -108,6 +108,11 @@
forget_cluster_node, rename_cluster_node, cluster_status, status,
environment, eval, force_boot]).
+-define(COMMANDS_WITH_TIMEOUT,
+ [list_user_permissions, list_policies, list_queues, list_exchanges,
+ list_bindings, list_connections, list_channels, list_consumers,
+ list_vhosts, list_parameters]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -119,6 +124,11 @@
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
+-spec(action/6 ::
+ (atom(), node(), [string()], [{string(), any()}],
+ fun ((string(), [any()]) -> 'ok'), timeout())
+ -> 'ok').
+
-endif.
%%----------------------------------------------------------------------------
@@ -136,7 +146,19 @@ start() ->
io:format(Format ++ " ...~n", Args1)
end
end,
- do_action(Command, Node, Args, Opts, Inform)
+ try
+ T = case get_timeout(Opts) of
+ {ok, Timeout} ->
+ Timeout;
+ {error, _} ->
+ %% since this is an error with user input, ignore the quiet
+ %% setting
+ io:format("Failed to parse provided timeout value, using ~s~n", [?RPC_TIMEOUT]),
+ ?RPC_TIMEOUT
+ end,
+ do_action(Command, Node, Args, Opts, Inform, T)
+ catch _:E -> E
+ end
end, rabbit_ctl_usage).
parse_arguments(CmdLine, NodeStr) ->
@@ -160,18 +182,67 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
end,
io:nl().
+get_timeout(Opts) ->
+ parse_timeout(proplists:get_value(?TIMEOUT_OPT, Opts, ?RPC_TIMEOUT)).
+
+parse_number(N) when is_list(N) ->
+ try list_to_integer(N) of
+ Val -> Val
+ catch error:badarg ->
+ %% could have been a float, give it
+ %% another shot
+ list_to_float(N)
+ end.
+
+parse_timeout("infinity") ->
+ {ok, infinity};
+parse_timeout(infinity) ->
+ {ok, infinity};
+parse_timeout(N) when is_list(N) ->
+ try parse_number(N) of
+ M ->
+ Y = case M >= 0 of
+ true -> round(M) * 1000;
+ false -> ?RPC_TIMEOUT
+ end,
+ {ok, Y}
+ catch error:badarg ->
+ {error, infinity}
+ end;
+parse_timeout(N) ->
+ {ok, N}.
+
+format_timeout(N) when is_number(N) ->
+ float_to_list(N / 1000, [{decimals, 0}]).
+
+announce_timeout(infinity, _Inform) ->
+ %% no-op
+ ok;
+announce_timeout(Timeout, Inform) when is_number(Timeout) ->
+ Inform("Timeout: ~s seconds", [format_timeout(Timeout)]),
+ ok.
+
stop() ->
ok.
%%----------------------------------------------------------------------------
-do_action(Command, Node, Args, Opts, Inform) ->
+do_action(Command, Node, Args, Opts, Inform, Timeout) ->
case lists:member(Command, ?COMMANDS_NOT_REQUIRING_APP) of
- false -> case ensure_app_running(Node) of
- ok -> action(Command, Node, Args, Opts, Inform);
- E -> E
- end;
- true -> action(Command, Node, Args, Opts, Inform)
+ false ->
+ case ensure_app_running(Node) of
+ ok ->
+ case lists:member(Command, ?COMMANDS_WITH_TIMEOUT) of
+ true ->
+ announce_timeout(Timeout, Inform),
+ action(Command, Node, Args, Opts, Inform, Timeout);
+ false ->
+ action(Command, Node, Args, Opts, Inform)
+ end;
+ E -> E
+ end;
+ true ->
+ action(Command, Node, Args, Opts, Inform)
end.
action(stop, Node, Args, _Opts, Inform) ->
@@ -313,12 +384,6 @@ action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
rpc_call(Node, rabbit_auth_backend_internal, set_tags,
[list_to_binary(Username), Tags]);
-action(list_users, Node, [], _Opts, Inform) ->
- Inform("Listing users", []),
- display_info_list(
- call(Node, {rabbit_auth_backend_internal, list_users, []}),
- rabbit_auth_backend_internal:user_info_keys());
-
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Creating vhost \"~s\"", Args),
call(Node, {rabbit_vhost, add, Args});
@@ -327,63 +392,6 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost \"~s\"", Args),
call(Node, {rabbit_vhost, delete, Args});
-action(list_vhosts, Node, Args, _Opts, Inform) ->
- Inform("Listing vhosts", []),
- ArgAtoms = default_if_empty(Args, [name]),
- display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms);
-
-action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
- Inform("Listing permissions for user ~p", Args),
- display_info_list(call(Node, {rabbit_auth_backend_internal,
- list_user_permissions, Args}),
- rabbit_auth_backend_internal:user_perms_info_keys());
-
-action(list_queues, Node, Args, Opts, Inform) ->
- Inform("Listing queues", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, messages]),
- display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
- [VHostArg, ArgAtoms]),
- ArgAtoms);
-
-action(list_exchanges, Node, Args, Opts, Inform) ->
- Inform("Listing exchanges", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, type]),
- display_info_list(rpc_call(Node, rabbit_exchange, info_all,
- [VHostArg, ArgAtoms]),
- ArgAtoms);
-
-action(list_bindings, Node, Args, Opts, Inform) ->
- Inform("Listing bindings", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [source_name, source_kind,
- destination_name, destination_kind,
- routing_key, arguments]),
- display_info_list(rpc_call(Node, rabbit_binding, info_all,
- [VHostArg, ArgAtoms]),
- ArgAtoms);
-
-action(list_connections, Node, Args, _Opts, Inform) ->
- Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
- [ArgAtoms]),
- ArgAtoms);
-
-action(list_channels, Node, Args, _Opts, Inform) ->
- Inform("Listing channels", []),
- ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
- messages_unacknowledged]),
- display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
- ArgAtoms);
-
-action(list_consumers, Node, _Args, Opts, Inform) ->
- Inform("Listing consumers", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg]),
- rabbit_amqqueue:consumer_info_keys());
-
action(trace_on, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Starting tracing for vhost \"~s\"", [VHost]),
@@ -416,13 +424,6 @@ action(clear_permissions, Node, [Username], Opts, Inform) ->
call(Node, {rabbit_auth_backend_internal, clear_permissions,
[Username, VHost]});
-action(list_permissions, Node, [], Opts, Inform) ->
- VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Listing permissions in vhost \"~s\"", [VHost]),
- display_info_list(call(Node, {rabbit_auth_backend_internal,
- list_vhost_permissions, [VHost]}),
- rabbit_auth_backend_internal:vhost_perms_info_keys());
-
action(set_parameter, Node, [Component, Key, Value], Opts, Inform) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Setting runtime parameter ~p for component ~p to ~p",
@@ -438,13 +439,6 @@ action(clear_parameter, Node, [Component, Key], Opts, Inform) ->
list_to_binary(Component),
list_to_binary(Key)]);
-action(list_parameters, Node, [], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Listing runtime parameters", []),
- display_info_list(
- rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg]),
- rabbit_runtime_parameters:info_keys());
-
action(set_policy, Node, [Key, Pattern, Defn], Opts, Inform) ->
Msg = "Setting policy ~p for pattern ~p to ~p with priority ~p",
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
@@ -460,12 +454,6 @@ action(clear_policy, Node, [Key], Opts, Inform) ->
Inform("Clearing policy ~p", [Key]),
rpc_call(Node, rabbit_policy, delete, [VHostArg, list_to_binary(Key)]);
-action(list_policies, Node, [], Opts, Inform) ->
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- Inform("Listing policies", []),
- display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg]),
- rabbit_policy:info_keys());
-
action(report, Node, _Args, _Opts, Inform) ->
Inform("Reporting server status on ~p~n~n", [erlang:universaltime()]),
[begin ok = action(Action, N, [], [], Inform), io:nl() end ||
@@ -495,6 +483,95 @@ action(eval, Node, [Expr], _Opts, _Inform) ->
{error_string, format_parse_error(E)}
end.
+action(list_users, Node, [], _Opts, Inform, Timeout) ->
+ Inform("Listing users", []),
+ display_info_list(
+ call(Node, {rabbit_auth_backend_internal, list_users, []}, Timeout),
+ rabbit_auth_backend_internal:user_info_keys());
+
+action(list_permissions, Node, [], Opts, Inform, Timeout) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Listing permissions in vhost \"~s\"", [VHost]),
+ display_info_list(call(Node, {rabbit_auth_backend_internal,
+ list_vhost_permissions, [VHost]}, Timeout),
+ rabbit_auth_backend_internal:vhost_perms_info_keys());
+
+action(list_parameters, Node, [], Opts, Inform, Timeout) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing runtime parameters", []),
+ display_info_list(
+ rpc_call(Node, rabbit_runtime_parameters, list_formatted, [VHostArg],
+ Timeout),
+ rabbit_runtime_parameters:info_keys());
+
+action(list_policies, Node, [], Opts, Inform, Timeout) ->
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ Inform("Listing policies", []),
+ display_info_list(rpc_call(Node, rabbit_policy, list_formatted, [VHostArg],
+ Timeout),
+ rabbit_policy:info_keys());
+
+action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
+ Inform("Listing vhosts", []),
+ ArgAtoms = default_if_empty(Args, [name]),
+ display_info_list(call(Node, {rabbit_vhost, info_all, []}, Timeout),
+ ArgAtoms);
+
+action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
+ Inform("Listing permissions for user ~p", Args),
+ display_info_list(call(Node, {rabbit_auth_backend_internal,
+ list_user_permissions, Args}, Timeout),
+ rabbit_auth_backend_internal:user_perms_info_keys());
+
+action(list_queues, Node, Args, Opts, Inform, Timeout) ->
+ Inform("Listing queues", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
+ display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
+ [VHostArg, ArgAtoms], Timeout),
+ ArgAtoms);
+
+action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
+ Inform("Listing exchanges", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, type]),
+ display_info_list(rpc_call(Node, rabbit_exchange, info_all,
+ [VHostArg, ArgAtoms], Timeout),
+ ArgAtoms);
+
+action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
+ Inform("Listing bindings", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [source_name, source_kind,
+ destination_name, destination_kind,
+ routing_key, arguments]),
+ display_info_list(rpc_call(Node, rabbit_binding, info_all,
+ [VHostArg, ArgAtoms], Timeout),
+ ArgAtoms);
+
+action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
+ Inform("Listing connections", []),
+ ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
+ display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
+ [ArgAtoms], Timeout),
+ ArgAtoms);
+
+action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
+ Inform("Listing channels", []),
+ ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
+ messages_unacknowledged]),
+ display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms],
+ Timeout),
+ ArgAtoms);
+
+action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
+ Inform("Listing consumers", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ display_info_list(rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg],
+ Timeout),
+ rabbit_amqqueue:consumer_info_keys()).
+
+
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
sync_queue(Q) ->
@@ -650,6 +727,9 @@ ensure_app_running(Node) ->
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+call(Node, {Mod, Fun, Args}, Timeout) ->
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args), Timeout).
+
list_to_binary_utf8(L) ->
B = list_to_binary(L),
case rabbit_binary_parser:validate_utf8(B) of