summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-08-02 16:44:14 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-08-02 16:44:14 +0100
commitd2e2fa6099077c1f2cf6cb82fdddadca4858bc5a (patch)
treee0cd536f7237cdea6e3e0dfffb47bd42e1ef0202 /src/delegate.erl
parentb28f428065dcc90a6937782cd69d353aedf0ebe1 (diff)
parent2efd9f8a2a2b167b21fc22b427613564b0bf6807 (diff)
downloadrabbitmq-server-git-d2e2fa6099077c1f2cf6cb82fdddadca4858bc5a.tar.gz
Merging default into bug21875
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl58
1 files changed, 32 insertions, 26 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 12eb814f8f..3f57953bf7 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -44,9 +44,10 @@
-ifdef(use_specs).
--spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()}).
--spec(invoke_no_result/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
--spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
+-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok(pid())).
+-spec(invoke_no_result/2 ::
+ (pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -63,7 +64,7 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) ->
- [Res] = invoke_per_node([{node(Pid), [Pid]}], Fun),
+ [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun),
case Res of
{ok, Result, _} ->
Result;
@@ -73,7 +74,7 @@ invoke(Pid, Fun) when is_pid(Pid) ->
invoke(Pids, Fun) when is_list(Pids) ->
lists:foldl(
- fun({Status, Result, Pid}, {Good, Bad}) ->
+ fun ({Status, Result, Pid}, {Good, Bad}) ->
case Status of
ok -> {[{Pid, Result}|Good], Bad};
error -> {Good, [{Pid, Result}|Bad]}
@@ -83,7 +84,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
invoke_per_node(split_delegate_per_node(Pids), Fun)).
invoke_no_result(Pid, Fun) when is_pid(Pid) ->
- invoke_no_result_per_node([{node(Pid), [Pid]}], Fun),
+ invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun),
ok;
invoke_no_result(Pids, Fun) when is_list(Pids) ->
@@ -99,42 +100,47 @@ internal_cast(Node, Thunk) when is_atom(Node) ->
gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}).
split_delegate_per_node(Pids) ->
- orddict:to_list(
- lists:foldl(
- fun (Pid, D) ->
- orddict:update(node(Pid),
- fun (Pids1) -> [Pid | Pids1] end,
- [Pid], D)
- end,
- orddict:new(), Pids)).
+ LocalNode = node(),
+ {Local, Remote} =
+ lists:foldl(
+ fun (Pid, {L, D}) ->
+ Node = node(Pid),
+ case Node of
+ LocalNode -> {[Pid|L], D};
+ _ -> {L, orddict:append(Node, Pid, D)}
+ end
+ end,
+ {[], orddict:new()}, Pids),
+ {Local, orddict:to_list(Remote)}.
-invoke_per_node([{Node, Pids}], Fun) when Node == node() ->
- safe_invoke(Pids, Fun);
invoke_per_node(NodePids, Fun) ->
lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)).
-invoke_no_result_per_node([{Node, Pids}], Fun) when Node == node() ->
- %% This is not actually async! However, in practice Fun will
- %% always be something that does a gen_server:cast or similar, so
- %% I don't think it's a problem unless someone misuses this
- %% function. Making this *actually* async would be painful as we
- %% can't spawn at this point or we break effect ordering.
- safe_invoke(Pids, Fun);
invoke_no_result_per_node(NodePids, Fun) ->
delegate_per_node(NodePids, Fun, fun internal_cast/2),
ok.
-delegate_per_node(NodePids, Fun, DelegateFun) ->
+delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) ->
+ %% In the case where DelegateFun is internal_cast, the safe_invoke
+ %% is not actually async! However, in practice Fun will always be
+ %% something that does a gen_server:cast or similar, so I don't
+ %% think it's a problem unless someone misuses this
+ %% function. Making this *actually* async would be painful as we
+ %% can't spawn at this point or we break effect ordering.
+ [safe_invoke(LocalPids, Fun)|
+ delegate_per_remote_node(NodePids, Fun, DelegateFun)].
+
+delegate_per_remote_node(NodePids, Fun, DelegateFun) ->
Self = self(),
%% Note that this is unsafe if the Fun requires reentrancy to the
%% local_server. I.e. if self() == local_server(Node) then we'll
%% block forever.
[gen_server2:cast(
local_server(Node),
- {thunk, fun() ->
+ {thunk, fun () ->
Self ! {result,
DelegateFun(
- Node, fun() -> safe_invoke(Pids, Fun) end)}
+ Node, fun () -> safe_invoke(Pids, Fun) end)}
end}) || {Node, Pids} <- NodePids],
[receive {result, Result} -> Result end || _ <- NodePids].