summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-23 14:33:17 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-23 14:33:17 +0100
commit9338ced21cffb440eeb434a2d9b91b8c933a3582 (patch)
tree1ba7a26a3011e4c51f7de73486a3e49d8370fe11
parentf1c394837d0a23c63242e38b6a199f66d9fd08a4 (diff)
downloadrabbitmq-server-git-9338ced21cffb440eeb434a2d9b91b8c933a3582.tar.gz
Make monitoring via delegates async. This has the downside that you can't monitor the same pid more than once from the same process, but that is enforced by pmon anyway which is the only client of this code. The upside is that cross-cluster basic.get doesn't deadlock...
-rw-r--r--src/delegate.erl51
1 files changed, 29 insertions, 22 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index dad2dd3c9b..30ee33b6f3 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -24,7 +24,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {node, monitors}).
+-record(state, {node, monitors, name}).
%%----------------------------------------------------------------------------
@@ -32,7 +32,7 @@
-export_type([monitor_ref/0]).
--type(monitor_ref() :: reference() | {atom(), reference()}).
+-type(monitor_ref() :: reference() | {atom(), pid()}).
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
@@ -61,7 +61,8 @@
%%----------------------------------------------------------------------------
start_link(Num) ->
- gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).
+ Name = delegate_name(Num),
+ gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
Fun(Pid);
@@ -128,14 +129,15 @@ monitor(Type, Pid) when node(Pid) =:= node() ->
erlang:monitor(Type, Pid);
monitor(Type, Pid) ->
Name = delegate(Pid, [node(Pid)]),
- {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}.
+ gen_server2:cast(Name, {monitor, Type, self(), Pid}),
+ {Name, Pid}.
demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
demonitor(Ref, Options) when is_reference(Ref) ->
erlang:demonitor(Ref, Options);
-demonitor({Name, Ref}, Options) ->
- gen_server2:call(Name, {demonitor, Ref, Options}, infinity).
+demonitor({Name, Pid}, Options) ->
+ gen_server2:cast(Name, {demonitor, Pid, Options}).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -180,34 +182,39 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->
%%----------------------------------------------------------------------------
-init([]) ->
- {ok, #state{node = node(), monitors = dict:new()}, hibernate,
+init([Name]) ->
+ {ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
- {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
+ {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.
-handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From,
+handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(Type, ToMonitor),
- State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
- {reply, Ref, State1, hibernate};
+ Ref = erlang:monitor(Type, Pid),
+ Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
+ {noreply, State#state{monitors = Monitors1}, hibernate};
-handle_call({demonitor, Ref, Options}, _From,
+handle_cast({demonitor, Pid, Options},
State = #state{monitors = Monitors}) ->
- State1 = State#state{monitors = dict:erase(Ref, Monitors)},
- {reply, erlang:demonitor(Ref, Options), State1, hibernate}.
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {_WantsMonitor, Ref}} ->
+ erlang:demonitor(Ref, Options),
+ State#state{monitors = dict:erase(Pid, Monitors)};
+ error ->
+ State
+ end, hibernate};
handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
{noreply, State, hibernate}.
-handle_info({'DOWN', Ref, process, Object, Info},
- State = #state{monitors = Monitors}) ->
- {noreply, case dict:find(Ref, Monitors) of
- {ok, WantsMonitor} ->
- WantsMonitor ! {'DOWN', Ref, process, Object, Info},
- State#state{monitors = dict:erase(Ref, Monitors)};
+handle_info({'DOWN', Ref, process, Pid, Info},
+ State = #state{monitors = Monitors, name = Name}) ->
+ {noreply, case dict:find(Pid, Monitors) of
+ {ok, {WantsMonitor, Ref}} ->
+ WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
+ State#state{monitors = dict:erase(Pid, Monitors)};
error ->
State
end, hibernate};