diff options
| author | Tim Watson <tim@rabbitmq.com> | 2014-01-27 17:57:20 +0000 |
|---|---|---|
| committer | Tim Watson <tim@rabbitmq.com> | 2014-01-27 17:57:20 +0000 |
| commit | 81bd7f7e2600a30d7473a512487a3d33a1c8a4af (patch) | |
| tree | de7fbad92b6666b87f93a445748c522de08d1976 /src | |
| parent | c8fba9bc81323100b2998034397c73dada1f25f4 (diff) | |
| download | rabbitmq-server-git-81bd7f7e2600a30d7473a512487a3d33a1c8a4af.tar.gz | |
Refactor: tag replies safely and avoid using erlang:demonitor/2
For efficiency reasons, instead of demonitoring (which leads to scanning the
entire mailbox), cope with duplicate monitor refs when collating gen_server
replies.
For safety, tag the exit signal (which we treat as a result) with a unique
reference instead of the middle-man's pid.
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 36 |
1 files changed, 19 insertions, 17 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 57234c92fe..0fea8dd1bc 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -399,15 +399,16 @@ multi_call(Nodes, Name, Req, Timeout) %%% message queue. %%% ----------------------------------------------------------------- mcall(CallSpecs) -> - {Receiver, MRef} = spawn_monitor( - fun() -> - Refs = lists:foldl(fun do_mcall/2, dict:new(), - CallSpecs), - collect_replies(Refs, [], []) - end), + Tag = make_ref(), + {_, MRef} = spawn_monitor( + fun() -> + Refs = lists:foldl(fun do_mcall/2, dict:new(), + CallSpecs), + collect_replies(Tag, Refs, [], []) + end), receive - {'DOWN', MRef, _, _, {Receiver, Result}} -> Result; - {'DOWN', MRef, _, _, Reason} -> exit(Reason) + {'DOWN', MRef, _, _, {Tag, Result}} -> Result; + {'DOWN', MRef, _, _, Reason} -> exit(Reason) end. do_mcall({Pid, Request}, Dict) -> @@ -416,14 +417,14 @@ do_mcall({Pid, Request}, Dict) -> [noconnect]), dict:store(MRef, Pid, Dict). -collect_replies(Refs, Replies, Errors) -> +collect_replies(Tag, Refs, Replies, Errors) -> case dict:size(Refs) of - 0 -> exit({self(), {Replies, Errors}}); + 0 -> exit({Tag, {Replies, Errors}}); _ -> receive {MRef, Reply} -> {Refs1, Replies1} = handle_call_result(MRef, Reply, Refs, Replies), - collect_replies(Refs1, Replies1, Errors); + collect_replies(Tag, Refs1, Replies1, Errors); {'DOWN', MRef, _, _, Reason} -> Reason1 = case Reason of noconnection -> nodedown; @@ -431,16 +432,17 @@ collect_replies(Refs, Replies, Errors) -> end, {Refs1, Errors1} = handle_call_result(MRef, Reason1, Refs, Errors), - collect_replies(Refs1, Replies, Errors1) + collect_replies(Tag, Refs1, Replies, Errors1) end end. handle_call_result(MRef, Result, Refs, AccList) -> - %% we use fetch instead of find, because we *do* want to crash if some - %% unexpected monitor signal arrives in our inbox! - Pid = dict:fetch(MRef, Refs), - erlang:demonitor(MRef, [flush]), - {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}. + %% we avoid the mailbox scanning cost of a call to erlang:demonitor/{1,2} + %% here, so we must cope with MRefs that we've already seen and erased + case dict:find(MRef, Refs) of + {ok, Pid} -> {dict:erase(MRef, Refs), [{Pid, Result}|AccList]}; + _ -> {Refs, AccList} + end. %% ----------------------------------------------------------------- %% Apply a function to a generic server's state. |
