diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-08-09 07:50:02 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-08-09 07:50:02 +0100 |
| commit | 7d2c1b90c0a0f862d9058d506e3364def6056978 (patch) | |
| tree | fb89e167d1abd1e9bbcd0ac6bd9812be2336864a /src/delegate.erl | |
| parent | d75c3c969a9b19b51d85c8e872c6e9db8887ec23 (diff) | |
| parent | 555671358dabe274308932d0184325b50cfae1a7 (diff) | |
| download | rabbitmq-server-git-7d2c1b90c0a0f862d9058d506e3364def6056978.tar.gz | |
merge bug25704 into default
Diffstat (limited to 'src/delegate.erl')
| -rw-r--r-- | src/delegate.erl | 75 |
1 files changed, 39 insertions, 36 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 7a06c1e48f..5277e59fcc 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -33,15 +33,14 @@ -export_type([monitor_ref/0]). -type(monitor_ref() :: reference() | {atom(), pid()}). +-type(fun_or_mfa(A) :: fun ((pid()) -> A) | {atom(), atom(), [any()]}). -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). --spec(invoke/2 :: - ( pid(), fun ((pid()) -> A)) -> A; - ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], - [{pid(), term()}]}). --spec(invoke_no_result/2 :: - (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(invoke/2 :: ( pid(), fun_or_mfa(A)) -> A; + ([pid()], fun_or_mfa(A)) -> {[{pid(), A}], + [{pid(), term()}]}). +-spec(invoke_no_result/2 :: (pid() | [pid()], fun_or_mfa(any())) -> 'ok'). -spec(monitor/2 :: ('process', pid()) -> monitor_ref()). -spec(demonitor/1 :: (monitor_ref()) -> 'true'). -spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). @@ -64,24 +63,24 @@ start_link(Num) -> Name = delegate_name(Num), gen_server2:start_link({local, Name}, ?MODULE, [Name], []). -invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - Fun(Pid); -invoke(Pid, Fun) when is_pid(Pid) -> - case invoke([Pid], Fun) of +invoke(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + apply1(FunOrMFA, Pid); +invoke(Pid, FunOrMFA) when is_pid(Pid) -> + case invoke([Pid], FunOrMFA) of {[{Pid, Result}], []} -> Result; {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; -invoke([], _Fun) -> %% optimisation +invoke([], _FunOrMFA) -> %% optimisation {[], []}; -invoke([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - case safe_invoke(Pid, Fun) of +invoke([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + case safe_invoke(Pid, FunOrMFA) of {ok, _, Result} -> {[{Pid, Result}], []}; {error, _, Error} -> {[], [{Pid, Error}]} end; -invoke(Pids, Fun) when is_list(Pids) -> +invoke(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), %% The use of multi_call is only safe because the timeout is %% infinity, and thus there is no process spawned in order to do @@ -91,38 +90,38 @@ invoke(Pids, Fun) when is_list(Pids) -> [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}, infinity) + {invoke, FunOrMFA, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || BadNode <- BadNodes, Pid <- orddict:fetch(BadNode, Grouped)], - ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + ResultsNoNode = lists:append([safe_invoke(LocalPids, FunOrMFA) | [Results || {_Node, Results} <- Replies]]), lists:foldl( fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> - safe_invoke(Pid, Fun), %% we don't care about any error +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, FunOrMFA), %% we don't care about any error ok; -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result([Pid], Fun); +invoke_no_result(Pid, FunOrMFA) when is_pid(Pid) -> + invoke_no_result([Pid], FunOrMFA); -invoke_no_result([], _Fun) -> %% optimisation +invoke_no_result([], _FunOrMFA) -> %% optimisation ok; -invoke_no_result([Pid], Fun) when node(Pid) =:= node() -> %% optimisation - safe_invoke(Pid, Fun), %% must not die +invoke_no_result([Pid], FunOrMFA) when node(Pid) =:= node() -> %% optimisation + safe_invoke(Pid, FunOrMFA), %% must not die ok; -invoke_no_result(Pids, Fun) when is_list(Pids) -> +invoke_no_result(Pids, FunOrMFA) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; RemoteNodes -> gen_server2:abcast( RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}) + {invoke, FunOrMFA, Grouped}) end, - safe_invoke(LocalPids, Fun), %% must not die + safe_invoke(LocalPids, FunOrMFA), %% must not die ok. monitor(Type, Pid) when node(Pid) =:= node() -> @@ -140,10 +139,10 @@ demonitor({Name, Pid}, Options) -> gen_server2:cast(Name, {demonitor, Pid, Options}). call(PidOrPids, Msg) -> - invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + invoke(PidOrPids, {gen_server2, call, [Msg, infinity]}). cast(PidOrPids, Msg) -> - invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + invoke_no_result(PidOrPids, {gen_server2, cast, [Msg]}). %%---------------------------------------------------------------------------- @@ -171,23 +170,27 @@ delegate(Pid, RemoteNodes) -> Name -> Name end. -safe_invoke(Pids, Fun) when is_list(Pids) -> - [safe_invoke(Pid, Fun) || Pid <- Pids]; -safe_invoke(Pid, Fun) when is_pid(Pid) -> +safe_invoke(Pids, FunOrMFA) when is_list(Pids) -> + [safe_invoke(Pid, FunOrMFA) || Pid <- Pids]; +safe_invoke(Pid, FunOrMFA) when is_pid(Pid) -> try - {ok, Pid, Fun(Pid)} + {ok, Pid, apply1(FunOrMFA, Pid)} catch Class:Reason -> {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. +apply1({M, F, A}, Arg) -> apply(M, F, [Arg | A]); +apply1(Fun, Arg) -> Fun(Arg). + %%---------------------------------------------------------------------------- init([Name]) -> {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. +handle_call({invoke, FunOrMFA, Grouped}, _From, State = #state{node = Node}) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), State, + hibernate}. handle_cast({monitor, Type, WantsMonitor, Pid}, State = #state{monitors = Monitors}) -> @@ -205,8 +208,8 @@ handle_cast({demonitor, Pid, Options}, State end, hibernate}; -handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), +handle_cast({invoke, FunOrMFA, Grouped}, State = #state{node = Node}) -> + safe_invoke(orddict:fetch(Node, Grouped), FunOrMFA), {noreply, State, hibernate}. handle_info({'DOWN', Ref, process, Pid, Info}, |
