diff options
Diffstat (limited to 'src/delegate.erl')
| -rw-r--r-- | src/delegate.erl | 64 |
1 files changed, 33 insertions, 31 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 76fd9d7224..a7020d9b47 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -68,34 +68,29 @@ gs2_pcast(Pid, Pri, Msg) -> cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). -% TODO reimplement the single-node optimisation - -call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); - call(Pid, FPid) when is_pid(Pid) -> - [[{Status, Res, _}]] = delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun call/2, FPid)), + [[{Status, Res, _}]] = call_per_node([{node(Pid), [Pid]}], FPid), {Status, Res}; call(Pids, FPid) when is_list(Pids) -> lists:flatten( - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun call/2, FPid))). + call_per_node(split_delegate_per_node(Pids), FPid)). + +internal_call(Node, Thunk) when is_atom(Node) -> + gen_server2:call({server(), Node}, {thunk, Thunk}, infinity). -cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({server(), Node}, {thunk, Thunk}); cast(Pid, FPid) when is_pid(Pid) -> - delegate_per_node([{node(Pid), [Pid]}], - f_pid_node(fun cast/2, FPid)), + cast_per_node([{node(Pid), [Pid]}], FPid), ok; cast(Pids, FPid) when is_list(Pids) -> - delegate_per_node(split_delegate_per_node(Pids), - f_pid_node(fun cast/2, FPid)), + cast_per_node(split_delegate_per_node(Pids), FPid), ok. +internal_cast(Node, Thunk) when is_atom(Node) -> + gen_server2:cast({server(), Node}, {thunk, Thunk}). + %%---------------------------------------------------------------------------- split_delegate_per_node(Pids) -> @@ -108,17 +103,22 @@ split_delegate_per_node(Pids) -> end, dict:new(), Pids)). -f_pid_node(DelegateFun, FPid) -> - fun(Pid, Node) -> - DelegateFun(Node, fun() -> FPid(Pid) end) - end. +call_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +call_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_call/2). + +cast_per_node([{Node, Pids}], FPid) when Node == node() -> + local_delegate(Pids, FPid); +cast_per_node(NodePids, FPid) -> + delegate_per_node(NodePids, FPid, fun internal_cast/2). -delegate_per_node(NodePids, FPidNode) -> - [[add_pid(FPidNode(Pid, Node), Pid) || Pid <- Pids] || - {Node, Pids} <- NodePids]. +local_delegate(Pids, FPid) -> + [[safe_invoke(FPid, Pid) || Pid <- Pids]]. -add_pid({Status, Result}, Pid) -> {Status, Result, Pid}; -add_pid(Status, Pid) -> {Status, Pid}. +delegate_per_node(NodePids, FPid, DelegateFun) -> + [DelegateFun(Node, fun() -> [safe_invoke(FPid, Pid) || Pid <- Pids] end) || + {Node, Pids} <- NodePids]. server() -> server(erlang:phash(self(), ?DELEGATE_PROCESSES)). @@ -126,19 +126,21 @@ server() -> server(Hash) -> list_to_atom(string:concat("delegate_process_", integer_to_list(Hash))). +safe_invoke(FPid, Pid) -> + case catch FPid(Pid) of + {'EXIT', Reason} -> + {error, {'EXIT', Reason}, Pid}; + Result -> + {ok, Result, Pid} + end. + %%-------------------------------------------------------------------- init([]) -> {ok, no_state}. handle_call({thunk, Thunk}, _From, State) -> - Res = case catch Thunk() of - {'EXIT', Reason} -> - {error, {'EXIT', Reason}}; - Result -> - {ok, Result} - end, - {reply, Res, State}. + {reply, Thunk(), State}. handle_cast({thunk, Thunk}, State) -> catch Thunk(), |
