summaryrefslogtreecommitdiff
path: root/src/gm.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-24 18:27:38 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-24 18:27:38 +0000
commit60a09e95e6a3bb0f3383097e108a10d5269f0e2d (patch)
treecff4001e2d7dcded69dd810989a035c579ff2b62 /src/gm.erl
parent2142d1b62ef070cb413222920dac858ad53c33df (diff)
parentc177658c5547d4648f47b6dbb92e5e86dd84cb2b (diff)
downloadrabbitmq-server-git-60a09e95e6a3bb0f3383097e108a10d5269f0e2d.tar.gz
merge default into bug20337
Diffstat (limited to 'src/gm.erl')
-rw-r--r--src/gm.erl42
1 files changed, 28 insertions, 14 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 8c838a7056..6c89912213 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -386,6 +386,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(BROADCAST_TIMER, 25).
+-define(VERSION_START, 0).
-define(SETS, ordsets).
-define(DICT, orddict).
@@ -515,8 +516,8 @@ group_members(Server) ->
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
+ Self = make_member(GroupName),
gen_server2:cast(self(), join),
- Self = self(),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
@@ -541,7 +542,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
@@ -604,7 +606,8 @@ handle_cast({broadcast, Msg},
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
- handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
+ handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
+ State});
handle_cast({broadcast, Msg}, State) ->
internal_broadcast(Msg, none, State);
@@ -623,7 +626,7 @@ handle_cast(join, State = #state { self = Self,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
- {Module:joined(Args, all_known_members(View)), State1});
+ {Module:joined(Args, get_pids(all_known_members(View))), State1});
handle_cast(leave, State) ->
{stop, normal, State}.
@@ -817,7 +820,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
- Result = Module:handle_msg(Args, Self, Msg),
+ Result = Module:handle_msg(Args, get_pid(Self), Msg),
Buffer1 = [{PubCount, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
@@ -979,7 +982,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
end,
try
case gen_server2:call(
- Left, {add_on_right, Self}, infinity) of
+ get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName)
end
@@ -1005,7 +1008,7 @@ prune_or_create_group(Self, GroupName) ->
mnesia:sync_transaction(
fun () -> GroupNew = #gm_group { name = GroupName,
members = [Self],
- version = 0 },
+ version = ?VERSION_START },
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] ->
mnesia:write(GroupNew),
@@ -1114,24 +1117,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
+ ok = gen_server2:cast(get_pid(RealNeighbour),
+ {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(RealNeighbour, Msg),
+ ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(Neighbour, Msg)
+ _ -> gen_server2:cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor(Self, Self) ->
undefined;
maybe_monitor(Other, _Self) ->
- erlang:monitor(process, Other).
+ erlang:monitor(process, get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1238,6 +1242,15 @@ prepare_members_state(MembersState) ->
build_members_state(MembersStateList) ->
?DICT:from_list(MembersStateList).
+make_member(GroupName) ->
+ {case read_group(GroupName) of
+ #gm_group { version = Version } -> Version;
+ {error, not_found} -> ?VERSION_START
+ end, self()}.
+
+get_pid({_Version, Pid}) -> Pid.
+
+get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
@@ -1262,13 +1275,13 @@ maybe_send_activity(Activity, #state { self = Self,
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
- ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
+ ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
lists:foldl(
fun ({Id, Pubs, _Acks}, ok) ->
lists:foldl(fun ({_PubNum, Pub}, ok) ->
- Module:handle_msg(Args, Id, Pub);
+ Module:handle_msg(Args, get_pid(Id), Pub);
(_, Error) ->
Error
end, ok, Pubs);
@@ -1283,7 +1296,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> ok;
- _ -> Module:members_changed(Args, Births, Deaths)
+ _ -> Module:members_changed(Args, get_pids(Births),
+ get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->