summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-07-14 18:08:44 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-07-14 18:08:44 +0100
commit3f5f753966162d819aefc9a5e065ba9c5d58f7b8 (patch)
tree633e35dd46149112983c5ad902e0c13bdef9812c /src/gm.erl
parent98d6ac5ffd2af9784164eae6aa51ff95ebc18781 (diff)
downloadrabbitmq-server-git-3f5f753966162d819aefc9a5e065ba9c5d58f7b8.tar.gz
Revert bug26171, and ignore messages before the initial catchup.
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl77
1 files changed, 46 insertions, 31 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 696b7fa3e8..5863ebc41f 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -570,6 +570,11 @@ init([GroupName, Module, Args, TxnFun]) ->
txn_executor = TxnFun }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call({confirmed_broadcast, _Msg}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
handle_call({confirmed_broadcast, Msg}, _From,
State = #state { self = Self,
right = {Self, undefined},
@@ -585,6 +590,10 @@ handle_call({confirmed_broadcast, Msg}, From, State) ->
handle_callback_result({Result, flush_broadcast_buffer(
State1 #state { confirms = Confirms1 })});
+handle_call(info, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_joined, State);
+
handle_call(info, _From, State = #state { group_name = GroupName,
module = Module,
view = View }) ->
@@ -592,6 +601,10 @@ handle_call(info, _From, State = #state { group_name = GroupName,
{module, Module},
{group_members, get_pids(alive_view_members(View))}], State);
+handle_call({add_on_right, _NewMember}, _From,
+ State = #state { members_state = undefined }) ->
+ reply(not_ready, State);
+
handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
@@ -600,17 +613,19 @@ handle_call({add_on_right, NewMember}, _From,
Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
View1 = group_to_view(Group),
MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self, prepare_members_state(MembersState1)}),
{Result, State1} = change_view(View1, State #state {
members_state = MembersState1 }),
- Reply = {ok, Group, prepare_members_state(MembersState1)},
- handle_callback_result({Result, Reply, State1}).
+ handle_callback_result({Result, {ok, Group}, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
members_state = MembersState,
group_name = GroupName }) ->
{Result, State1} =
- case needs_view_update(ReqVer, View) of
+ case needs_view_update(
+ ReqVer, View) andalso MembersState =/= undefined of
true -> View1 = group_to_view(dirty_read_group(GroupName)),
MemberState1 = remove_erased_members(MembersState, View1),
change_view(View1, State #state {
@@ -621,6 +636,10 @@ handle_cast({?TAG, ReqVer, Msg},
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
+handle_cast({broadcast, _Msg, _SizeHint},
+ State = #state { members_state = undefined }) ->
+ noreply(State);
+
handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
@@ -639,17 +658,16 @@ handle_cast(join, State = #state { self = Self,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
- State1 = case join_group(Self, GroupName, TxnFun) of
- {ok, View} ->
- check_neighbours(
- State#state{view = View,
- members_state = blank_member_state()});
- {ok, View, Left, MembersState} ->
- initial_catchup(Left, MembersState,
- check_neighbours(State#state{view = View}))
- end,
- Members = get_pids(all_known_members(State1#state.view)),
- handle_callback_result({Module:joined(Args, Members), State1});
+ View = join_group(Self, GroupName, TxnFun),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast({validate_members, OldMembers},
State = #state { view = View,
@@ -738,21 +756,21 @@ prioritise_info(_, _Len, _State) ->
0.
-initial_catchup(Left, MembersStateLeft,
- State = #state { self = Self,
- left = {Left, _MRefL},
- right = {Right, _MRefR},
- view = View,
- members_state = undefined }) ->
- ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
- MembersStateLeft1 = build_members_state(MembersStateLeft),
- State #state { members_state = MembersStateLeft1 }.
-
handle_msg(check_neighbours, State) ->
%% no-op - it's already been done by the calling handle_cast
{ok, State};
handle_msg({catchup, Left, MembersStateLeft},
+ State = #state { self = Self,
+ left = {Left, _MRefL},
+ right = {Right, _MRefR},
+ view = View,
+ members_state = undefined }) ->
+ ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
+ MembersStateLeft1 = build_members_state(MembersStateLeft),
+ {ok, State #state { members_state = MembersStateLeft1 }};
+
+handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
view = View,
@@ -1026,11 +1044,11 @@ join_group(Self, GroupName, {error, not_found}, TxnFun) ->
join_group(Self, GroupName,
prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
- {ok, group_to_view(Group)};
+ group_to_view(Group);
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
- {ok, group_to_view(Group)};
+ group_to_view(Group);
false ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
@@ -1049,11 +1067,8 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
end,
try
case neighbour_call(Left, {add_on_right, Self}) of
- {ok, Group1, MembersState1} ->
- {ok, group_to_view(Group1), Left,
- build_members_state(MembersState1)};
- not_ready ->
- join_group(Self, GroupName, TxnFun)
+ {ok, Group1} -> group_to_view(Group1);
+ not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}