summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-08-14 17:12:44 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-08-14 17:12:44 +0100
commit01b80ec8a3eb5865f40216d9e408e838790c8add (patch)
treed2fdde86a7686b8c05b8b8d8351663ff6ca1616a /src/delegate.erl
parent471692ab160059c131ca6b91f0aa4dd0d4036adb (diff)
downloadrabbitmq-server-git-01b80ec8a3eb5865f40216d9e408e838790c8add.tar.gz
Revert dd08c9204760
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl89
1 files changed, 18 insertions, 71 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 7a06c1e48f..4e1dcd2e01 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,22 +18,15 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
- demonitor/1, demonitor/2, call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {node, monitors, name}).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([monitor_ref/0]).
-
--type(monitor_ref() :: reference() | {atom(), pid()}).
-
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
@@ -42,10 +35,6 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
--spec(demonitor/1 :: (monitor_ref()) -> 'true').
--spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').
-
-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
@@ -61,8 +50,7 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- Name = delegate_name(Num),
- gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
+ gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
Fun(Pid);
@@ -90,7 +78,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
- RemoteNodes, delegate(self(), RemoteNodes),
+ RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
@@ -118,27 +106,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
- RemoteNodes -> gen_server2:abcast(
- RemoteNodes, delegate(self(), RemoteNodes),
- {invoke, Fun, Grouped})
+ RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
+ {invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
ok.
-monitor(Type, Pid) when node(Pid) =:= node() ->
- erlang:monitor(Type, Pid);
-monitor(Type, Pid) ->
- Name = delegate(Pid, [node(Pid)]),
- gen_server2:cast(Name, {monitor, Type, self(), Pid}),
- {Name, Pid}.
-
-demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
-
-demonitor(Ref, Options) when is_reference(Ref) ->
- erlang:demonitor(Ref, Options);
-demonitor({Name, Pid}, Options) ->
- gen_server2:cast(Name, {demonitor, Pid, Options}).
-
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -161,10 +134,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).
-delegate(Pid, RemoteNodes) ->
+delegate(RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
- erlang:phash2(Pid,
+ erlang:phash2(self(),
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
@@ -182,48 +155,22 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
%%----------------------------------------------------------------------------
-init([Name]) ->
- {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
+init([]) ->
+ {ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
-
-handle_cast({monitor, Type, WantsMonitor, Pid},
- State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(Type, Pid),
- Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
- {noreply, State#state{monitors = Monitors1}, hibernate};
-
-handle_cast({demonitor, Pid, Options},
- State = #state{monitors = Monitors}) ->
- {noreply, case dict:find(Pid, Monitors) of
- {ok, {_WantsMonitor, Ref}} ->
- erlang:demonitor(Ref, Options),
- State#state{monitors = dict:erase(Pid, Monitors)};
- error ->
- State
- end, hibernate};
-
-handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
- safe_invoke(orddict:fetch(Node, Grouped), Fun),
- {noreply, State, hibernate}.
+handle_call({invoke, Fun, Grouped}, _From, Node) ->
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.
-handle_info({'DOWN', Ref, process, Pid, Info},
- State = #state{monitors = Monitors, name = Name}) ->
- {noreply, case dict:find(Pid, Monitors) of
- {ok, {WantsMonitor, Ref}} ->
- WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
- State#state{monitors = dict:erase(Pid, Monitors)};
- error ->
- State
- end, hibernate};
+handle_cast({invoke, Fun, Grouped}, Node) ->
+ safe_invoke(orddict:fetch(Node, Grouped), Fun),
+ {noreply, Node, hibernate}.
-handle_info(_Info, State) ->
- {noreply, State, hibernate}.
+handle_info(_Info, Node) ->
+ {noreply, Node, hibernate}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+code_change(_OldVsn, Node, _Extra) ->
+ {ok, Node}.