summaryrefslogtreecommitdiff
path: root/src/delegate.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:20:29 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-02 14:20:29 +0100
commitc7bf242421ae71b81c0b892f711c5fe079178245 (patch)
tree0dbccdbef95268f5f0cac6ac9677fd2d980ebf34 /src/delegate.erl
parent06a8b1ffcf633ba10c177736db33775636c9a20c (diff)
downloadrabbitmq-server-git-c7bf242421ae71b81c0b892f711c5fe079178245.tar.gz
Make the delegate monitoring API a drop in replacement for the built in one, and thus parameterise pmon and remove dmon.
Diffstat (limited to 'src/delegate.erl')
-rw-r--r--src/delegate.erl48
1 files changed, 25 insertions, 23 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index 475b087f47..03086a590e 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,8 +18,8 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2, monitor/1, demonitor/2,
- call/2, cast/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
+ demonitor/1, demonitor/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -30,6 +30,10 @@
-ifdef(use_specs).
+-export_type([monitor_ref/0]).
+
+-type(monitor_ref() :: reference() | {atom(), reference()}).
+
-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
@@ -38,8 +42,9 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
--spec(monitor/1 :: (pid()) -> reference()).
--spec(demonitor/2 :: (pid(), reference()) -> 'true').
+-spec(monitor/2 :: (any(), pid()) -> monitor_ref()).
+-spec(demonitor/1 :: (monitor_ref()) -> 'true').
+-spec(demonitor/2 :: (monitor_ref(), [any()]) -> 'true').
-spec(call/2 ::
( pid(), any()) -> any();
@@ -119,19 +124,18 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
-monitor(Pid) when node(Pid) =:= node() ->
- erlang:monitor(process, Pid);
-monitor(Pid) ->
- Node = node(Pid),
- Name = delegate(Pid, [Node]),
- gen_server2:call(Name, {monitor, self(), Pid}, infinity).
+monitor(Type, Pid) when node(Pid) =:= node() ->
+ erlang:monitor(Type, Pid);
+monitor(Type, Pid) ->
+ Name = delegate(Pid, [node(Pid)]),
+ {Name, gen_server2:call(Name, {monitor, Type, self(), Pid}, infinity)}.
-demonitor(Pid, Ref) when node(Pid) =:= node() ->
- erlang:demonitor(Ref, [flush]);
-demonitor(Pid, Ref) ->
- Node = node(Pid),
- Name = delegate(Pid, [Node]),
- gen_server2:call(Name, {demonitor, Ref}, infinity).
+demonitor(Ref) -> ?MODULE:demonitor(Ref, []).
+
+demonitor(Ref, Options) when is_reference(Ref) ->
+ erlang:demonitor(Ref, Options);
+demonitor({Name, Ref}, Options) ->
+ gen_server2:call(Name, {demonitor, Ref, Options}, infinity).
call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
@@ -183,18 +187,16 @@ init([]) ->
handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
{reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate};
-handle_call({monitor, WantsMonitor, ToMonitor}, _From,
+handle_call({monitor, Type, WantsMonitor, ToMonitor}, _From,
State = #state{monitors = Monitors}) ->
- Ref = erlang:monitor(process, ToMonitor),
+ Ref = erlang:monitor(Type, ToMonitor),
State1 = State#state{monitors = dict:store(Ref, WantsMonitor, Monitors)},
{reply, Ref, State1, hibernate};
-handle_call({demonitor, Ref}, _From, State = #state{monitors = Monitors}) ->
- %% We need to ensure we don't then respond to a 'DOWN' that's
- %% currently in our mailbox - if we did then our client might then
- %% get a 'DOWN' after demonitor() returns.
+handle_call({demonitor, Ref, Options}, _From,
+ State = #state{monitors = Monitors}) ->
State1 = State#state{monitors = dict:erase(Ref, Monitors)},
- {reply, erlang:demonitor(Ref, [flush]), State1, hibernate}.
+ {reply, erlang:demonitor(Ref, Options), State1, hibernate}.
handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),