diff options
| -rw-r--r-- | src/pg_local.erl | 74 | ||||
| -rw-r--r-- | test/unit_SUITE.erl | 61 |
2 files changed, 104 insertions, 31 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl index e1f5219dcb..3a00e835ce 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -35,7 +35,8 @@ -module(pg_local). -export([join/2, leave/2, get_members/1, in_group/2]). --export([sync/0]). %% intended for testing only; not part of official API +%% intended for testing only; not part of official API +-export([sync/0, clear/0]). -export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). @@ -54,7 +55,7 @@ %%---------------------------------------------------------------------------- -%%% As of R13B03 monitors are used instead of links. +-define(TABLE, pg_local_table). %%% %%% Exported functions @@ -92,6 +93,10 @@ sync() -> _ = ensure_started(), gen_server:call(?MODULE, sync, infinity). +clear() -> + _ = ensure_started(), + gen_server:call(?MODULE, clear, infinity). + %%% %%% Callback functions from gen_server %%% @@ -99,12 +104,16 @@ sync() -> -record(state, {}). init([]) -> - pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]), + ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]), {ok, #state{}}. handle_call(sync, _From, S) -> {reply, ok, S}; +handle_call(clear, _From, S) -> + ets:delete_all_objects(?TABLE), + {reply, ok, S}; + handle_call(Request, From, S) -> error_logger:warning_msg("The pg_local server received an unexpected message:\n" "handle_call(~p, ~p, _)\n", @@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) -> handle_cast(_, S) -> {noreply, S}. -handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> - member_died(MonitorRef), +handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) -> + member_died(MonitorRef, Pid), {noreply, S}; handle_info(_, S) -> {noreply, S}. terminate(_Reason, _S) -> - true = ets:delete(pg_local_table), + true = ets:delete(?TABLE), ok. %%% @@ -148,46 +157,57 @@ terminate(_Reason, _S) -> %%% {{pid, Pid, Name}} %%% Pid is a member of group Name. -member_died(Ref) -> - [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}), +member_died(Ref, Pid) -> + case ets:lookup(?TABLE, {ref, Ref}) of + [{{ref, Ref}, Pid}] -> + leave_all_groups(Pid); + %% in case the key has already been removed + %% we can perform the lookup using the DOWN message pid + [] -> + leave_all_groups(Pid); + _ -> + leave_all_groups(Pid) + end, + ok. + +leave_all_groups(Pid) -> Names = member_groups(Pid), _ = [leave_group(Name, P) || Name <- Names, - P <- member_in_group(Pid, Name)], - ok. + P <- member_in_group(Pid, Name)]. join_group(Name, Pid) -> Ref_Pid = {ref, Pid}, - try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1}) + try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1}) catch _:_ -> Ref = erlang:monitor(process, Pid), - true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}), - true = ets:insert(pg_local_table, {{ref, Ref}, Pid}) + true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}), + true = ets:insert(?TABLE, {{ref, Ref}, Pid}) end, Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1}) + try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1}) catch _:_ -> - true = ets:insert(pg_local_table, {Member_Name_Pid, 1}), - true = ets:insert(pg_local_table, {{pid, Pid, Name}}) + true = ets:insert(?TABLE, {Member_Name_Pid, 1}), + true = ets:insert(?TABLE, {{pid, Pid, Name}}) end. leave_group(Name, Pid) -> Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of + try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of N -> if N =:= 0 -> - true = ets:delete(pg_local_table, {pid, Pid, Name}), - true = ets:delete(pg_local_table, Member_Name_Pid); + true = ets:delete(?TABLE, {pid, Pid, Name}), + true = ets:delete(?TABLE, Member_Name_Pid); true -> ok end, Ref_Pid = {ref, Pid}, - case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of + case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of 0 -> - [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid), - true = ets:delete(pg_local_table, {ref, Ref}), - true = ets:delete(pg_local_table, Ref_Pid), + [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid), + true = ets:delete(?TABLE, {ref, Ref}), + true = ets:delete(?TABLE, Ref_Pid), true = erlang:demonitor(Ref, [flush]), ok; _ -> @@ -199,21 +219,21 @@ leave_group(Name, Pid) -> group_members(Name) -> [P || - [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}), + [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}), _ <- lists:seq(1, N)]. member_in_group(Pid, Name) -> - [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}), + [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}), lists:duplicate(N, Pid). member_present(Name, Pid) -> - case ets:lookup(pg_local_table, {member, Name, Pid}) of + case ets:lookup(?TABLE, {member, Name, Pid}) of [_] -> true; [] -> false end. member_groups(Pid) -> - [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})]. + [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})]. ensure_started() -> case whereis(?MODULE) of diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl index 06288ac1b6..be9c8d8698 100644 --- a/test/unit_SUITE.erl +++ b/test/unit_SUITE.erl @@ -17,6 +17,7 @@ -module(unit_SUITE). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). @@ -44,7 +45,6 @@ groups() -> decrypt_config, listing_plugins_from_multiple_directories, rabbitmqctl_encode, - pg_local, pmerge, plmerge, priority_queue, @@ -68,6 +68,9 @@ groups() -> ]} ]}, {sequential_tests, [], [ + pg_local, + pg_local_with_unexpected_deaths1, + pg_local_with_unexpected_deaths2, decrypt_start_app, decrypt_start_app_file, decrypt_start_app_undefined, @@ -377,29 +380,79 @@ rabbit_direct_extract_extra_auth_props(_Config) -> %% ------------------------------------------------------------------- pg_local(_Config) -> - [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]], + [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], check_pg_local(ok, [], []), + %% P joins group a, then b, then a again check_pg_local(pg_local:join(a, P), [P], []), check_pg_local(pg_local:join(b, P), [P], [P]), check_pg_local(pg_local:join(a, P), [P, P], [P]), + %% Q joins group a, then b, then b again check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]), check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]), check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]), + %% P leaves groups a and a check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]), check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]), + %% leave/2 is idempotent check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]), + %% clean up all processes [begin X ! done, Ref = erlang:monitor(process, X), receive {'DOWN', Ref, process, X, _Info} -> ok end end || X <- [P, Q]], + %% ensure the groups are empty + check_pg_local(ok, [], []), + passed. + +pg_local_with_unexpected_deaths1(_Config) -> + [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], check_pg_local(ok, [], []), + %% A joins groups a and b + check_pg_local(pg_local:join(a, A), [A], []), + check_pg_local(pg_local:join(b, A), [A], [A]), + %% B joins group b + check_pg_local(pg_local:join(b, B), [A], [A, B]), + + [begin erlang:exit(X, sleep_now_in_a_fire), + Ref = erlang:monitor(process, X), + receive {'DOWN', Ref, process, X, _Info} -> ok end + end || X <- [A, B]], + %% ensure the groups are empty + check_pg_local(ok, [], []), + ?assertNot(erlang:is_process_alive(A)), + ?assertNot(erlang:is_process_alive(B)), + + passed. + +pg_local_with_unexpected_deaths2(_Config) -> + [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)], + check_pg_local(ok, [], []), + %% A joins groups a and b + check_pg_local(pg_local:join(a, A), [A], []), + check_pg_local(pg_local:join(b, A), [A], [A]), + %% B joins group b + check_pg_local(pg_local:join(b, B), [A], [A, B]), + + %% something else yanks a record (or all of them) from the pg_local + %% bookkeeping table + ok = pg_local:clear(), + + [begin erlang:exit(X, sleep_now_in_a_fire), + Ref = erlang:monitor(process, X), + receive {'DOWN', Ref, process, X, _Info} -> ok end + end || X <- [A, B]], + %% ensure the groups are empty + check_pg_local(ok, [], []), + ?assertNot(erlang:is_process_alive(A)), + ?assertNot(erlang:is_process_alive(B)), + passed. check_pg_local(ok, APids, BPids) -> ok = pg_local:sync(), - [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || - {Key, Pids} <- [{a, APids}, {b, BPids}]]. + ?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) || + {Key, Pids} <- [{a, APids}, {b, BPids}]]). %% ------------------------------------------------------------------- %% priority_queue. |
