diff options
| author | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@lshift.net> | 2010-05-28 14:25:05 +0100 |
| commit | 4c035fae81a0d07529c8c18c115bfdcf02ef8514 (patch) | |
| tree | 2e2d42b94d9c8a0a11455c40e63fb366b1f9b177 /src/delegate.erl | |
| parent | f5bb52f0b28e7c628290e7596ea866c0dcc2f8aa (diff) | |
| parent | ab9024d762cfc1eb3da82d0a0293cbfd69d2bfea (diff) | |
| download | rabbitmq-server-git-4c035fae81a0d07529c8c18c115bfdcf02ef8514.tar.gz | |
Merge default into amqp_0_9_1.
Diffstat (limited to 'src/delegate.erl')
| -rw-r--r-- | src/delegate.erl | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 12eb814f8f..98353453ff 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -63,7 +63,7 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun), + [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), case Res of {ok, Result, _} -> Result; @@ -83,7 +83,7 @@ invoke(Pids, Fun) when is_list(Pids) -> invoke_per_node(split_delegate_per_node(Pids), Fun)). invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node([{node(Pid), [Pid]}], Fun), + invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), ok; invoke_no_result(Pids, Fun) when is_list(Pids) -> @@ -99,32 +99,37 @@ internal_cast(Node, Thunk) when is_atom(Node) -> gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). split_delegate_per_node(Pids) -> - orddict:to_list( - lists:foldl( - fun (Pid, D) -> - orddict:update(node(Pid), - fun (Pids1) -> [Pid | Pids1] end, - [Pid], D) - end, - orddict:new(), Pids)). + LocalNode = node(), + {Local, Remote} = + lists:foldl( + fun (Pid, {L, D}) -> + Node = node(Pid), + case Node of + LocalNode -> {[Pid|L], D}; + _ -> {L, orddict:append(Node, Pid, D)} + end + end, + {[], orddict:new()}, Pids), + {Local, orddict:to_list(Remote)}. -invoke_per_node([{Node, Pids}], Fun) when Node == node() -> - safe_invoke(Pids, Fun); invoke_per_node(NodePids, Fun) -> lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). -invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() -> - %% This is not actually async! However, in practice Fun will - %% always be something that does a gen_server:cast or similar, so - %% I don't think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - safe_invoke(Pids, Fun); invoke_no_result_per_node(NodePids, Fun) -> delegate_per_node(NodePids, Fun, fun internal_cast/2), ok. -delegate_per_node(NodePids, Fun, DelegateFun) -> +delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> + %% In the case where DelegateFun is internal_cast, the safe_invoke + %% is not actually async! However, in practice Fun will always be + %% something that does a gen_server:cast or similar, so I don't + %% think it's a problem unless someone misuses this + %% function. Making this *actually* async would be painful as we + %% can't spawn at this point or we break effect ordering. + [safe_invoke(LocalPids, Fun)| + delegate_per_remote_node(NodePids, Fun, DelegateFun)]. + +delegate_per_remote_node(NodePids, Fun, DelegateFun) -> Self = self(), %% Note that this is unsafe if the Fun requires reentrancy to the %% local_server. I.e. if self() == local_server(Node) then we'll |
