summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Kuryloski <kuryloskip@vmware.com>2021-07-16 16:02:40 +0200
committermergify-bot <noreply@mergify.com>2021-12-06 13:04:00 +0000
commit953b5be6add1d0f835c1879b55803c342288ef01 (patch)
tree3804a004a1a3f7e61eba856f8c0359486095253d
parent29763a9318546df42e7fef90a5732132b203e318 (diff)
downloadrabbitmq-server-git-rabbitmq-server-2571.tar.gz
Optimisation for 'delegate'rabbitmq-server-2571
This is a manual rebase of rabbitmq-common #349 https://github.com/rabbitmq/rabbitmq-common/pull/349/commits/1c09c0f824cf2eca9284035db3f079465edb5733
-rw-r--r--deps/rabbit_common/src/delegate.erl67
1 files changed, 61 insertions, 6 deletions
diff --git a/deps/rabbit_common/src/delegate.erl b/deps/rabbit_common/src/delegate.erl
index bdea11534d..5bb996d189 100644
--- a/deps/rabbit_common/src/delegate.erl
+++ b/deps/rabbit_common/src/delegate.erl
@@ -36,6 +36,17 @@
%% the pool is configurable, the aim is to make sure we don't have too
%% few delegates and thus limit performance on many-CPU machines.
+%% Optimisation for 'delegate'
+%% If a message is sent to only one queue(in most application scenarios),
+%% passing through the 'delegate' is meaningless.
+%% Hardcoding "?DEFAULT_NAME and/or gen_server2" is to avoid affecting those
+%% operations that must go through the 'delegate', such as:
+%% 1. "delegate:invoke(Pids, {erlang, process_info, [memory]})", "erlang, process_info"
+%% must be called inside the target node.
+%% 2. "{Results, Errors} = delegate:invoke(MemberPids, ?DELEGATE_PREFIX, FunOrMFA)",
+%% For some reason, the operation specifically specifies a delegate name rather than
+%% ?DEFAULT_NAME.
+
-behaviour(gen_server2).
-export([start_link/1, start_link/2, invoke_no_result/2,
@@ -77,28 +88,47 @@ start_link(Name, Num) ->
Name1 = delegate_name(Name, Num),
gen_server2:start_link({local, Name1}, ?MODULE, [Name1], []).
+invoke(Pid, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
+ {ok, _, Result} -> Result;
+ {error, _, Error} -> {error, Error}
+ end;
invoke(Pid, FunOrMFA) ->
invoke(Pid, ?DEFAULT_NAME, FunOrMFA).
invoke(Pid, _Name, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
apply1(FunOrMFA, Pid);
+invoke(Pid, ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
+ {ok, _, Result} -> Result;
+ {error, _, Error} -> {error, Error}
+ end;
invoke(Pid, Name, FunOrMFA) when is_pid(Pid) ->
case invoke([Pid], Name, FunOrMFA) of
- {[{Pid, Result}], []} ->
- Result;
- {[], [{Pid, {Class, Reason, StackTrace}}]} ->
- erlang:raise(Class, Reason, StackTrace)
+ {[{Pid, Result}], []} -> Result;
+ {[], [{Pid, Error}]} -> {error, Error}
end;
invoke([], _Name, _FunOrMFA) -> %% optimisation
{[], []};
+invoke([Pid], ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation
+ case safe_invoke(Pid, FunOrMFA) of
+ {ok, _, Result} -> {[{Pid, Result}], []};
+ {error, _, Error} -> {[], [{Pid, Error}]}
+ end;
invoke([Pid], _Name, FunOrMFA) when node(Pid) =:= node() -> %% optimisation
case safe_invoke(Pid, FunOrMFA) of
{ok, _, Result} -> {[{Pid, Result}], []};
{error, _, Error} -> {[], [{Pid, Error}]}
end;
+invoke(Pids, Name = ?DEFAULT_NAME, FunOrMFA = {gen_server2, _F, _A}) when is_list(Pids) ->
+ {LocalCallPids, Grouped} = group_local_call_pids_by_node(Pids),
+ invoke(Pids, Name, FunOrMFA, LocalCallPids, Grouped);
invoke(Pids, Name, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
+ invoke(Pids, Name, FunOrMFA, LocalPids, Grouped).
+
+invoke(Pids, Name, FunOrMFA, LocalCallPids, Grouped) when is_list(Pids) ->
%% The use of multi_call is only safe because the timeout is
%% infinity, and thus there is no process spawned in order to do
%% the sending. Thus calls can't overtake preceding calls/casts.
@@ -112,7 +142,7 @@ invoke(Pids, Name, FunOrMFA) when is_list(Pids) ->
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
BadNode <- BadNodes,
Pid <- maps:get(BadNode, Grouped)],
- ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) |
+ ResultsNoNode = lists:append([safe_invoke(LocalCallPids, FunOrMFA) |
[Results || {_Node, Results} <- Replies]]),
lists:foldl(
fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad};
@@ -134,6 +164,9 @@ demonitor(Ref) when is_reference(Ref) ->
demonitor({Name, Pid}) ->
gen_server2:cast(Name, {demonitor, self(), Pid}).
+invoke_no_result(Pid, FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) ->
+ _ = safe_invoke(Pid, FunOrMFA), %% we don't care about any error
+ ok;
invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() ->
%% Optimization, avoids calling invoke_no_result/3.
%%
@@ -154,6 +187,9 @@ invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) ->
ok;
invoke_no_result([], _FunOrMFA) -> %% optimisation
ok;
+invoke_no_result([Pid], FunOrMFA = {gen_server2, _F, _A}) when is_pid(Pid) -> %% optimisation
+ _ = safe_invoke(Pid, FunOrMFA), %% must not die
+ ok;
invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation
_ = safe_invoke(Pid, FunOrMFA), %% must not die
ok;
@@ -163,15 +199,21 @@ invoke_no_result([Pid], FunOrMFA) ->
{invoke, FunOrMFA,
maps:from_list([{RemoteNode, [Pid]}])}),
ok;
+invoke_no_result(Pids, FunOrMFA = {gen_server2, _F, _A}) when is_list(Pids) ->
+ {LocalCallPids, Grouped} = group_local_call_pids_by_node(Pids),
+ invoke_no_result(Pids, FunOrMFA, LocalCallPids, Grouped);
invoke_no_result(Pids, FunOrMFA) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
+ invoke_no_result(Pids, FunOrMFA, LocalPids, Grouped).
+
+invoke_no_result(Pids, FunOrMFA, LocalCallPids, Grouped) when is_list(Pids) ->
case maps:keys(Grouped) of
[] -> ok;
RemoteNodes -> gen_server2:abcast(
RemoteNodes, delegate(self(), ?DEFAULT_NAME, RemoteNodes),
{invoke, FunOrMFA, Grouped})
end,
- _ = safe_invoke(LocalPids, FunOrMFA), %% must not die
+ _ = safe_invoke(LocalCallPids, FunOrMFA), %% must not die
ok.
%%----------------------------------------------------------------------------
@@ -187,6 +229,19 @@ group_pids_by_node(Pids) ->
node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)}
end, {[], maps:new()}, Pids).
+group_local_call_pids_by_node(Pids) ->
+ {LocalPids0, Grouped0} = group_pids_by_node(Pids),
+ maps:fold(fun(K, V, {AccIn, MapsIn}) ->
+ case V of
+ %% just one Pid for the node
+ [SinglePid] -> {[SinglePid | AccIn], MapsIn};
+ %% If the value is a list of more than one pid,
+ %% the (K,V) will be put into the new map which will be called
+ %% through delegate to reduce inter-node communication.
+ _ -> {AccIn, maps:update_with(K, fun(V1) -> V1 end, V, MapsIn)}
+ end
+ end, {LocalPids0, maps:new()}, Grouped0).
+
delegate_name(Name, Hash) ->
list_to_atom(Name ++ integer_to_list(Hash)).