diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-09-17 10:41:36 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-09-17 10:41:36 +0200 |
| commit | 8f71523f001aa484fa514724dee15ed9ace87adc (patch) | |
| tree | 493c5e2b04ff46384a9181cf9f0de15d83261599 /src | |
| parent | 3d08081d8f4b0ef8220bceadc7f3db2e6d8a7b03 (diff) | |
| parent | 7a0c8b6f95067446c51b9e1d2eba21d655e41503 (diff) | |
| download | rabbitmq-server-git-8f71523f001aa484fa514724dee15ed9ace87adc.tar.gz | |
Merge pull request #1700 from rabbitmq/rabbitmq-server-1699
Make pg_local:member_died/2 more resilient
Diffstat (limited to 'src')
| -rw-r--r-- | src/pg_local.erl | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl index e1f5219dcb..0ed7e9d85d 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -35,7 +35,8 @@ -module(pg_local). -export([join/2, leave/2, get_members/1, in_group/2]). --export([sync/0]). %% intended for testing only; not part of official API +%% intended for testing only; not part of official API +-export([sync/0, clear/0]). -export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -54,7 +55,7 @@ %%---------------------------------------------------------------------------- -%%% As of R13B03 monitors are used instead of links. +-define(TABLE, pg_local_table). %%% %%% Exported functions @@ -92,6 +93,10 @@ sync() -> _ = ensure_started(), gen_server:call(?MODULE, sync, infinity). +clear() -> + _ = ensure_started(), + gen_server:call(?MODULE, clear, infinity). + %%% %%% Callback functions from gen_server %%% @@ -99,12 +104,16 @@ sync() -> -record(state, {}). init([]) -> - pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), + ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]), {ok, #state{}}. handle_call(sync, _From, S) -> {reply, ok, S}; +handle_call(clear, _From, S) -> + ets:delete_all_objects(?TABLE), + {reply, ok, S}; + handle_call(Request, From, S) -> error_logger:warning_msg("The pg_local server received an unexpected message:\n" "handle_call(~p, ~p, _)\n", @@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) -> handle_cast(_, S) -> {noreply, S}. -handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> - member_died(MonitorRef), +handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) -> + member_died(MonitorRef, Pid), {noreply, S}; handle_info(_, S) -> {noreply, S}. terminate(_Reason, _S) -> - true = ets:delete(pg_local_table), + true = ets:delete(?TABLE), ok. %%% @@ -148,46 +157,55 @@ terminate(_Reason, _S) -> %%% {{pid, Pid, Name}} %%% Pid is a member of group Name. -member_died(Ref) -> - [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), +member_died(Ref, Pid) -> + case ets:lookup(?TABLE, {ref, Ref}) of + [{{ref, Ref}, Pid}] -> + leave_all_groups(Pid); + %% in case the key has already been removed + %% we can clean up using the value from the DOWN message + _ -> + leave_all_groups(Pid) + end, + ok. + +leave_all_groups(Pid) -> Names = member_groups(Pid), _ = [leave_group(Name, P) || Name <- Names, - P <- member_in_group(Pid, Name)], - ok. + P <- member_in_group(Pid, Name)]. join_group(Name, Pid) -> Ref_Pid = {ref, Pid}, - try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) + try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1}) catch _:_ -> Ref = erlang:monitor(process, Pid), - true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), - true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) + true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}), + true = ets:insert(?TABLE, {{ref, Ref}, Pid}) end, Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) + try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1}) catch _:_ -> - true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), - true = ets:insert(pg_local_table, {{pid, Pid, Name}}) + true = ets:insert(?TABLE, {Member_Name_Pid, 1}), + true = ets:insert(?TABLE, {{pid, Pid, Name}}) end. leave_group(Name, Pid) -> Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of + try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of N -> if N =:= 0 -> - true = ets:delete(pg_local_table, {pid, Pid, Name}), - true = ets:delete(pg_local_table, Member_Name_Pid); + true = ets:delete(?TABLE, {pid, Pid, Name}), + true = ets:delete(?TABLE, Member_Name_Pid); true -> ok end, Ref_Pid = {ref, Pid}, - case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of + case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of 0 -> - [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), - true = ets:delete(pg_local_table, {ref, Ref}), - true = ets:delete(pg_local_table, Ref_Pid), + [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid), + true = ets:delete(?TABLE, {ref, Ref}), + true = ets:delete(?TABLE, Ref_Pid), true = erlang:demonitor(Ref, [flush]), ok; _ -> @@ -199,21 +217,21 @@ leave_group(Name, Pid) -> group_members(Name) -> [P || - [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), + [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}), _ <- lists:seq(1, N)]. member_in_group(Pid, Name) -> - [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), + [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}), lists:duplicate(N, Pid). member_present(Name, Pid) -> - case ets:lookup(pg_local_table, {member, Name, Pid}) of + case ets:lookup(?TABLE, {member, Name, Pid}) of [_] -> true; [] -> false end. member_groups(Pid) -> - [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. + [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})]. ensure_started() -> case whereis(?MODULE) of |
