summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/pg_local.erl74
-rw-r--r--test/unit_SUITE.erl61
2 files changed, 104 insertions, 31 deletions
diff --git a/src/pg_local.erl b/src/pg_local.erl
index e1f5219dcb..3a00e835ce 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -35,7 +35,8 @@
-module(pg_local).
-export([join/2, leave/2, get_members/1, in_group/2]).
--export([sync/0]). %% intended for testing only; not part of official API
+%% intended for testing only; not part of official API
+-export([sync/0, clear/0]).
-export([start/0, start_link/0, init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2]).
@@ -54,7 +55,7 @@
%%----------------------------------------------------------------------------
-%%% As of R13B03 monitors are used instead of links.
+-define(TABLE, pg_local_table).
%%%
%%% Exported functions
@@ -92,6 +93,10 @@ sync() ->
_ = ensure_started(),
gen_server:call(?MODULE, sync, infinity).
+clear() ->
+ _ = ensure_started(),
+ gen_server:call(?MODULE, clear, infinity).
+
%%%
%%% Callback functions from gen_server
%%%
@@ -99,12 +104,16 @@ sync() ->
-record(state, {}).
init([]) ->
- pg_local_table = ets:new(pg_local_table, [ordered_set, protected, named_table]),
+ ?TABLE = ets:new(?TABLE, [ordered_set, protected, named_table]),
{ok, #state{}}.
handle_call(sync, _From, S) ->
{reply, ok, S};
+handle_call(clear, _From, S) ->
+ ets:delete_all_objects(?TABLE),
+ {reply, ok, S};
+
handle_call(Request, From, S) ->
error_logger:warning_msg("The pg_local server received an unexpected message:\n"
"handle_call(~p, ~p, _)\n",
@@ -120,14 +129,14 @@ handle_cast({leave, Name, Pid}, S) ->
handle_cast(_, S) ->
{noreply, S}.
-handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
- member_died(MonitorRef),
+handle_info({'DOWN', MonitorRef, process, Pid, _Info}, S) ->
+ member_died(MonitorRef, Pid),
{noreply, S};
handle_info(_, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
- true = ets:delete(pg_local_table),
+ true = ets:delete(?TABLE),
ok.
%%%
@@ -148,46 +157,57 @@ terminate(_Reason, _S) ->
%%% {{pid, Pid, Name}}
%%% Pid is a member of group Name.
-member_died(Ref) ->
- [{{ref, Ref}, Pid}] = ets:lookup(pg_local_table, {ref, Ref}),
+member_died(Ref, Pid) ->
+ case ets:lookup(?TABLE, {ref, Ref}) of
+ [{{ref, Ref}, Pid}] ->
+ leave_all_groups(Pid);
+ %% in case the key has already been removed
+ %% we can perform the lookup using the DOWN message pid
+ [] ->
+ leave_all_groups(Pid);
+ _ ->
+ leave_all_groups(Pid)
+ end,
+ ok.
+
+leave_all_groups(Pid) ->
Names = member_groups(Pid),
_ = [leave_group(Name, P) ||
Name <- Names,
- P <- member_in_group(Pid, Name)],
- ok.
+ P <- member_in_group(Pid, Name)].
join_group(Name, Pid) ->
Ref_Pid = {ref, Pid},
- try _ = ets:update_counter(pg_local_table, Ref_Pid, {3, +1})
+ try _ = ets:update_counter(?TABLE, Ref_Pid, {3, +1})
catch _:_ ->
Ref = erlang:monitor(process, Pid),
- true = ets:insert(pg_local_table, {Ref_Pid, Ref, 1}),
- true = ets:insert(pg_local_table, {{ref, Ref}, Pid})
+ true = ets:insert(?TABLE, {Ref_Pid, Ref, 1}),
+ true = ets:insert(?TABLE, {{ref, Ref}, Pid})
end,
Member_Name_Pid = {member, Name, Pid},
- try _ = ets:update_counter(pg_local_table, Member_Name_Pid, {2, +1})
+ try _ = ets:update_counter(?TABLE, Member_Name_Pid, {2, +1})
catch _:_ ->
- true = ets:insert(pg_local_table, {Member_Name_Pid, 1}),
- true = ets:insert(pg_local_table, {{pid, Pid, Name}})
+ true = ets:insert(?TABLE, {Member_Name_Pid, 1}),
+ true = ets:insert(?TABLE, {{pid, Pid, Name}})
end.
leave_group(Name, Pid) ->
Member_Name_Pid = {member, Name, Pid},
- try ets:update_counter(pg_local_table, Member_Name_Pid, {2, -1}) of
+ try ets:update_counter(?TABLE, Member_Name_Pid, {2, -1}) of
N ->
if
N =:= 0 ->
- true = ets:delete(pg_local_table, {pid, Pid, Name}),
- true = ets:delete(pg_local_table, Member_Name_Pid);
+ true = ets:delete(?TABLE, {pid, Pid, Name}),
+ true = ets:delete(?TABLE, Member_Name_Pid);
true ->
ok
end,
Ref_Pid = {ref, Pid},
- case ets:update_counter(pg_local_table, Ref_Pid, {3, -1}) of
+ case ets:update_counter(?TABLE, Ref_Pid, {3, -1}) of
0 ->
- [{Ref_Pid,Ref,0}] = ets:lookup(pg_local_table, Ref_Pid),
- true = ets:delete(pg_local_table, {ref, Ref}),
- true = ets:delete(pg_local_table, Ref_Pid),
+ [{Ref_Pid,Ref,0}] = ets:lookup(?TABLE, Ref_Pid),
+ true = ets:delete(?TABLE, {ref, Ref}),
+ true = ets:delete(?TABLE, Ref_Pid),
true = erlang:demonitor(Ref, [flush]),
ok;
_ ->
@@ -199,21 +219,21 @@ leave_group(Name, Pid) ->
group_members(Name) ->
[P ||
- [P, N] <- ets:match(pg_local_table, {{member, Name, '$1'},'$2'}),
+ [P, N] <- ets:match(?TABLE, {{member, Name, '$1'},'$2'}),
_ <- lists:seq(1, N)].
member_in_group(Pid, Name) ->
- [{{member, Name, Pid}, N}] = ets:lookup(pg_local_table, {member, Name, Pid}),
+ [{{member, Name, Pid}, N}] = ets:lookup(?TABLE, {member, Name, Pid}),
lists:duplicate(N, Pid).
member_present(Name, Pid) ->
- case ets:lookup(pg_local_table, {member, Name, Pid}) of
+ case ets:lookup(?TABLE, {member, Name, Pid}) of
[_] -> true;
[] -> false
end.
member_groups(Pid) ->
- [Name || [Name] <- ets:match(pg_local_table, {{pid, Pid, '$1'}})].
+ [Name || [Name] <- ets:match(?TABLE, {{pid, Pid, '$1'}})].
ensure_started() ->
case whereis(?MODULE) of
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index 06288ac1b6..be9c8d8698 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -17,6 +17,7 @@
-module(unit_SUITE).
-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
@@ -44,7 +45,6 @@ groups() ->
decrypt_config,
listing_plugins_from_multiple_directories,
rabbitmqctl_encode,
- pg_local,
pmerge,
plmerge,
priority_queue,
@@ -68,6 +68,9 @@ groups() ->
]}
]},
{sequential_tests, [], [
+ pg_local,
+ pg_local_with_unexpected_deaths1,
+ pg_local_with_unexpected_deaths2,
decrypt_start_app,
decrypt_start_app_file,
decrypt_start_app_undefined,
@@ -377,29 +380,79 @@ rabbit_direct_extract_extra_auth_props(_Config) ->
%% -------------------------------------------------------------------
pg_local(_Config) ->
- [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- [x, x]],
+ [P, Q] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
check_pg_local(ok, [], []),
+ %% P joins group a, then b, then a again
check_pg_local(pg_local:join(a, P), [P], []),
check_pg_local(pg_local:join(b, P), [P], [P]),
check_pg_local(pg_local:join(a, P), [P, P], [P]),
+ %% Q joins group a, then b, then b again
check_pg_local(pg_local:join(a, Q), [P, P, Q], [P]),
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q]),
check_pg_local(pg_local:join(b, Q), [P, P, Q], [P, Q, Q]),
+ %% P leaves groups a and a
check_pg_local(pg_local:leave(a, P), [P, Q], [P, Q, Q]),
check_pg_local(pg_local:leave(b, P), [P, Q], [Q, Q]),
+ %% leave/2 is idempotent
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
check_pg_local(pg_local:leave(a, P), [Q], [Q, Q]),
+ %% clean up all processes
[begin X ! done,
Ref = erlang:monitor(process, X),
receive {'DOWN', Ref, process, X, _Info} -> ok end
end || X <- [P, Q]],
+ %% ensure the groups are empty
+ check_pg_local(ok, [], []),
+ passed.
+
+pg_local_with_unexpected_deaths1(_Config) ->
+ [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
check_pg_local(ok, [], []),
+ %% A joins groups a and b
+ check_pg_local(pg_local:join(a, A), [A], []),
+ check_pg_local(pg_local:join(b, A), [A], [A]),
+ %% B joins group b
+ check_pg_local(pg_local:join(b, B), [A], [A, B]),
+
+ [begin erlang:exit(X, sleep_now_in_a_fire),
+ Ref = erlang:monitor(process, X),
+ receive {'DOWN', Ref, process, X, _Info} -> ok end
+ end || X <- [A, B]],
+ %% ensure the groups are empty
+ check_pg_local(ok, [], []),
+ ?assertNot(erlang:is_process_alive(A)),
+ ?assertNot(erlang:is_process_alive(B)),
+
+ passed.
+
+pg_local_with_unexpected_deaths2(_Config) ->
+ [A, B] = [spawn(fun () -> receive X -> X end end) || _ <- lists:seq(0, 1)],
+ check_pg_local(ok, [], []),
+ %% A joins groups a and b
+ check_pg_local(pg_local:join(a, A), [A], []),
+ check_pg_local(pg_local:join(b, A), [A], [A]),
+ %% B joins group b
+ check_pg_local(pg_local:join(b, B), [A], [A, B]),
+
+ %% something else yanks a record (or all of them) from the pg_local
+ %% bookkeeping table
+ ok = pg_local:clear(),
+
+ [begin erlang:exit(X, sleep_now_in_a_fire),
+ Ref = erlang:monitor(process, X),
+ receive {'DOWN', Ref, process, X, _Info} -> ok end
+ end || X <- [A, B]],
+ %% ensure the groups are empty
+ check_pg_local(ok, [], []),
+ ?assertNot(erlang:is_process_alive(A)),
+ ?assertNot(erlang:is_process_alive(B)),
+
passed.
check_pg_local(ok, APids, BPids) ->
ok = pg_local:sync(),
- [true, true] = [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
- {Key, Pids} <- [{a, APids}, {b, BPids}]].
+ ?assertEqual([true, true], [lists:sort(Pids) == lists:sort(pg_local:get_members(Key)) ||
+ {Key, Pids} <- [{a, APids}, {b, BPids}]]).
%% -------------------------------------------------------------------
%% priority_queue.