summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-10-17 16:25:40 +0100
committerTim Watson <tim@rabbitmq.com>2012-10-17 16:25:40 +0100
commit6111d956cd72b9f600424f65cccd2a13a5ab57bb (patch)
tree555e31eff31ff2c9feda108d312858820ac707d4 /src/gm.erl
parent1021784c9d4258099607aed3c72b536f720e8d48 (diff)
downloadrabbitmq-server-git-6111d956cd72b9f600424f65cccd2a13a5ab57bb.tar.gz
do not couple gm with rabbit_misc
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl115
1 files changed, 63 insertions, 52 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 285bb927e9..43cca49527 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -376,7 +376,7 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/3, leave/1, broadcast/2,
+-export([create_tables/0, start_link/4, leave/1, broadcast/2,
confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -408,7 +408,8 @@
callback_args,
confirms,
broadcast_buffer,
- broadcast_timer
+ broadcast_timer,
+ txn_executor
}).
-record(gm_group, { name, version, members }).
@@ -428,9 +429,10 @@
-export_type([group_name/0]).
-type(group_name() :: any()).
+-type(txn_fun() :: fun((fun(() -> any())) -> any())).
-spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}).
--spec(start_link/3 :: (group_name(), atom(), any()) ->
+-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
@@ -507,8 +509,8 @@ table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-start_link(GroupName, Module, Args) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args], []).
+start_link(GroupName, Module, TxnFun, Args) ->
+ gen_server2:start_link(?MODULE, [GroupName, Module, TxnFun, Args], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -523,13 +525,13 @@ info(Server) ->
gen_server2:call(Server, info, infinity).
forget_group(GroupName) ->
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:delete({?GROUP_TABLE, GroupName})
- end),
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
ok.
-init([GroupName, Module, Args]) ->
+init([GroupName, Module, TxnFun, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
Self = make_member(GroupName),
@@ -545,7 +547,8 @@ init([GroupName, Module, Args]) ->
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
- broadcast_timer = undefined }, hibernate,
+ broadcast_timer = undefined,
+ txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -585,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From,
view = View,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
{MembersState1, Group} =
record_new_member_in_group(
GroupName, Self, NewMember,
@@ -596,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From,
{catchup, Self,
prepare_members_state(MembersState1)}),
MembersState1
- end),
+ end, TxnFun),
View2 = group_to_view(Group),
State1 = check_neighbours(State #state { view = View2,
members_state = MembersState1 }),
@@ -642,8 +646,9 @@ handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
- callback_args = Args }) ->
- View = join_group(Self, GroupName),
+ callback_args = Args,
+ txn_executor = TxnFun }) ->
+ View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -670,7 +675,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
view = View,
module = Module,
callback_args = Args,
- confirms = Confirms }) ->
+ confirms = Confirms,
+ txn_executor = TxnFun }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
@@ -683,7 +689,8 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
noreply(State);
_ ->
View1 =
- group_to_view(record_dead_member_in_group(Member, GroupName)),
+ group_to_view(record_dead_member_in_group(Member,
+ GroupName, TxnFun)),
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
@@ -985,14 +992,15 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName) ->
- join_group(Self, GroupName, read_group(GroupName)).
+join_group(Self, GroupName, TxnFun) ->
+ join_group(Self, GroupName, read_group(GroupName), TxnFun).
-join_group(Self, GroupName, {error, not_found}) ->
- join_group(Self, GroupName, prune_or_create_group(Self, GroupName));
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+ join_group(Self, GroupName,
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1000,20 +1008,23 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName));
+ prune_or_create_group(Self, GroupName, TxnFun));
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
- record_dead_member_in_group(Left, GroupName))
+ record_dead_member_in_group(Left,
+ GroupName,
+ TxnFun),
+ TxnFun)
end,
try
case gen_server2:call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName)
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
@@ -1032,29 +1043,28 @@ read_group(GroupName) ->
[Group] -> Group
end.
-prune_or_create_group(Self, GroupName) ->
- Group =
- rabbit_misc:execute_mnesia_transaction(
- fun () -> GroupNew = #gm_group { name = GroupName,
- members = [Self],
- version = ?VERSION_START },
- case mnesia:read({?GROUP_TABLE, GroupName}) of
- [] ->
- mnesia:write(GroupNew),
- GroupNew;
- [Group1 = #gm_group { members = Members }] ->
- case lists:any(fun is_member_alive/1, Members) of
- true -> Group1;
- false -> mnesia:write(GroupNew),
- GroupNew
- end
- end
- end),
+prune_or_create_group(Self, GroupName, TxnFun) ->
+ Group = TxnFun(
+ fun () -> GroupNew = #gm_group { name = GroupName,
+ members = [Self],
+ version = ?VERSION_START },
+ case mnesia:read({?GROUP_TABLE, GroupName}) of
+ [] ->
+ mnesia:write(GroupNew),
+ GroupNew;
+ [Group1 = #gm_group { members = Members }] ->
+ case lists:any(fun is_member_alive/1, Members) of
+ true -> Group1;
+ false -> mnesia:write(GroupNew),
+ GroupNew
+ end
+ end
+ end),
Group.
-record_dead_member_in_group(Member, GroupName) ->
+record_dead_member_in_group(Member, GroupName, TxnFun) ->
Group =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () -> [Group1 = #gm_group { members = Members, version = Ver }] =
mnesia:read({?GROUP_TABLE, GroupName}),
case lists:splitwith(
@@ -1071,9 +1081,9 @@ record_dead_member_in_group(Member, GroupName) ->
end),
Group.
-record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
+record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) ->
{Result, Group} =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
mnesia:read({?GROUP_TABLE, GroupName}),
@@ -1088,10 +1098,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
end),
{Result, Group}.
-erase_members_in_group(Members, GroupName) ->
+erase_members_in_group(Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
Group =
- rabbit_misc:execute_mnesia_transaction(
+ TxnFun(
fun () ->
[Group1 = #gm_group { members = [_|_] = Members1,
version = Ver }] =
@@ -1112,7 +1122,8 @@ maybe_erase_aliases(State = #state { self = Self,
view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }, View) ->
+ callback_args = Args,
+ txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1129,7 +1140,7 @@ maybe_erase_aliases(State = #state { self = Self,
case Erasable of
[] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
- erase_members_in_group(Erasable, GroupName)),
+ erase_members_in_group(Erasable, GroupName, TxnFun)),
{callback_view_changed(Args, Module, View0, View1),
check_neighbours(State1 #state { view = View1 })}
end.