diff options
author | Philip Kuryloski <kuryloskip@vmware.com> | 2021-07-16 16:02:40 +0200 |
---|---|---|
committer | mergify-bot <noreply@mergify.com> | 2021-12-06 13:04:00 +0000 |
commit | 953b5be6add1d0f835c1879b55803c342288ef01 (patch) | |
tree | 3804a004a1a3f7e61eba856f8c0359486095253d | |
parent | 29763a9318546df42e7fef90a5732132b203e318 (diff) | |
download | rabbitmq-server-git-rabbitmq-server-2571.tar.gz |
Optimisation for 'delegate'rabbitmq-server-2571
This is a manual rebase of rabbitmq-common #349
https://github.com/rabbitmq/rabbitmq-common/pull/349/commits/1c09c0f824cf2eca9284035db3f079465edb5733
-rw-r--r-- | deps/rabbit_common/src/delegate.erl | 67 |
1 files changed, 61 insertions, 6 deletions
diff --git a/deps/rabbit_common/src/delegate.erl b/deps/rabbit_common/src/delegate.erl index bdea11534d..5bb996d189 100644 --- a/deps/rabbit_common/src/delegate.erl +++ b/deps/rabbit_common/src/delegate.erl @@ -36,6 +36,17 @@ %% the pool is configurable, the aim is to make sure we don't have too %% few delegates and thus limit performance on many-CPU machines. +%% Optimisation for 'delegate' +%% If a message is sent to only one queue(in most application scenarios), +%% passing through the 'delegate' is meaningless. +%% Hardcoding "?DEFAULT_NAME and/or gen_server2" is to avoid affecting those +%% operations that must go through the 'delegate', such as: +%% 1. "delegate:invoke(Pids, {erlang, process_info, [memory]})", "erlang, process_info" +%% must be called inside the target node. +%% 2. "{Results, Errors} = delegate:invoke(MemberPids, ?DELEGATE_PREFIX, FunOrMFA)", +%% For some reason, the operation specifically specifies a delegate name rather than +%% ?DEFAULT_NAME. + -behaviour(gen_server2). -export([start_link/1, start_link/2, invoke_no_result/2, @@ -77,28 +88,47 @@ start_link(Name, Num) -> Name1 = delegate_name(Name, Num), gen_server2:start_link({local, Name1}, ?MODULE, [Name1], []). +invoke(Pid, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of + {ok, _, Result} -> Result; + {error, _, Error} -> {error, Error} + end; invoke(Pid, FunOrMFA) -> invoke(Pid, ?DEFAULT_NAME, FunOrMFA). invoke(Pid, _Name, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> apply1(FunOrMFA, Pid); +invoke(Pid, ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of + {ok, _, Result} -> Result; + {error, _, Error} -> {error, Error} + end; invoke(Pid, Name, FunOrMFA) when is_pid(Pid) -> case invoke([Pid], Name, FunOrMFA) of - {[{Pid, Result}], []} -> - Result; - {[], [{Pid, {Class, Reason, StackTrace}}]} -> - erlang:raise(Class, Reason, StackTrace) + {[{Pid, Result}], []} -> Result; + {[], [{Pid, Error}]} -> {error, Error} end; invoke([], _Name, _FunOrMFA) -> %% optimisation {[], []}; +invoke([Pid], ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of + {ok, _, Result} -> {[{Pid, Result}], []}; + {error, _, Error} -> {[], [{Pid, Error}]} + end; invoke([Pid], _Name, FunOrMFA) when node(Pid) =:= node() -> %% optimisation case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; +invoke(Pids, Name = ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_list(Pids) -> + {LocalCallPids, Grouped} = group_local_call_pids_by_node(Pids), + invoke(Pids, Name, FunOrMFA, LocalCallPids, Grouped); invoke(Pids, Name, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), + invoke(Pids, Name, FunOrMFA, LocalPids, Grouped). + +invoke(Pids, Name, FunOrMFA, LocalCallPids, Grouped) when is_list(Pids) -> %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do %% the sending. Thus calls can't overtake preceding calls/casts. @@ -112,7 +142,7 @@ invoke(Pids, Name, FunOrMFA) when is_list(Pids) -> BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- maps:get(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | + ResultsNoNode = lists:append([safe_invoke(LocalCallPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; @@ -134,6 +164,9 @@ demonitor(Ref) when is_reference(Ref) -> demonitor({Name, Pid}) -> gen_server2:cast(Name, {demonitor, self(), Pid}). +invoke_no_result(Pid, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> + _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error + ok; invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> %% Optimization, avoids calling invoke_no_result/3. %% @@ -154,6 +187,9 @@ invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> ok; invoke_no_result([], _FunOrMFA) -> %% optimisation ok; +invoke_no_result([Pid], FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation + _ = safe_invoke(Pid, FunOrMFA), %% must not die + ok; invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation _ = safe_invoke(Pid, FunOrMFA), %% must not die ok; @@ -163,15 +199,21 @@ invoke_no_result([Pid], FunOrMFA) -> {invoke, FunOrMFA, maps:from_list([{RemoteNode, [Pid]}])}), ok; +invoke_no_result(Pids, FunOrMFA = {gen_server2, _F, _A}) when is_list(Pids) -> + {LocalCallPids, Grouped} = group_local_call_pids_by_node(Pids), + invoke_no_result(Pids, FunOrMFA, LocalCallPids, Grouped); invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), + invoke_no_result(Pids, FunOrMFA, LocalPids, Grouped). + +invoke_no_result(Pids, FunOrMFA, LocalCallPids, Grouped) when is_list(Pids) -> case maps:keys(Grouped) of [] -> ok; RemoteNodes -> gen_server2:abcast( RemoteNodes, delegate(self(), ?DEFAULT_NAME, RemoteNodes), {invoke, FunOrMFA, Grouped}) end, - _ = safe_invoke(LocalPids, FunOrMFA), %% must not die + _ = safe_invoke(LocalCallPids, FunOrMFA), %% must not die ok. %%---------------------------------------------------------------------------- @@ -187,6 +229,19 @@ group_pids_by_node(Pids) -> node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} end, {[], maps:new()}, Pids). +group_local_call_pids_by_node(Pids) -> + {LocalPids0, Grouped0} = group_pids_by_node(Pids), + maps:fold(fun(K, V, {AccIn, MapsIn}) -> + case V of + %% just one Pid for the node + [SinglePid] -> {[SinglePid | AccIn], MapsIn}; + %% If the value is a list of more than one pid, + %% the (K,V) will be put into the new map which will be called + %% through delegate to reduce inter-node communication. + _ -> {AccIn, maps:update_with(K, fun(V1) -> V1 end, V, MapsIn)} + end + end, {LocalPids0, maps:new()}, Grouped0). + delegate_name(Name, Hash) -> list_to_atom(Name ++ integer_to_list(Hash)). |