diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-24 18:27:38 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-24 18:27:38 +0000 |
| commit | 60a09e95e6a3bb0f3383097e108a10d5269f0e2d (patch) | |
| tree | cff4001e2d7dcded69dd810989a035c579ff2b62 /src/gm.erl | |
| parent | 2142d1b62ef070cb413222920dac858ad53c33df (diff) | |
| parent | c177658c5547d4648f47b6dbb92e5e86dd84cb2b (diff) | |
| download | rabbitmq-server-git-60a09e95e6a3bb0f3383097e108a10d5269f0e2d.tar.gz | |
merge default into bug20337
Diffstat (limited to 'src/gm.erl')
| -rw-r--r-- | src/gm.erl | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/src/gm.erl b/src/gm.erl index 8c838a7056..6c89912213 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -386,6 +386,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(BROADCAST_TIMER, 25). +-define(VERSION_START, 0). -define(SETS, ordsets). -define(DICT, orddict). @@ -515,8 +516,8 @@ group_members(Server) -> init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), + Self = make_member(GroupName), gen_server2:cast(self(), join), - Self = self(), {ok, #state { self = Self, left = {Self, undefined}, right = {Self, undefined}, @@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); @@ -604,7 +606,8 @@ handle_cast({broadcast, Msg}, right = {Self, undefined}, module = Module, callback_args = Args }) -> - handle_callback_result({Module:handle_msg(Args, Self, Msg), State}); + handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg), + State}); handle_cast({broadcast, Msg}, State) -> internal_broadcast(Msg, none, State); @@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self, State1 = check_neighbours(State #state { view = View, members_state = MembersState }), handle_callback_result( - {Module:joined(Args, all_known_members(View)), State1}); + {Module:joined(Args, get_pids(all_known_members(View))), State1}); handle_cast(leave, State) -> {stop, normal, State}. @@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> - Result = Module:handle_msg(Args, Self, Msg), + Result = Module:handle_msg(Args, get_pid(Self), Msg), Buffer1 = [{PubCount, Msg} | Buffer], Confirms1 = case From of none -> Confirms; @@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) -> end, try case gen_server2:call( - Left, {add_on_right, Self}, infinity) of + get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); not_ready -> join_group(Self, GroupName) end @@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) -> mnesia:sync_transaction( fun () -> GroupNew = #gm_group { name = GroupName, members = [Self], - version = 0 }, + version = ?VERSION_START }, case mnesia:read({?GROUP_TABLE, GroupName}) of [] -> mnesia:write(GroupNew), @@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> {Self, undefined}; ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}), + ok = gen_server2:cast(get_pid(RealNeighbour), + {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(RealNeighbour, Msg), + ok = gen_server2:cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(Neighbour, Msg) + _ -> gen_server2:cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor(Self, Self) -> undefined; maybe_monitor(Other, _Self) -> - erlang:monitor(process, Other). + erlang:monitor(process, get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) -> build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +make_member(GroupName) -> + {case read_group(GroupName) of + #gm_group { version = Version } -> Version; + {error, not_found} -> ?VERSION_START + end, self()}. + +get_pid({_Version, Pid}) -> Pid. + +get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. %% --------------------------------------------------------------------------- %% Activity assembly @@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self, send_right(Right, View, {activity, Self, Activity}). send_right(Right, View, Msg) -> - ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}). + ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> lists:foldl( fun ({Id, Pubs, _Acks}, ok) -> lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, Id, Pub); + Module:handle_msg(Args, get_pid(Id), Pub); (_, Error) -> Error end, ok, Pubs); @@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) -> Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> ok; - _ -> Module:members_changed(Args, Births, Deaths) + _ -> Module:members_changed(Args, get_pids(Births), + get_pids(Deaths)) end. handle_callback_result({Result, State}) -> |
