summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_control_main.erl154
1 files changed, 96 insertions, 58 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index bee7128907..bf50fd6c49 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -23,7 +23,7 @@
sync_queue/1, cancel_sync_queue/1, become/1,
purge_queue/1]).
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -590,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
action(list_users, Node, [], _Opts, Inform, Timeout) ->
Inform("Listing users", []),
- call(Node, {rabbit_auth_backend_internal, list_users, []},
- rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
+ rabbit_auth_backend_internal:user_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_permissions, Node, [], Opts, Inform, Timeout) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost \"~s\"", [VHost]),
- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
+ rabbit_auth_backend_internal:vhost_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_parameters, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
- rabbit_runtime_parameters:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
+ rabbit_runtime_parameters:info_keys(),
+ [{timeout, Timeout}]);
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
- call(Node, {rabbit_policy, list_formatted, [VHostArg]},
- rabbit_policy:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
+ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
{error_string,
"list_user_permissions expects a username argument, but none provided."};
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
Inform("Listing permissions for user ~p", Args),
- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+ rabbit_auth_backend_internal:user_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
Inform("Listing queues", []),
+ %% User options
+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
- ArgAtoms, Timeout);
+
+ %% Data for emission
+ Nodes = nodes_in_cluster(Node, Timeout),
+ OnlineChunks = if Online -> length(Nodes); true -> 0 end,
+ OfflineChunks = if Offline -> 1; true -> 0 end,
+ ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
+ TimeoutOpt = {timeout, Timeout},
+ EmissionRef = make_ref(),
+ EmissionRefOpt = {ref, EmissionRef},
+
+ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, type]),
- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing bindings", []),
@@ -647,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
destination_name, destination_kind,
routing_key, arguments]),
- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
- call(Node, {rabbit_channel, info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
+ [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
- rabbit_amqqueue:consumer_info_keys(), Timeout).
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
+ rabbit_amqqueue:consumer_info_keys(),
+ [{timeout, Timeout}, {chunks, length(Nodes)}]).
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
@@ -777,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
{X, Value} -> Value
end, IsEscaped) || X <- InfoItemKeys]).
-display_info_message(IsEscaped) ->
+display_info_message(IsEscaped, InfoItemKeys) ->
fun ([], _) ->
ok;
- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+ ([FirstResult|_] = List, _) when is_list(FirstResult) ->
lists:foreach(fun(Result) ->
display_info_message_row(IsEscaped, Result, InfoItemKeys)
end,
List),
ok;
- (Result, InfoItemKeys) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys)
+ (Result, _) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys),
+ ok
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -844,7 +867,10 @@ display_call_result(Node, MFA) ->
end.
unsafe_rpc(Node, Mod, Fun, Args) ->
- case rpc_call(Node, Mod, Fun, Args) of
+ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
+ case rpc_call(Node, Mod, Fun, Args, Timeout) of
{badrpc, _} = Res -> throw(Res);
Normal -> Normal
end.
@@ -863,33 +889,42 @@ ensure_app_running(Node) ->
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
+ Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
+ display_emission_result(Ref, InfoKeys, Opts).
+
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
+ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ Ref = proplists:get_value(ref, Opts, make_ref()),
+ rabbit_control_misc:spawn_emitter_caller(
+ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
+ Ref, self(), Timeout),
+ Ref.
+
+display_emission_result(Ref, InfoKeys, Opts) ->
+ IsEscaped = proplists:get_value(is_escaped, Opts, false),
+ Chunks = proplists:get_value(chunks, Opts, 1),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ EmissionStatus = rabbit_control_misc:wait_for_info_messages(
+ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
+ emission_to_action_result(EmissionStatus).
+
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
+%% into form expected by rabbit_cli:main/3.
+emission_to_action_result({ok, ok}) ->
+ ok;
+emission_to_action_result({error, Error}) ->
+ Error.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
+prepare_call_args(Args, ToBinUtf8) ->
+ case ToBinUtf8 of
+ true -> valid_utf8_args(Args);
+ false -> Args
+ end.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
- Args0 = case ToBinUtf8 of
- true -> lists:map(fun list_to_binary_utf8/1, Args);
- false -> Args
- end,
- Ref = make_ref(),
- Pid = self(),
- spawn_link(
- fun () ->
- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
- Ref, Pid, Timeout) of
- {error, _} = Error ->
- Pid ! {error, Error};
- {bad_argument, _} = Error ->
- Pid ! {error, Error};
- _ ->
- ok
- end
- end),
- rabbit_control_misc:wait_for_info_messages(
- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
+valid_utf8_args(Args) ->
+ lists:map(fun list_to_binary_utf8/1, Args).
list_to_binary_utf8(L) ->
B = list_to_binary(L),
@@ -939,7 +974,10 @@ split_list([_]) -> exit(even_list_needed);
split_list([A, B | T]) -> [{A, B} | split_list(T)].
nodes_in_cluster(Node) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
+
+nodes_in_cluster(Node, Timeout) ->
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
alarms_by_node(Name) ->
Status = unsafe_rpc(Name, rabbit, status, []),