diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 351 |
1 files changed, 221 insertions, 130 deletions
diff --git a/src/gm.erl b/src/gm.erl index aeb050e15f..199cf7c4de 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -617,14 +617,20 @@ handle_call({add_on_right, NewMember}, _From, group_name = GroupName, members_state = MembersState, txn_executor = TxnFun }) -> - Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), - View1 = group_to_view(Group), - MembersState1 = remove_erased_members(MembersState, View1), - ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state(MembersState1)}), - {Result, State1} = change_view(View1, State #state { - members_state = MembersState1 }), - handle_callback_result({Result, {ok, Group}, State1}). + try + Group = record_new_member_in_group( + NewMember, Self, GroupName, TxnFun), + View1 = group_to_view(check_membership(Self, Group)), + MembersState1 = remove_erased_members(MembersState, View1), + ok = send_right(NewMember, View1, + {catchup, Self, prepare_members_state(MembersState1)}), + {Result, State1} = change_view(View1, State #state { + members_state = MembersState1 }), + handle_callback_result({Result, {ok, Group}, State1}) + catch + lost_membership -> + {stop, normal, State} + end. %% add_on_right causes a catchup to be sent immediately from the left, %% so we can never see this from the left neighbour. However, it's @@ -638,19 +644,28 @@ handle_cast({?TAG, _ReqVer, check_neighbours}, handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, + self = Self, members_state = MembersState, group_name = GroupName }) -> - {Result, State1} = - case needs_view_update(ReqVer, View) of - true -> View1 = group_to_view(dirty_read_group(GroupName)), - MemberState1 = remove_erased_members(MembersState, View1), - change_view(View1, State #state { - members_state = MemberState1 }); - false -> {ok, State} - end, - handle_callback_result( - if_callback_success( - Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); + try + {Result, State1} = + case needs_view_update(ReqVer, View) of + true -> + View1 = group_to_view( + check_membership(Self, + dirty_read_group(GroupName))), + MemberState1 = remove_erased_members(MembersState, View1), + change_view(View1, State #state { + members_state = MemberState1 }); + false -> {ok, State} + end, + handle_callback_result( + if_callback_success( + Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)) + catch + lost_membership -> + {stop, normal, State} + end; handle_cast({broadcast, _Msg, _SizeHint}, State = #state { shutting_down = {true, _} }) -> @@ -724,39 +739,44 @@ handle_info({'DOWN', MRef, process, _Pid, Reason}, group_name = GroupName, confirms = Confirms, txn_executor = TxnFun }) -> - Member = case {Left, Right} of - {{Member1, MRef}, _} -> Member1; - {_, {Member1, MRef}} -> Member1; - _ -> undefined - end, - case {Member, Reason} of - {undefined, _} -> - noreply(State); - {_, {shutdown, ring_shutdown}} -> - noreply(State); - _ -> - %% In the event of a partial partition we could see another member - %% go down and then remove them from Mnesia. While they can - %% recover from this they'd have to restart the queue - not - %% ideal. So let's sleep here briefly just in case this was caused - %% by a partial partition; in which case by the time we record the - %% member death in Mnesia we will probably be in a full - %% partition and will not be assassinating another member. - timer:sleep(100), - View1 = group_to_view(record_dead_member_in_group( - Member, GroupName, TxnFun)), - handle_callback_result( - case alive_view_members(View1) of - [Self] -> maybe_erase_aliases( - State #state { - members_state = blank_member_state(), - confirms = purge_confirms(Confirms) }, - View1); - _ -> change_view(View1, State) - end) + try + check_membership(GroupName), + Member = case {Left, Right} of + {{Member1, MRef}, _} -> Member1; + {_, {Member1, MRef}} -> Member1; + _ -> undefined + end, + case {Member, Reason} of + {undefined, _} -> + noreply(State); + {_, {shutdown, ring_shutdown}} -> + noreply(State); + _ -> + %% In the event of a partial partition we could see another member + %% go down and then remove them from Mnesia. While they can + %% recover from this they'd have to restart the queue - not + %% ideal. So let's sleep here briefly just in case this was caused + %% by a partial partition; in which case by the time we record the + %% member death in Mnesia we will probably be in a full + %% partition and will not be assassinating another member. + timer:sleep(100), + View1 = group_to_view(record_dead_member_in_group(Self, + Member, GroupName, TxnFun, true)), + handle_callback_result( + case alive_view_members(View1) of + [Self] -> maybe_erase_aliases( + State #state { + members_state = blank_member_state(), + confirms = purge_confirms(Confirms) }, + View1); + _ -> change_view(View1, State) + end) + end + catch + lost_membership -> + {stop, normal, State} end. - terminate(Reason, #state { module = Module, callback_args = Args }) -> Module:handle_terminate(Args, Reason). @@ -841,52 +861,30 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) -> handle_msg({activity, Left, Activity}, State = #state { self = Self, + group_name = GroupName, left = {Left, _MRefL}, view = View, members_state = MembersState, confirms = Confirms }) when MembersState =/= undefined -> - {MembersState1, {Confirms1, Activity1}} = - lists:foldl( - fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> - with_member_acc( - fun (Member = #member { pending_ack = PA, - last_pub = LP, - last_ack = LA }, - {Confirms2, Activity2}) -> - case is_member_alias(Id, Self, View) of - true -> - {ToAck, PA1} = - find_common(queue_from_pubs(Pubs), PA, - queue:new()), - LA1 = last_ack(Acks, LA), - AckNums = acks_from_queue(ToAck), - Confirms3 = maybe_confirm( - Self, Id, Confirms2, AckNums), - {Member #member { pending_ack = PA1, - last_ack = LA1 }, - {Confirms3, - activity_cons( - Id, [], AckNums, Activity2)}}; - false -> - PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), - LA1 = last_ack(Acks, LA), - LP1 = last_pub(Pubs, LP), - {Member #member { pending_ack = PA1, - last_pub = LP1, - last_ack = LA1 }, - {Confirms2, - activity_cons(Id, Pubs, Acks, Activity2)}} - end - end, Id, MembersStateConfirmsActivity) - end, {MembersState, {Confirms, activity_nil()}}, Activity), - State1 = State #state { members_state = MembersState1, - confirms = Confirms1 }, - Activity3 = activity_finalise(Activity1), - ok = maybe_send_activity(Activity3, State1), - {Result, State2} = maybe_erase_aliases(State1, View), - if_callback_success( - Result, fun activity_true/3, fun activity_false/3, Activity3, State2); + try + %% If we have to stop, do it asap so we avoid any ack confirmation + %% Membership must be checked again by erase_members_in_group, as the + %% node can be marked as dead on the meanwhile + check_membership(GroupName), + {MembersState1, {Confirms1, Activity1}} = + calculate_activity(MembersState, Confirms, Activity, Self, View), + State1 = State #state { members_state = MembersState1, + confirms = Confirms1 }, + Activity3 = activity_finalise(Activity1), + ok = maybe_send_activity(Activity3, State1), + {Result, State2} = maybe_erase_aliases(State1, View), + if_callback_success( + Result, fun activity_true/3, fun activity_false/3, Activity3, State2) + catch + lost_membership -> + {{stop, normal}, State} + end; handle_msg({activity, _NotLeft, _Activity}, State) -> {ok, State}. @@ -1091,8 +1089,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> fun () -> join_group( Self, GroupName, - record_dead_member_in_group( - Left, GroupName, TxnFun), + record_dead_member_in_group(Self, + Left, GroupName, TxnFun, false), TxnFun) end, try @@ -1142,47 +1140,84 @@ prune_or_create_group(Self, GroupName, TxnFun) -> end end). -record_dead_member_in_group(Member, GroupName, TxnFun) -> - TxnFun( - fun () -> - Group = #gm_group { members = Members, version = Ver } = - read_group(GroupName), - case lists:splitwith( - fun (Member1) -> Member1 =/= Member end, Members) of - {_Members1, []} -> %% not found - already recorded dead - Group; - {Members1, [Member | Members2]} -> - Members3 = Members1 ++ [{dead, Member} | Members2], - write_group(Group #gm_group { members = Members3, - version = Ver + 1 }) - end - end). +record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) -> + Fun = + fun () -> + try + Group = #gm_group { members = Members, version = Ver } = + case Verify of + true -> + check_membership(Self, read_group(GroupName)); + false -> + read_group(GroupName) + end, + case lists:splitwith( + fun (Member1) -> Member1 =/= Member end, Members) of + {_Members1, []} -> %% not found - already recorded dead + Group; + {Members1, [Member | Members2]} -> + Members3 = Members1 ++ [{dead, Member} | Members2], + write_group(Group #gm_group { members = Members3, + version = Ver + 1 }) + end + catch + lost_membership -> + %% The transaction must not be abruptly crashed, but + %% leave the gen_server to stop normally + {error, lost_membership} + end + end, + handle_lost_membership_in_txn(TxnFun, Fun). + +handle_lost_membership_in_txn(TxnFun, Fun) -> + case TxnFun(Fun) of + {error, lost_membership = T} -> + throw(T); + Any -> + Any + end. record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> - TxnFun( - fun () -> - Group = #gm_group { members = Members, version = Ver } = - read_group(GroupName), - {Prefix, [Left | Suffix]} = - lists:splitwith(fun (M) -> M =/= Left end, Members), - write_group(Group #gm_group { - members = Prefix ++ [Left, NewMember | Suffix], - version = Ver + 1 }) - end). + Fun = + fun () -> + try + Group = #gm_group { members = Members, version = Ver } = + check_membership(Left, read_group(GroupName)), + {Prefix, [Left | Suffix]} = + lists:splitwith(fun (M) -> M =/= Left end, Members), + write_group(Group #gm_group { + members = Prefix ++ [Left, NewMember | Suffix], + version = Ver + 1 }) + catch + lost_membership -> + %% The transaction must not be abruptly crashed, but + %% leave the gen_server to stop normally + {error, lost_membership} + end + end, + handle_lost_membership_in_txn(TxnFun, Fun). -erase_members_in_group(Members, GroupName, TxnFun) -> +erase_members_in_group(Self, Members, GroupName, TxnFun) -> DeadMembers = [{dead, Id} || Id <- Members], - TxnFun( - fun () -> - Group = #gm_group { members = [_|_] = Members1, version = Ver } = - read_group(GroupName), - case Members1 -- DeadMembers of - Members1 -> Group; - Members2 -> write_group( - Group #gm_group { members = Members2, - version = Ver + 1 }) + Fun = + fun () -> + try + Group = #gm_group { members = [_|_] = Members1, version = Ver } = + check_membership(Self, read_group(GroupName)), + case Members1 -- DeadMembers of + Members1 -> Group; + Members2 -> write_group( + Group #gm_group { members = Members2, + version = Ver + 1 }) + end + catch + lost_membership -> + %% The transaction must not be abruptly crashed, but + %% leave the gen_server to stop normally + {error, lost_membership} end - end). + end, + handle_lost_membership_in_txn(TxnFun, Fun). maybe_erase_aliases(State = #state { self = Self, group_name = GroupName, @@ -1203,7 +1238,7 @@ maybe_erase_aliases(State = #state { self = Self, View1 = case Erasable of [] -> View; _ -> group_to_view( - erase_members_in_group(Erasable, GroupName, TxnFun)) + erase_members_in_group(Self, Erasable, GroupName, TxnFun)) end, change_view(View1, State #state { members_state = MembersState1 }). @@ -1378,6 +1413,41 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, Msg) -> ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}). +calculate_activity(MembersState, Confirms, Activity, Self, View) -> + lists:foldl( + fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) -> + with_member_acc( + fun (Member = #member { pending_ack = PA, + last_pub = LP, + last_ack = LA }, + {Confirms2, Activity2}) -> + case is_member_alias(Id, Self, View) of + true -> + {ToAck, PA1} = + find_common(queue_from_pubs(Pubs), PA, + queue:new()), + LA1 = last_ack(Acks, LA), + AckNums = acks_from_queue(ToAck), + Confirms3 = maybe_confirm( + Self, Id, Confirms2, AckNums), + {Member #member { pending_ack = PA1, + last_ack = LA1 }, + {Confirms3, + activity_cons( + Id, [], AckNums, Activity2)}}; + false -> + PA1 = apply_acks(Acks, join_pubs(PA, Pubs)), + LA1 = last_ack(Acks, LA), + LP1 = last_pub(Pubs, LP), + {Member #member { pending_ack = PA1, + last_pub = LP1, + last_ack = LA1 }, + {Confirms2, + activity_cons(Id, Pubs, Acks, Activity2)}} + end + end, Id, MembersStateConfirmsActivity) + end, {MembersState, {Confirms, activity_nil()}}, Activity). + callback(Args, Module, Activity) -> Result = lists:foldl( @@ -1530,3 +1600,24 @@ call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). monitor(Pid) -> erlang:monitor(process, Pid). demonitor(MRef) -> erlang:demonitor(MRef). + +check_membership(Self, #gm_group{members = M} = Group) -> + case lists:member(Self, M) of + true -> + Group; + false -> + throw(lost_membership) + end. + +check_membership(GroupName) -> + case dirty_read_group(GroupName) of + #gm_group{members = M} -> + case lists:keymember(self(), 2, M) of + true -> + ok; + false -> + throw(lost_membership) + end; + {error, not_found} -> + throw(lost_membership) + end. |
