summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Lebedeff <alebedev@mirantis.com>2016-03-09 18:09:04 +0300
committerAlexey Lebedeff <alebedev@mirantis.com>2016-05-18 15:18:48 +0300
commitd963bc8eff23802fb9f7ed9b0fa0b2f2040d9c1e (patch)
tree7d95583a2a086e4c1fb79b1fe299547970098876
parent79e216827b308902039fbcd60bad017148580a7a (diff)
downloadrabbitmq-server-git-d963bc8eff23802fb9f7ed9b0fa0b2f2040d9c1e.tar.gz
Avoid RPC roundtrips in list commands
Current implementation of various `list_XXX` commands require cross-node roundtrip for every processed item - because `rabbitmqctl` target node is responsible for gathering global list of all items of interest (channels etc.) and then processing them one by one. For example, listing 10000 channels evenly distributed across 3 nodes where network has 1ms delay takes more than 10 seconds on my machine. And with the proposed change listing will take almost the same time as it'll take to gather this info locally. E.g. in the case above listing now takes 0.7 second on the same machine with same 1ms delay. It works by invoking emitting_map on every node, where it should send info about only local items to aggregator, in an async fashion - as no reply from aggregator is needed.
-rw-r--r--src/rabbit_control_main.erl154
1 files changed, 96 insertions, 58 deletions
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 7f653c3780..da5756db3d 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -23,7 +23,7 @@
sync_queue/1, cancel_sync_queue/1, become/1,
purge_queue/1]).
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -579,56 +579,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
action(list_users, Node, [], _Opts, Inform, Timeout) ->
Inform("Listing users", []),
- call(Node, {rabbit_auth_backend_internal, list_users, []},
- rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
+ rabbit_auth_backend_internal:user_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_permissions, Node, [], Opts, Inform, Timeout) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Listing permissions in vhost \"~s\"", [VHost]),
- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
+ rabbit_auth_backend_internal:vhost_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_parameters, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing runtime parameters", []),
- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
- rabbit_runtime_parameters:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
+ rabbit_runtime_parameters:info_keys(),
+ [{timeout, Timeout}]);
action(list_policies, Node, [], Opts, Inform, Timeout) ->
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
Inform("Listing policies", []),
- call(Node, {rabbit_policy, list_formatted, [VHostArg]},
- rabbit_policy:info_keys(), Timeout);
+ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
+ rabbit_policy:info_keys(),
+ [{timeout, Timeout}]);
action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing vhosts", []),
ArgAtoms = default_if_empty(Args, [name]),
- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
+ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
+ [{timeout, Timeout}, to_bin_utf8]);
action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
{error_string,
"list_user_permissions expects a username argument, but none provided."};
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
Inform("Listing permissions for user ~p", Args),
- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
- true);
+ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+ rabbit_auth_backend_internal:user_perms_info_keys(),
+ [{timeout, Timeout}, to_bin_utf8, is_escaped]);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
Inform("Listing queues", []),
+ %% User options
+ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
- ArgAtoms, Timeout);
+
+ %% Data for emission
+ Nodes = nodes_in_cluster(Node, Timeout),
+ OnlineChunks = if Online -> length(Nodes); true -> 0 end,
+ OfflineChunks = if Offline -> 1; true -> 0 end,
+ ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
+ TimeoutOpt = {timeout, Timeout},
+ EmissionRef = make_ref(),
+ EmissionRefOpt = {ref, EmissionRef},
+
+ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
+ [TimeoutOpt, EmissionRefOpt]),
+ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
ArgAtoms = default_if_empty(Args, [name, type]),
- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing bindings", []),
@@ -636,27 +654,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
ArgAtoms = default_if_empty(Args, [source_name, source_kind,
destination_name, destination_kind,
routing_key, arguments]),
- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
- ArgAtoms, Timeout);
+ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}]);
action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing connections", []),
ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
+ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
Inform("Listing channels", []),
ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
messages_unacknowledged]),
- call(Node, {rabbit_channel, info_all, [ArgAtoms]},
- ArgAtoms, Timeout);
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
+ [{timeout, Timeout}, {chunks, length(Nodes)}]);
action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
Inform("Listing consumers", []),
VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
- rabbit_amqqueue:consumer_info_keys(), Timeout).
+ Nodes = nodes_in_cluster(Node, Timeout),
+ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
+ rabbit_amqqueue:consumer_info_keys(),
+ [{timeout, Timeout}, {chunks, length(Nodes)}]).
format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
@@ -766,17 +788,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
{X, Value} -> Value
end, IsEscaped) || X <- InfoItemKeys]).
-display_info_message(IsEscaped) ->
+display_info_message(IsEscaped, InfoItemKeys) ->
fun ([], _) ->
ok;
- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+ ([FirstResult|_] = List, _) when is_list(FirstResult) ->
lists:foreach(fun(Result) ->
display_info_message_row(IsEscaped, Result, InfoItemKeys)
end,
List),
ok;
- (Result, InfoItemKeys) ->
- display_info_message_row(IsEscaped, Result, InfoItemKeys)
+ (Result, _) ->
+ display_info_message_row(IsEscaped, Result, InfoItemKeys),
+ ok
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -833,7 +856,10 @@ display_call_result(Node, MFA) ->
end.
unsafe_rpc(Node, Mod, Fun, Args) ->
- case rpc_call(Node, Mod, Fun, Args) of
+ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
+ case rpc_call(Node, Mod, Fun, Args, Timeout) of
{badrpc, _} = Res -> throw(Res);
Normal -> Normal
end.
@@ -852,33 +878,42 @@ ensure_app_running(Node) ->
call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
+ Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
+ display_emission_result(Ref, InfoKeys, Opts).
+
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
+ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ Ref = proplists:get_value(ref, Opts, make_ref()),
+ rabbit_control_misc:spawn_emitter_caller(
+ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
+ Ref, self(), Timeout),
+ Ref.
+
+display_emission_result(Ref, InfoKeys, Opts) ->
+ IsEscaped = proplists:get_value(is_escaped, Opts, false),
+ Chunks = proplists:get_value(chunks, Opts, 1),
+ Timeout = proplists:get_value(timeout, Opts, infinity),
+ EmissionStatus = rabbit_control_misc:wait_for_info_messages(
+ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
+ emission_to_action_result(EmissionStatus).
+
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
+%% into form expected by rabbit_cli:main/3.
+emission_to_action_result({ok, ok}) ->
+ ok;
+emission_to_action_result({error, Error}) ->
+ Error.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
+prepare_call_args(Args, ToBinUtf8) ->
+ case ToBinUtf8 of
+ true -> valid_utf8_args(Args);
+ false -> Args
+ end.
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
- Args0 = case ToBinUtf8 of
- true -> lists:map(fun list_to_binary_utf8/1, Args);
- false -> Args
- end,
- Ref = make_ref(),
- Pid = self(),
- spawn_link(
- fun () ->
- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
- Ref, Pid, Timeout) of
- {error, _} = Error ->
- Pid ! {error, Error};
- {bad_argument, _} = Error ->
- Pid ! {error, Error};
- _ ->
- ok
- end
- end),
- rabbit_control_misc:wait_for_info_messages(
- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
+valid_utf8_args(Args) ->
+ lists:map(fun list_to_binary_utf8/1, Args).
list_to_binary_utf8(L) ->
B = list_to_binary(L),
@@ -928,7 +963,10 @@ split_list([_]) -> exit(even_list_needed);
split_list([A, B | T]) -> [{A, B} | split_list(T)].
nodes_in_cluster(Node) ->
- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
+
+nodes_in_cluster(Node, Timeout) ->
+ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
alarms_by_node(Name) ->
Status = unsafe_rpc(Name, rabbit, status, []),