summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl55
1 files changed, 30 insertions, 25 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..8af2812781 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -45,8 +45,8 @@
-ifdef(use_specs).
-spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+-spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -63,7 +63,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -73,7 +73,7 @@ invoke(Pid, Fun) when is_pid(Pid) ->
invoke(Pids, Fun) when is_list(Pids) ->
lists:foldl(
- fun({Status, Result, Pid}, {Good, Bad}) ->
+ fun ({Status, Result, Pid}, {Good, Bad}) ->
case Status of
ok -> {[{Pid, Result}|Good], Bad};
error -> {Good, [{Pid, Result}|Bad]}
@@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,42 +99,47 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
%% block forever.
[gen_server2:cast(
local_server(Node),
- {thunk, fun() ->
+ {thunk, fun () ->
Self ! {result,
DelegateFun(
- Node, fun() -> safe_invoke(Pids, Fun) end)}
+ Node, fun () -> safe_invoke(Pids, Fun) end)}
end}) || {Node, Pids} <- NodePids],
[receive {result, Result} -> Result end || _ <- NodePids].