summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-10-25 13:00:09 +0100
committerEmile Joubert <emile@rabbitmq.com>2012-10-25 13:00:09 +0100
commit54248e76629b351561d770208036ffbaedf962a6 (patch)
treeb0cc0604e0434f3a845fd7132505055dbb0c5e80 /src/gm.erl
parent71044851c7fc988a8cb560fee98f523739705873 (diff)
parent964876078c29a04b6dacd6f77e50e533d607ea39 (diff)
downloadrabbitmq-server-git-54248e76629b351561d770208036ffbaedf962a6.tar.gz
Merged bug25053 into default
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl146
1 files changed, 86 insertions, 60 deletions
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18fbf..4a95de0dd1 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -372,8 +376,8 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -404,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -424,14 +429,16 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -502,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, Args, TxnFun) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -514,11 +521,17 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, Args, TxnFun]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -534,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -553,12 +567,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -570,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -581,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -627,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -655,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -668,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -970,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -985,20 +1008,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(
+ Left, GroupName, TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1017,29 +1042,29 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () ->
+ GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
- {atomic, Group} =
- mnesia:sync_transaction(
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
+ Group =
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1056,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, {Result, Group}} =
- mnesia:sync_transaction(
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
+ {Result, Group} =
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1073,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
- {atomic, Group} =
- mnesia:sync_transaction(
+ Group =
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1097,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1114,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.