diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-18 11:43:22 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-18 11:43:22 +0000 |
| commit | e32a511f94f008b8676eb85d72655ba7de4cbd93 (patch) | |
| tree | 5270c7a7ace9cbf4400d7f8435abde1828fb4000 /src/gm.erl | |
| parent | 540ba12d7ea23f286d477f665a2b8b74c2289096 (diff) | |
| parent | d8ebb900ad0477e737e5d20c4337e1837508348a (diff) | |
| download | rabbitmq-server-git-e32a511f94f008b8676eb85d72655ba7de4cbd93.tar.gz | |
Merge in default.
Diffstat (limited to 'src/gm.erl')
| -rw-r--r-- | src/gm.erl | 251 |
1 files changed, 130 insertions, 121 deletions
diff --git a/src/gm.erl b/src/gm.erl index 97c81ec635..2057b1f577 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -77,9 +77,13 @@ %% confirmed_broadcast/2 directly from the callback module otherwise %% you will deadlock the entire group. %% -%% group_members/1 -%% Provide the Pid. Returns a list of the current group members. +%% info/1 +%% Provide the Pid. Returns a proplist with various facts, including +%% the group name and the current group members. %% +%% forget_group/1 +%% Provide the group name. Removes its mnesia record. Makes no attempt +%% to ensure the group is empty. %% %% Implementation Overview %% ----------------------- @@ -372,8 +376,8 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/3, leave/1, broadcast/2, - confirmed_broadcast/2, group_members/1]). +-export([create_tables/0, start_link/4, leave/1, broadcast/2, + confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/2]). @@ -404,7 +408,8 @@ callback_args, confirms, broadcast_buffer, - broadcast_timer + broadcast_timer, + txn_executor }). -record(gm_group, { name, version, members }). @@ -424,14 +429,16 @@ -export_type([group_name/0]). -type(group_name() :: any()). +-type(txn_fun() :: fun((fun(() -> any())) -> any())). -spec(create_tables/0 :: () -> 'ok' | {'aborted', any()}). --spec(start_link/3 :: (group_name(), atom(), any()) -> +-spec(start_link/4 :: (group_name(), atom(), any(), txn_fun()) -> rabbit_types:ok_pid_or_error()). -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). --spec(group_members/1 :: (pid()) -> [pid()]). +-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(forget_group/1 :: (group_name()) -> 'ok'). %% The joined, members_changed and handle_msg callbacks can all return %% any of the following terms: @@ -502,8 +509,8 @@ table_definitions() -> {Name, Attributes} = ?TABLE, [{Name, [?TABLE_MATCH | Attributes]}]. -start_link(GroupName, Module, Args) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args], []). +start_link(GroupName, Module, Args, TxnFun) -> + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). leave(Server) -> gen_server2:cast(Server, leave). @@ -514,11 +521,17 @@ broadcast(Server, Msg) -> confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). -group_members(Server) -> - gen_server2:call(Server, group_members, infinity). +info(Server) -> + gen_server2:call(Server, info, infinity). +forget_group(GroupName) -> + {atomic, ok} = mnesia:sync_transaction( + fun () -> + mnesia:delete({?GROUP_TABLE, GroupName}) + end), + ok. -init([GroupName, Module, Args]) -> +init([GroupName, Module, Args, TxnFun]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), Self = make_member(GroupName), @@ -534,7 +547,8 @@ init([GroupName, Module, Args]) -> callback_args = Args, confirms = queue:new(), broadcast_buffer = [], - broadcast_timer = undefined }, hibernate, + broadcast_timer = undefined, + txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -553,12 +567,16 @@ handle_call({confirmed_broadcast, Msg}, _From, handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); -handle_call(group_members, _From, +handle_call(info, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); -handle_call(group_members, _From, State = #state { view = View }) -> - reply(alive_view_members(View), State); +handle_call(info, _From, State = #state { group_name = GroupName, + module = Module, + view = View }) -> + reply([{group_name, GroupName}, + {module, Module}, + {group_members, get_pids(alive_view_members(View))}], State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> @@ -570,7 +588,8 @@ handle_call({add_on_right, NewMember}, _From, view = View, members_state = MembersState, module = Module, - callback_args = Args }) -> + callback_args = Args, + txn_executor = TxnFun }) -> {MembersState1, Group} = record_new_member_in_group( GroupName, Self, NewMember, @@ -581,7 +600,7 @@ handle_call({add_on_right, NewMember}, _From, {catchup, Self, prepare_members_state(MembersState1)}), MembersState1 - end), + end, TxnFun), View2 = group_to_view(Group), State1 = check_neighbours(State #state { view = View2, members_state = MembersState1 }), @@ -627,8 +646,9 @@ handle_cast(join, State = #state { self = Self, group_name = GroupName, members_state = undefined, module = Module, - callback_args = Args }) -> - View = join_group(Self, GroupName), + callback_args = Args, + txn_executor = TxnFun }) -> + View = join_group(Self, GroupName, TxnFun), MembersState = case alive_view_members(View) of [Self] -> blank_member_state(); @@ -647,7 +667,10 @@ handle_info(flush, State) -> noreply( flush_broadcast_buffer(State #state { broadcast_timer = undefined })); -handle_info({'DOWN', MRef, process, _Pid, _Reason}, +handle_info(timeout, State) -> + noreply(flush_broadcast_buffer(State)); + +handle_info({'DOWN', MRef, process, _Pid, Reason}, State = #state { self = Self, left = Left, right = Right, @@ -655,18 +678,22 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, view = View, module = Module, callback_args = Args, - confirms = Confirms }) -> + confirms = Confirms, + txn_executor = TxnFun }) -> Member = case {Left, Right} of {{Member1, MRef}, _} -> Member1; {_, {Member1, MRef}} -> Member1; _ -> undefined end, - case Member of - undefined -> + case {Member, Reason} of + {undefined, _} -> + noreply(State); + {_, {shutdown, ring_shutdown}} -> noreply(State); _ -> View1 = - group_to_view(record_dead_member_in_group(Member, GroupName)), + group_to_view(record_dead_member_in_group(Member, + GroupName, TxnFun)), {Result, State2} = case alive_view_members(View1) of [Self] -> @@ -810,10 +837,13 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> noreply(State) -> - {noreply, ensure_broadcast_timer(State), hibernate}. + {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. reply(Reply, State) -> - {reply, Reply, ensure_broadcast_timer(State), hibernate}. + {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. + +flush_timeout(#state{broadcast_buffer = []}) -> hibernate; +flush_timeout(_) -> 0. ensure_broadcast_timer(State = #state { broadcast_buffer = [], broadcast_timer = undefined }) -> @@ -876,11 +906,9 @@ flush_broadcast_buffer(State = #state { self = Self, %% View construction and inspection %% --------------------------------------------------------------------------- -needs_view_update(ReqVer, {Ver, _View}) -> - Ver < ReqVer. +needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer. -view_version({Ver, _View}) -> - Ver. +view_version({Ver, _View}) -> Ver. is_member_alive({dead, _Member}) -> false; is_member_alive(_) -> true. @@ -899,17 +927,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> - ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). -find_view_member(Id, {_Ver, View}) -> - ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). -blank_view(Ver) -> - {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, ?DICT:new()}. -alive_view_members({_Ver, View}) -> - ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> ?DICT:fold( @@ -974,14 +998,15 @@ ensure_alive_suffix1(MembersQ) -> %% View modification %% --------------------------------------------------------------------------- -join_group(Self, GroupName) -> - join_group(Self, GroupName, read_group(GroupName)). +join_group(Self, GroupName, TxnFun) -> + join_group(Self, GroupName, read_group(GroupName), TxnFun). -join_group(Self, GroupName, {error, not_found}) -> - join_group(Self, GroupName, prune_or_create_group(Self, GroupName)); -join_group(Self, _GroupName, #gm_group { members = [Self] } = Group) -> +join_group(Self, GroupName, {error, not_found}, TxnFun) -> + join_group(Self, GroupName, + prune_or_create_group(Self, GroupName, TxnFun), TxnFun); +join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> group_to_view(Group); -join_group(Self, GroupName, #gm_group { members = Members } = Group) -> +join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> group_to_view(Group); @@ -989,20 +1014,22 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> case lists:filter(fun is_member_alive/1, Members) of [] -> join_group(Self, GroupName, - prune_or_create_group(Self, GroupName)); + prune_or_create_group(Self, GroupName, TxnFun)); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = fun () -> join_group( Self, GroupName, - record_dead_member_in_group(Left, GroupName)) + record_dead_member_in_group( + Left, GroupName, TxnFun), + TxnFun) end, try case gen_server2:call( get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName) + not_ready -> join_group(Self, GroupName, TxnFun) end catch exit:{R, _} @@ -1021,29 +1048,29 @@ read_group(GroupName) -> [Group] -> Group end. -prune_or_create_group(Self, GroupName) -> - {atomic, Group} = - mnesia:sync_transaction( - fun () -> GroupNew = #gm_group { name = GroupName, - members = [Self], - version = ?VERSION_START }, - case mnesia:read({?GROUP_TABLE, GroupName}) of - [] -> - mnesia:write(GroupNew), - GroupNew; - [Group1 = #gm_group { members = Members }] -> - case lists:any(fun is_member_alive/1, Members) of - true -> Group1; - false -> mnesia:write(GroupNew), - GroupNew - end - end - end), +prune_or_create_group(Self, GroupName, TxnFun) -> + Group = TxnFun( + fun () -> + GroupNew = #gm_group { name = GroupName, + members = [Self], + version = ?VERSION_START }, + case mnesia:read({?GROUP_TABLE, GroupName}) of + [] -> + mnesia:write(GroupNew), + GroupNew; + [Group1 = #gm_group { members = Members }] -> + case lists:any(fun is_member_alive/1, Members) of + true -> Group1; + false -> mnesia:write(GroupNew), + GroupNew + end + end + end), Group. -record_dead_member_in_group(Member, GroupName) -> - {atomic, Group} = - mnesia:sync_transaction( +record_dead_member_in_group(Member, GroupName, TxnFun) -> + Group = + TxnFun( fun () -> [Group1 = #gm_group { members = Members, version = Ver }] = mnesia:read({?GROUP_TABLE, GroupName}), case lists:splitwith( @@ -1060,9 +1087,9 @@ record_dead_member_in_group(Member, GroupName) -> end), Group. -record_new_member_in_group(GroupName, Left, NewMember, Fun) -> - {atomic, {Result, Group}} = - mnesia:sync_transaction( +record_new_member_in_group(GroupName, Left, NewMember, Fun, TxnFun) -> + {Result, Group} = + TxnFun( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = mnesia:read({?GROUP_TABLE, GroupName}), @@ -1077,10 +1104,10 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> end), {Result, Group}. -erase_members_in_group(Members, GroupName) -> +erase_members_in_group(Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], - {atomic, Group} = - mnesia:sync_transaction( + Group = + TxnFun( fun () -> [Group1 = #gm_group { members = [_|_] = Members1, version = Ver }] = @@ -1101,7 +1128,8 @@ maybe_erase_aliases(State = #state { self = Self, view = View0, members_state = MembersState, module = Module, - callback_args = Args }, View) -> + callback_args = Args, + txn_executor = TxnFun }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( @@ -1118,7 +1146,7 @@ maybe_erase_aliases(State = #state { self = Self, case Erasable of [] -> {ok, State1 #state { view = View }}; _ -> View1 = group_to_view( - erase_members_in_group(Erasable, GroupName)), + erase_members_in_group(Erasable, GroupName, TxnFun)), {callback_view_changed(Args, Module, View0, View1), check_neighbours(State1 #state { view = View1 })} end. @@ -1150,10 +1178,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> end, {Neighbour, maybe_monitor(Neighbour, Self)}. -maybe_monitor(Self, Self) -> - undefined; -maybe_monitor(Other, _Self) -> - erlang:monitor(process, get_pid(Other)). +maybe_monitor( Self, Self) -> undefined; +maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1242,23 +1268,19 @@ find_member_or_blank(Id, MembersState) -> error -> blank_member() end. -erase_member(Id, MembersState) -> - ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> - ?DICT:new(). +blank_member_state() -> ?DICT:new(). store_member(Id, MemberState, MembersState) -> ?DICT:store(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> - ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). -build_members_state(MembersStateList) -> - ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). make_member(GroupName) -> {case read_group(GroupName) of @@ -1280,16 +1302,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% Activity assembly %% --------------------------------------------------------------------------- -activity_nil() -> - queue:new(). +activity_nil() -> queue:new(). -activity_cons(_Id, [], [], Tail) -> - Tail; -activity_cons(Sender, Pubs, Acks, Tail) -> - queue:in({Sender, Pubs, Acks}, Tail). +activity_cons( _Id, [], [], Tail) -> Tail; +activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail). -activity_finalise(Activity) -> - queue:to_list(Activity). +activity_finalise(Activity) -> queue:to_list(Activity). maybe_send_activity([], _State) -> ok; @@ -1393,34 +1411,25 @@ purge_confirms(Confirms) -> %% Msg transformation %% --------------------------------------------------------------------------- -acks_from_queue(Q) -> - [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. +acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)]. -pubs_from_queue(Q) -> - queue:to_list(Q). +pubs_from_queue(Q) -> queue:to_list(Q). -queue_from_pubs(Pubs) -> - queue:from_list(Pubs). +queue_from_pubs(Pubs) -> queue:from_list(Pubs). -apply_acks([], Pubs) -> - Pubs; -apply_acks(List, Pubs) -> - {_, Pubs1} = queue:split(length(List), Pubs), - Pubs1. +apply_acks( [], Pubs) -> Pubs; +apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. join_pubs(Q, []) -> Q; join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). -last_ack([], LA) -> - LA; -last_ack(List, LA) -> - LA1 = lists:last(List), - true = LA1 > LA, %% ASSERTION - LA1. - -last_pub([], LP) -> - LP; -last_pub(List, LP) -> - {PubNum, _Msg} = lists:last(List), - true = PubNum > LP, %% ASSERTION - PubNum. +last_ack( [], LA) -> LA; +last_ack(List, LA) -> LA1 = lists:last(List), + true = LA1 > LA, %% ASSERTION + LA1. + +last_pub( [], LP) -> LP; +last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), + true = PubNum > LP, %% ASSERTION + PubNum. |
