summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl286
1 files changed, 146 insertions, 140 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 01300f18af..f88ed18fbf 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -433,51 +433,47 @@
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
-spec(group_members/1 :: (pid()) -> [pid()]).
-%% The joined, members_changed and handle_msg callbacks can all
-%% return any of the following terms:
+%% The joined, members_changed and handle_msg callbacks can all return
+%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
-%% {'stop', Reason} - the callback indicates the member should
-%% stop with reason Reason and should leave the group.
+%% {'stop', Reason} - the callback indicates the member should stop
+%% with reason Reason and should leave the group.
%%
-%% {'become', Module, Args} - the callback indicates that the
-%% callback module should be changed to Module and that the
-%% callback functions should now be passed the arguments
-%% Args. This allows the callback module to be dynamically
-%% changed.
+%% {'become', Module, Args} - the callback indicates that the callback
+%% module should be changed to Module and that the callback functions
+%% should now be passed the arguments Args. This allows the callback
+%% module to be dynamically changed.
-%% Called when we've successfully joined the group. Supplied with
-%% Args provided in start_link, plus current group members.
+%% Called when we've successfully joined the group. Supplied with Args
+%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Supplied with Args provided in start_link, the list of new
-%% members and the list of members previously known to us that
-%% have since died. Note that if a member joins and dies very
-%% quickly, it's possible that we will never see that member
-%% appear in either births or deaths. However we are guaranteed
-%% that (1) we will see a member joining either in the births
-%% here, or in the members passed to joined/2 before receiving
-%% any messages from it; and (2) we will not see members die that
-%% we have not seen born (or supplied in the members to
-%% joined/2).
+%% Supplied with Args provided in start_link, the list of new members
+%% and the list of members previously known to us that have since
+%% died. Note that if a member joins and dies very quickly, it's
+%% possible that we will never see that member appear in either births
+%% or deaths. However we are guaranteed that (1) we will see a member
+%% joining either in the births here, or in the members passed to
+%% joined/2 before receiving any messages from it; and (2) we will not
+%% see members die that we have not seen born (or supplied in the
+%% members to joined/2).
-callback members_changed(Args :: term(), Births :: [pid()],
Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
-%% message. This does get called for messages injected by this
-%% member, however, in such cases, there is no special
-%% significance of this invocation: it does not indicate that the
-%% message has made it to any other members, let alone all other
-%% members.
+%% message. This does get called for messages injected by this member,
+%% however, in such cases, there is no special significance of this
+%% invocation: it does not indicate that the message has made it to
+%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
-%% Called on gm member termination as per rules in gen_server,
-%% with the Args provided in start_link plus the termination
-%% Reason.
+%% Called on gm member termination as per rules in gen_server, with
+%% the Args provided in start_link plus the termination Reason.
-callback terminate(Args :: term(), Reason :: term()) ->
ok | term().
@@ -533,7 +529,7 @@ init([GroupName, Module, Args]) ->
group_name = GroupName,
module = Module,
view = undefined,
- pub_count = 0,
+ pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
@@ -562,7 +558,7 @@ handle_call(group_members, _From,
reply(not_joined, State);
handle_call(group_members, _From, State = #state { view = View }) ->
- reply(alive_view_members(View), State);
+ reply(get_pids(alive_view_members(View)), State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
@@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From,
members_state = MembersState,
module = Module,
callback_args = Args }) ->
- Group = record_new_member_in_group(
- GroupName, Self, NewMember,
- fun (Group1) ->
- View1 = group_to_view(Group1),
- ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(
- MembersState)})
- end),
+ {MembersState1, Group} =
+ record_new_member_in_group(
+ GroupName, Self, NewMember,
+ fun (Group1) ->
+ View1 = group_to_view(Group1),
+ MembersState1 = remove_erased_members(MembersState, View1),
+ ok = send_right(NewMember, View1,
+ {catchup, Self,
+ prepare_members_state(MembersState1)}),
+ MembersState1
+ end),
View2 = group_to_view(Group),
- State1 = check_neighbours(State #state { view = View2 }),
+ State1 = check_neighbours(State #state { view = View2,
+ members_state = MembersState1 }),
Result = callback_view_changed(Args, Module, View, View2),
handle_callback_result({Result, {ok, Group}, State1}).
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
+ members_state = MembersState,
group_name = GroupName,
module = Module,
callback_args = Args }) ->
{Result, State1} =
case needs_view_update(ReqVer, View) of
- true ->
- View1 = group_to_view(read_group(GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- check_neighbours(State #state { view = View1 })};
- false ->
- {ok, State}
+ true -> View1 = group_to_view(read_group(GroupName)),
+ MemberState1 = remove_erased_members(MembersState, View1),
+ {callback_view_changed(Args, Module, View, View1),
+ check_neighbours(
+ State #state { view = View1,
+ members_state = MemberState1 })};
+ false -> {ok, State}
end,
handle_callback_result(
if_callback_success(
@@ -645,7 +647,7 @@ handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
+handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
@@ -659,28 +661,29 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
- case Member of
- undefined ->
+ case {Member, Reason} of
+ {undefined, _} ->
+ noreply(State);
+ {_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
View1 =
group_to_view(record_dead_member_in_group(Member, GroupName)),
- State1 = State #state { view = View1 },
{Result, State2} =
case alive_view_members(View1) of
[Self] ->
- maybe_erase_aliases(
- State1 #state {
+ {Result1, State1} = maybe_erase_aliases(State, View1),
+ {Result1, State1 #state {
members_state = blank_member_state(),
- confirms = purge_confirms(Confirms) });
+ confirms = purge_confirms(Confirms) }};
_ ->
%% here we won't be pointing out any deaths:
%% the concern is that there maybe births
%% which we'd otherwise miss.
{callback_view_changed(Args, Module, View, View1),
- State1}
+ check_neighbours(State #state { view = View1 })}
end,
- handle_callback_result({Result, check_neighbours(State2)})
+ handle_callback_result({Result, State2})
end.
@@ -693,9 +696,13 @@ terminate(Reason, State = #state { module = Module,
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-prioritise_info(flush, _State) -> 1;
-prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1;
-prioritise_info(_ , _State) -> 0.
+prioritise_info(flush, _State) ->
+ 1;
+prioritise_info({'DOWN', _MRef, process, _Pid, _Reason},
+ #state { members_state = MS }) when MS /= undefined ->
+ 1;
+prioritise_info(_, _State) ->
+ 0.
handle_msg(check_neighbours, State) ->
@@ -795,8 +802,8 @@ handle_msg({activity, Left, Activity},
State1 = State #state { members_state = MembersState1,
confirms = Confirms1 },
Activity3 = activity_finalise(Activity1),
- {Result, State2} = maybe_erase_aliases(State1),
- ok = maybe_send_activity(Activity3, State2),
+ ok = maybe_send_activity(Activity3, State1),
+ {Result, State2} = maybe_erase_aliases(State1, View),
if_callback_success(
Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
@@ -829,13 +836,14 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
+ PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount, Msg} | Buffer],
+ Buffer1 = [{PubCount1, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
- _ -> queue:in({PubCount, From}, Confirms)
+ _ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount + 1,
+ State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
@@ -850,14 +858,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ pub_count = PubCount }) ->
+ [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
- Member #member { pending_ack = PA1 }
+ Member #member { pending_ack = PA1,
+ last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.
@@ -867,11 +878,9 @@ flush_broadcast_buffer(State = #state { self = Self,
%% View construction and inspection
%% ---------------------------------------------------------------------------
-needs_view_update(ReqVer, {Ver, _View}) ->
- Ver < ReqVer.
+needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
-view_version({Ver, _View}) ->
- Ver.
+view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
@@ -890,17 +899,13 @@ store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
-fetch_view_member(Id, {_Ver, View}) ->
- ?DICT:fetch(Id, View).
+fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View).
-find_view_member(Id, {_Ver, View}) ->
- ?DICT:find(Id, View).
+find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View).
-blank_view(Ver) ->
- {Ver, ?DICT:new()}.
+blank_view(Ver) -> {Ver, ?DICT:new()}.
-alive_view_members({_Ver, View}) ->
- ?DICT:fetch_keys(View).
+alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
?DICT:fold(
@@ -1052,7 +1057,7 @@ record_dead_member_in_group(Member, GroupName) ->
Group.
record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
- {atomic, Group} =
+ {atomic, {Result, Group}} =
mnesia:sync_transaction(
fun () ->
[#gm_group { members = Members, version = Ver } = Group1] =
@@ -1062,11 +1067,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) ->
Members1 = Prefix ++ [Left, NewMember | Suffix],
Group2 = Group1 #gm_group { members = Members1,
version = Ver + 1 },
- ok = Fun(Group2),
+ Result = Fun(Group2),
mnesia:write(Group2),
- Group2
+ {Result, Group2}
end),
- Group.
+ {Result, Group}.
erase_members_in_group(Members, GroupName) ->
DeadMembers = [{dead, Id} || Id <- Members],
@@ -1089,10 +1094,10 @@ erase_members_in_group(Members, GroupName) ->
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
- view = View,
+ view = View0,
members_state = MembersState,
module = Module,
- callback_args = Args }) ->
+ callback_args = Args }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
@@ -1107,11 +1112,11 @@ maybe_erase_aliases(State = #state { self = Self,
end, {[], MembersState}, Aliases),
State1 = State #state { members_state = MembersState1 },
case Erasable of
- [] -> {ok, State1};
+ [] -> {ok, State1 #state { view = View }};
_ -> View1 = group_to_view(
erase_members_in_group(Erasable, GroupName)),
- {callback_view_changed(Args, Module, View, View1),
- State1 #state { view = View1 }}
+ {callback_view_changed(Args, Module, View0, View1),
+ check_neighbours(State1 #state { view = View1 })}
end.
can_erase_view_member(Self, Self, _LA, _LP) -> false;
@@ -1141,10 +1146,8 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
-maybe_monitor(Self, Self) ->
- undefined;
-maybe_monitor(Other, _Self) ->
- erlang:monitor(process, get_pid(Other)).
+maybe_monitor( Self, Self) -> undefined;
+maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1233,23 +1236,19 @@ find_member_or_blank(Id, MembersState) ->
error -> blank_member()
end.
-erase_member(Id, MembersState) ->
- ?DICT:erase(Id, MembersState).
+erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
-blank_member_state() ->
- ?DICT:new().
+blank_member_state() -> ?DICT:new().
store_member(Id, MemberState, MembersState) ->
?DICT:store(Id, MemberState, MembersState).
-prepare_members_state(MembersState) ->
- ?DICT:to_list(MembersState).
+prepare_members_state(MembersState) -> ?DICT:to_list(MembersState).
-build_members_state(MembersStateList) ->
- ?DICT:from_list(MembersStateList).
+build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList).
make_member(GroupName) ->
{case read_group(GroupName) of
@@ -1257,6 +1256,12 @@ make_member(GroupName) ->
{error, not_found} -> ?VERSION_START
end, self()}.
+remove_erased_members(MembersState, View) ->
+ lists:foldl(fun (Id, MembersState1) ->
+ store_member(Id, find_member_or_blank(Id, MembersState),
+ MembersState1)
+ end, blank_member_state(), all_known_members(View)).
+
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
@@ -1265,16 +1270,12 @@ get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% Activity assembly
%% ---------------------------------------------------------------------------
-activity_nil() ->
- queue:new().
+activity_nil() -> queue:new().
-activity_cons(_Id, [], [], Tail) ->
- Tail;
-activity_cons(Sender, Pubs, Acks, Tail) ->
- queue:in({Sender, Pubs, Acks}, Tail).
+activity_cons( _Id, [], [], Tail) -> Tail;
+activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
-activity_finalise(Activity) ->
- queue:to_list(Activity).
+activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
@@ -1287,16 +1288,30 @@ send_right(Right, View, Msg) ->
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
- lists:foldl(
- fun ({Id, Pubs, _Acks}, ok) ->
- lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, get_pid(Id), Pub);
- (_, Error) ->
- Error
- end, ok, Pubs);
- (_, Error) ->
- Error
- end, ok, Activity).
+ Result =
+ lists:foldl(
+ fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
+ lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
+ case Module2:handle_msg(
+ Args2, get_pid(Id), Pub) of
+ ok ->
+ Acc;
+ {become, Module3, Args3} ->
+ {Args3, Module3, ok};
+ {stop, _Reason} = Error ->
+ Error
+ end;
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args1, Module1, ok}, Pubs);
+ (_, Error = {stop, _Reason}) ->
+ Error
+ end, {Args, Module, ok}, Activity),
+ case Result of
+ {Args, Module, ok} -> ok;
+ {Args1, Module1, ok} -> {become, Module1, Args1};
+ {stop, _Reason} = Error -> Error
+ end.
callback_view_changed(Args, Module, OldView, NewView) ->
OldMembers = all_known_members(OldView),
@@ -1364,34 +1379,25 @@ purge_confirms(Confirms) ->
%% Msg transformation
%% ---------------------------------------------------------------------------
-acks_from_queue(Q) ->
- [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
+acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
-pubs_from_queue(Q) ->
- queue:to_list(Q).
+pubs_from_queue(Q) -> queue:to_list(Q).
-queue_from_pubs(Pubs) ->
- queue:from_list(Pubs).
+queue_from_pubs(Pubs) -> queue:from_list(Pubs).
-apply_acks([], Pubs) ->
- Pubs;
-apply_acks(List, Pubs) ->
- {_, Pubs1} = queue:split(length(List), Pubs),
- Pubs1.
+apply_acks( [], Pubs) -> Pubs;
+apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
-last_ack([], LA) ->
- LA;
-last_ack(List, LA) ->
- LA1 = lists:last(List),
- true = LA1 > LA, %% ASSERTION
- LA1.
-
-last_pub([], LP) ->
- LP;
-last_pub(List, LP) ->
- {PubNum, _Msg} = lists:last(List),
- true = PubNum > LP, %% ASSERTION
- PubNum.
+last_ack( [], LA) -> LA;
+last_ack(List, LA) -> LA1 = lists:last(List),
+ true = LA1 > LA, %% ASSERTION
+ LA1.
+
+last_pub( [], LP) -> LP;
+last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
+ true = PubNum > LP, %% ASSERTION
+ PubNum.