diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-14 17:12:44 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-14 17:12:44 +0100 |
| commit | 01b80ec8a3eb5865f40216d9e408e838790c8add (patch) | |
| tree | d2fdde86a7686b8c05b8b8d8351663ff6ca1616a /src/delegate.erl | |
| parent | 471692ab160059c131ca6b91f0aa4dd0d4036adb (diff) | |
| download | rabbitmq-server-git-01b80ec8a3eb5865f40216d9e408e838790c8add.tar.gz | |
Revert dd08c9204760
Diffstat (limited to 'src/delegate.erl')
| -rw-r--r-- | src/delegate.erl | 89 |
1 files changed, 18 insertions, 71 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index 7a06c1e48f..4e1dcd2e01 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,22 +18,15 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2, monitor/2, - demonitor/1, demonitor/2, call/2, cast/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {node, monitors, name}). - %%---------------------------------------------------------------------------- -ifdef(use_specs). --export_type([monitor_ref/0]). - --type(monitor_ref() :: reference() | {atom(), pid()}). - -spec(start_link/1 :: (non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}). -spec(invoke/2 :: @@ -42,10 +35,6 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(monitor/2 :: ('process', pid()) -> monitor_ref()). --spec(demonitor/1 :: (monitor_ref()) -> 'true'). --spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true'). - -spec(call/2 :: ( pid(), any()) -> any(); ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). @@ -61,8 +50,7 @@ %%---------------------------------------------------------------------------- start_link(Num) -> - Name = delegate_name(Num), - gen_server2:start_link({local, Name}, ?MODULE, [Name], []). + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> Fun(Pid); @@ -90,7 +78,7 @@ invoke(Pids, Fun) when is_list(Pids) -> case orddict:fetch_keys(Grouped) of [] -> {[], []}; RemoteNodes -> gen_server2:multi_call( - RemoteNodes, delegate(self(), RemoteNodes), + RemoteNodes, delegate(RemoteNodes), {invoke, Fun, Grouped}, infinity) end, BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || @@ -118,27 +106,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> {LocalPids, Grouped} = group_pids_by_node(Pids), case orddict:fetch_keys(Grouped) of [] -> ok; - RemoteNodes -> gen_server2:abcast( - RemoteNodes, delegate(self(), RemoteNodes), - {invoke, Fun, Grouped}) + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes), + {invoke, Fun, Grouped}) end, safe_invoke(LocalPids, Fun), %% must not die ok. -monitor(Type, Pid) when node(Pid) =:= node() -> - erlang:monitor(Type, Pid); -monitor(Type, Pid) -> - Name = delegate(Pid, [node(Pid)]), - gen_server2:cast(Name, {monitor, Type, self(), Pid}), - {Name, Pid}. - -demonitor(Ref) -> ?MODULE:demonitor(Ref, []). - -demonitor(Ref, Options) when is_reference(Ref) -> - erlang:demonitor(Ref, Options); -demonitor({Name, Pid}, Options) -> - gen_server2:cast(Name, {demonitor, Pid, Options}). - call(PidOrPids, Msg) -> invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). @@ -161,10 +134,10 @@ group_pids_by_node(Pids) -> delegate_name(Hash) -> list_to_atom("delegate_" ++ integer_to_list(Hash)). -delegate(Pid, RemoteNodes) -> +delegate(RemoteNodes) -> case get(delegate) of undefined -> Name = delegate_name( - erlang:phash2(Pid, + erlang:phash2(self(), delegate_sup:count(RemoteNodes))), put(delegate, Name), Name; @@ -182,48 +155,22 @@ safe_invoke(Pid, Fun) when is_pid(Pid) -> %%---------------------------------------------------------------------------- -init([Name]) -> - {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate, +init([]) -> + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) -> - {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}. - -handle_cast({monitor, Type, WantsMonitor, Pid}, - State = #state{monitors = Monitors}) -> - Ref = erlang:monitor(Type, Pid), - Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors), - {noreply, State#state{monitors = Monitors1}, hibernate}; - -handle_cast({demonitor, Pid, Options}, - State = #state{monitors = Monitors}) -> - {noreply, case dict:find(Pid, Monitors) of - {ok, {_WantsMonitor, Ref}} -> - erlang:demonitor(Ref, Options), - State#state{monitors = dict:erase(Pid, Monitors)}; - error -> - State - end, hibernate}; - -handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) -> - safe_invoke(orddict:fetch(Node, Grouped), Fun), - {noreply, State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_info({'DOWN', Ref, process, Pid, Info}, - State = #state{monitors = Monitors, name = Name}) -> - {noreply, case dict:find(Pid, Monitors) of - {ok, {WantsMonitor, Ref}} -> - WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info}, - State#state{monitors = dict:erase(Pid, Monitors)}; - error -> - State - end, hibernate}; +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. |
