diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-07-14 18:08:44 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-07-14 18:08:44 +0100 |
| commit | 3f5f753966162d819aefc9a5e065ba9c5d58f7b8 (patch) | |
| tree | 633e35dd46149112983c5ad902e0c13bdef9812c /src/gm.erl | |
| parent | 98d6ac5ffd2af9784164eae6aa51ff95ebc18781 (diff) | |
| download | rabbitmq-server-git-3f5f753966162d819aefc9a5e065ba9c5d58f7b8.tar.gz | |
Revert bug26171, and ignore messages before the initial catchup.
Diffstat (limited to 'src/gm.erl')
| -rw-r--r-- | src/gm.erl | 77 |
1 files changed, 46 insertions, 31 deletions
diff --git a/src/gm.erl b/src/gm.erl index 696b7fa3e8..5863ebc41f 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -570,6 +570,11 @@ init([GroupName, Module, Args, TxnFun]) -> txn_executor = TxnFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({confirmed_broadcast, _Msg}, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + handle_call({confirmed_broadcast, Msg}, _From, State = #state { self = Self, right = {Self, undefined}, @@ -585,6 +590,10 @@ handle_call({confirmed_broadcast, Msg}, From, State) -> handle_callback_result({Result, flush_broadcast_buffer( State1 #state { confirms = Confirms1 })}); +handle_call(info, _From, + State = #state { members_state = undefined }) -> + reply(not_joined, State); + handle_call(info, _From, State = #state { group_name = GroupName, module = Module, view = View }) -> @@ -592,6 +601,10 @@ handle_call(info, _From, State = #state { group_name = GroupName, {module, Module}, {group_members, get_pids(alive_view_members(View))}], State); +handle_call({add_on_right, _NewMember}, _From, + State = #state { members_state = undefined }) -> + reply(not_ready, State); + handle_call({add_on_right, NewMember}, _From, State = #state { self = Self, group_name = GroupName, @@ -600,17 +613,19 @@ handle_call({add_on_right, NewMember}, _From, Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), View1 = group_to_view(Group), MembersState1 = remove_erased_members(MembersState, View1), + ok = send_right(NewMember, View1, + {catchup, Self, prepare_members_state(MembersState1)}), {Result, State1} = change_view(View1, State #state { members_state = MembersState1 }), - Reply = {ok, Group, prepare_members_state(MembersState1)}, - handle_callback_result({Result, Reply, State1}). + handle_callback_result({Result, {ok, Group}, State1}). handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, members_state = MembersState, group_name = GroupName }) -> {Result, State1} = - case needs_view_update(ReqVer, View) of + case needs_view_update( + ReqVer, View) andalso MembersState =/= undefined of true -> View1 = group_to_view(dirty_read_group(GroupName)), MemberState1 = remove_erased_members(MembersState, View1), change_view(View1, State #state { @@ -621,6 +636,10 @@ handle_cast({?TAG, ReqVer, Msg}, if_callback_success( Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); +handle_cast({broadcast, _Msg, _SizeHint}, + State = #state { members_state = undefined }) -> + noreply(State); + handle_cast({broadcast, Msg, _SizeHint}, State = #state { self = Self, right = {Self, undefined}, @@ -639,17 +658,16 @@ handle_cast(join, State = #state { self = Self, module = Module, callback_args = Args, txn_executor = TxnFun }) -> - State1 = case join_group(Self, GroupName, TxnFun) of - {ok, View} -> - check_neighbours( - State#state{view = View, - members_state = blank_member_state()}); - {ok, View, Left, MembersState} -> - initial_catchup(Left, MembersState, - check_neighbours(State#state{view = View})) - end, - Members = get_pids(all_known_members(State1#state.view)), - handle_callback_result({Module:joined(Args, Members), State1}); + View = join_group(Self, GroupName, TxnFun), + MembersState = + case alive_view_members(View) of + [Self] -> blank_member_state(); + _ -> undefined + end, + State1 = check_neighbours(State #state { view = View, + members_state = MembersState }), + handle_callback_result( + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast({validate_members, OldMembers}, State = #state { view = View, @@ -738,21 +756,21 @@ prioritise_info(_, _Len, _State) -> 0. -initial_catchup(Left, MembersStateLeft, - State = #state { self = Self, - left = {Left, _MRefL}, - right = {Right, _MRefR}, - view = View, - members_state = undefined }) -> - ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), - MembersStateLeft1 = build_members_state(MembersStateLeft), - State #state { members_state = MembersStateLeft1 }. - handle_msg(check_neighbours, State) -> %% no-op - it's already been done by the calling handle_cast {ok, State}; handle_msg({catchup, Left, MembersStateLeft}, + State = #state { self = Self, + left = {Left, _MRefL}, + right = {Right, _MRefR}, + view = View, + members_state = undefined }) -> + ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), + MembersStateLeft1 = build_members_state(MembersStateLeft), + {ok, State #state { members_state = MembersStateLeft1 }}; + +handle_msg({catchup, Left, MembersStateLeft}, State = #state { self = Self, left = {Left, _MRefL}, view = View, @@ -1026,11 +1044,11 @@ join_group(Self, GroupName, {error, not_found}, TxnFun) -> join_group(Self, GroupName, prune_or_create_group(Self, GroupName, TxnFun), TxnFun); join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> - {ok, group_to_view(Group)}; + group_to_view(Group); join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> case lists:member(Self, Members) of true -> - {ok, group_to_view(Group)}; + group_to_view(Group); false -> case lists:filter(fun is_member_alive/1, Members) of [] -> @@ -1049,11 +1067,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> end, try case neighbour_call(Left, {add_on_right, Self}) of - {ok, Group1, MembersState1} -> - {ok, group_to_view(Group1), Left, - build_members_state(MembersState1)}; - not_ready -> - join_group(Self, GroupName, TxnFun) + {ok, Group1} -> group_to_view(Group1); + not_ready -> join_group(Self, GroupName, TxnFun) end catch exit:{R, _} |
