summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2014-01-27 17:57:20 +0000
committerTim Watson <tim@rabbitmq.com>2014-01-27 17:57:20 +0000
commit81bd7f7e2600a30d7473a512487a3d33a1c8a4af (patch)
treede7fbad92b6666b87f93a445748c522de08d1976 /src
parentc8fba9bc81323100b2998034397c73dada1f25f4 (diff)
downloadrabbitmq-server-git-81bd7f7e2600a30d7473a512487a3d33a1c8a4af.tar.gz
Refactor: tag replies safely and avoid using erlang:demonitor/2
For efficiency reasons, instead of demonitoring (which leads to scanning the entire mailbox), cope with duplicate monitor refs when collating gen_server replies. For safety, tag the exit signal (which we treat as a result) with a unique reference instead of the middle-man's pid.
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl36
1 files changed, 19 insertions, 17 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 57234c92fe..0fea8dd1bc 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -399,15 +399,16 @@ multi_call(Nodes, Name, Req, Timeout)
%%% message queue.
%%% -----------------------------------------------------------------
mcall(CallSpecs) ->
- {Receiver, MRef} = spawn_monitor(
- fun() ->
- Refs = lists:foldl(fun do_mcall/2, dict:new(),
- CallSpecs),
- collect_replies(Refs, [], [])
- end),
+ Tag = make_ref(),
+ {_, MRef} = spawn_monitor(
+ fun() ->
+ Refs = lists:foldl(fun do_mcall/2, dict:new(),
+ CallSpecs),
+ collect_replies(Tag, Refs, [], [])
+ end),
receive
- {'DOWN', MRef, _, _, {Receiver, Result}} -> Result;
- {'DOWN', MRef, _, _, Reason} -> exit(Reason)
+ {'DOWN', MRef, _, _, {Tag, Result}} -> Result;
+ {'DOWN', MRef, _, _, Reason} -> exit(Reason)
end.
do_mcall({Pid, Request}, Dict) ->
@@ -416,14 +417,14 @@ do_mcall({Pid, Request}, Dict) ->
[noconnect]),
dict:store(MRef, Pid, Dict).
-collect_replies(Refs, Replies, Errors) ->
+collect_replies(Tag, Refs, Replies, Errors) ->
case dict:size(Refs) of
- 0 -> exit({self(), {Replies, Errors}});
+ 0 -> exit({Tag, {Replies, Errors}});
_ -> receive
{MRef, Reply} ->
{Refs1, Replies1} = handle_call_result(MRef, Reply,
Refs, Replies),
- collect_replies(Refs1, Replies1, Errors);
+ collect_replies(Tag, Refs1, Replies1, Errors);
{'DOWN', MRef, _, _, Reason} ->
Reason1 = case Reason of
noconnection -> nodedown;
@@ -431,16 +432,17 @@ collect_replies(Refs, Replies, Errors) ->
end,
{Refs1, Errors1} = handle_call_result(MRef, Reason1,
Refs, Errors),
- collect_replies(Refs1, Replies, Errors1)
+ collect_replies(Tag, Refs1, Replies, Errors1)
end
end.
handle_call_result(MRef, Result, Refs, AccList) ->
- %% we use fetch instead of find, because we *do* want to crash if some
- %% unexpected monitor signal arrives in our inbox!
- Pid = dict:fetch(MRef, Refs),
- erlang:demonitor(MRef, [flush]),
- {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}.
+ %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2}
+ %% here, so we must cope with MRefs that we've already seen and erased
+ case dict:find(MRef, Refs) of
+ {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]};
+ _ -> {Refs, AccList}
+ end.
%% -----------------------------------------------------------------
%% Apply a function to a generic server's state.