summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2018-09-15 01:57:07 +0200
committerMichael Klishin <michael@clojurewerkz.org>2018-09-15 01:57:07 +0200
commitdc894579c8497c645f55ab925e77c5b8824f8425 (patch)
treece9eb8fd7557ff5e7fe7be9e8d96ea00966bfbb9 /src
parent3d08081d8f4b0ef8220bceadc7f3db2e6d8a7b03 (diff)
downloadrabbitmq-server-git-dc894579c8497c645f55ab925e77c5b8824f8425.tar.gz
Make pg_local:member_died/2 more resilient
See #1699 for background. [#160530707]
Diffstat (limited to 'src')
-rw-r--r--src/pg_local.erl74
1 files changed, 47 insertions, 27 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl
index e1f5219dcb..3a00e835ce 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -35,7 +35,8 @@
-module(pg_local).
-export([join/2, leave/2, get_members/1, in_group/2]).
--export([sync/0]). %% intended for testing only; not part of official API
+%% intended for testing only; not part of official API
+-export([sync/0, clear/0]).
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2]).
@@ -54,7 +55,7 @@
%%----------------------------------------------------------------------------
-%%% As of R13B03 monitors are used instead of links.
+-define(TABLE, pg_local_table).
%%%
%%% Exported functions
@@ -92,6 +93,10 @@ sync() ->
_ = ensure_started(),
gen_server:call(?MODULE, sync, infinity).
+clear() ->
+ _ = ensure_started(),
+ gen_server:call(?MODULE, clear, infinity).
+
%%%
%%% Callback functions from gen_server
%%%
@@ -99,12 +104,16 @@ sync() ->
-record(state, {}).
init([]) ->
- pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
+ ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
{ok, #state{}}.
handle_call(sync, _From, S) ->
{reply, ok, S};
+handle_call(clear, _From, S) ->
+ ets:delete_all_objects(?TABLE),
+ {reply, ok, S};
+
handle_call(Request, From, S) ->
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
@@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) ->
handle_cast(_, S) ->
{noreply, S}.
-handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
- member_died(MonitorRef),
+handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
+ member_died(MonitorRef, Pid),
{noreply, S};
handle_info(_, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
- true = ets:delete(pg_local_table),
+ true = ets:delete(?TABLE),
ok.
%%%
@@ -148,46 +157,57 @@ terminate(_Reason, _S) ->
%%% {{pid, Pid, Name}}
%%% Pid is a member of group Name.
-member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
+member_died(Ref, Pid) ->
+ case ets:lookup(?TABLE, {ref, Ref}) of
+ [{{ref, Ref}, Pid}] ->
+ leave_all_groups(Pid);
+ %% in case the key has already been removed
+ %% we can perform the lookup using the DOWN message pid
+ [] ->
+ leave_all_groups(Pid);
+ _ ->
+ leave_all_groups(Pid)
+ end,
+ ok.
+
+leave_all_groups(Pid) ->
Names = member_groups(Pid),
_ = [leave_group(Name, P) ||
Name <- Names,
- P <- member_in_group(Pid, Name)],
- ok.
+ P <- member_in_group(Pid, Name)].
join_group(Name, Pid) ->
Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
+ try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
catch _:_ ->
Ref = erlang:monitor(process, Pid),
- true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
- true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
+ true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
+ true = ets:insert(?TABLE, {{ref, Ref}, Pid})
end,
Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
+ try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
catch _:_ ->
- true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
- true = ets:insert(pg_local_table, {{pid, Pid, Name}})
+ true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
+ true = ets:insert(?TABLE, {{pid, Pid, Name}})
end.
leave_group(Name, Pid) ->
Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
+ try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
N ->
if
N =:= 0 ->
- true = ets:delete(pg_local_table, {pid, Pid, Name}),
- true = ets:delete(pg_local_table, Member_Name_Pid);
+ true = ets:delete(?TABLE, {pid, Pid, Name}),
+ true = ets:delete(?TABLE, Member_Name_Pid);
true ->
ok
end,
Ref_Pid = {ref, Pid},
- case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
+ case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
0 ->
- [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
- true = ets:delete(pg_local_table, {ref, Ref}),
- true = ets:delete(pg_local_table, Ref_Pid),
+ [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
+ true = ets:delete(?TABLE, {ref, Ref}),
+ true = ets:delete(?TABLE, Ref_Pid),
true = erlang:demonitor(Ref, [flush]),
ok;
_ ->
@@ -199,21 +219,21 @@ leave_group(Name, Pid) ->
group_members(Name) ->
[P ||
- [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
+ [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
member_in_group(Pid, Name) ->
- [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
+ [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
lists:duplicate(N, Pid).
member_present(Name, Pid) ->
- case ets:lookup(pg_local_table, {member, Name, Pid}) of
+ case ets:lookup(?TABLE, {member, Name, Pid}) of
[_] -> true;
[] -> false
end.
member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
+ [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].
ensure_started() ->
case whereis(?MODULE) of