diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-05-28 11:44:24 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-05-28 11:44:24 +0100 |
| commit | 38e963b91d43de1d19c7054baa89647e6c710d4d (patch) | |
| tree | 15bdcbb30031f5fdad8cad132a43abc7b2da90f8 /src | |
| parent | 5241f82dbdb0fe08b1da53957e6c0d3cc05298f1 (diff) | |
| parent | 94dce0cae5cb5164b7fff0d39b38828e6647277f (diff) | |
| download | rabbitmq-server-git-38e963b91d43de1d19c7054baa89647e6c710d4d.tar.gz | |
Merge bug 24961
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 189 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 18 |
2 files changed, 117 insertions, 90 deletions
diff --git a/src/gm.erl b/src/gm.erl index 01300f18af..97c81ec635 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -433,51 +433,47 @@ -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). -spec(group_members/1 :: (pid()) -> [pid()]). -%% The joined, members_changed and handle_msg callbacks can all -%% return any of the following terms: +%% The joined, members_changed and handle_msg callbacks can all return +%% any of the following terms: %% %% 'ok' - the callback function returns normally %% -%% {'stop', Reason} - the callback indicates the member should -%% stop with reason Reason and should leave the group. +%% {'stop', Reason} - the callback indicates the member should stop +%% with reason Reason and should leave the group. %% -%% {'become', Module, Args} - the callback indicates that the -%% callback module should be changed to Module and that the -%% callback functions should now be passed the arguments -%% Args. This allows the callback module to be dynamically -%% changed. +%% {'become', Module, Args} - the callback indicates that the callback +%% module should be changed to Module and that the callback functions +%% should now be passed the arguments Args. This allows the callback +%% module to be dynamically changed. -%% Called when we've successfully joined the group. Supplied with -%% Args provided in start_link, plus current group members. +%% Called when we've successfully joined the group. Supplied with Args +%% provided in start_link, plus current group members. -callback joined(Args :: term(), Members :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. -%% Supplied with Args provided in start_link, the list of new -%% members and the list of members previously known to us that -%% have since died. Note that if a member joins and dies very -%% quickly, it's possible that we will never see that member -%% appear in either births or deaths. However we are guaranteed -%% that (1) we will see a member joining either in the births -%% here, or in the members passed to joined/2 before receiving -%% any messages from it; and (2) we will not see members die that -%% we have not seen born (or supplied in the members to -%% joined/2). +%% Supplied with Args provided in start_link, the list of new members +%% and the list of members previously known to us that have since +%% died. Note that if a member joins and dies very quickly, it's +%% possible that we will never see that member appear in either births +%% or deaths. However we are guaranteed that (1) we will see a member +%% joining either in the births here, or in the members passed to +%% joined/2 before receiving any messages from it; and (2) we will not +%% see members die that we have not seen born (or supplied in the +%% members to joined/2). -callback members_changed(Args :: term(), Births :: [pid()], Deaths :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the -%% message. This does get called for messages injected by this -%% member, however, in such cases, there is no special -%% significance of this invocation: it does not indicate that the -%% message has made it to any other members, let alone all other -%% members. +%% message. This does get called for messages injected by this member, +%% however, in such cases, there is no special significance of this +%% invocation: it does not indicate that the message has made it to +%% any other members, let alone all other members. -callback handle_msg(Args :: term(), From :: pid(), Message :: term()) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. -%% Called on gm member termination as per rules in gen_server, -%% with the Args provided in start_link plus the termination -%% Reason. +%% Called on gm member termination as per rules in gen_server, with +%% the Args provided in start_link plus the termination Reason. -callback terminate(Args :: term(), Reason :: term()) -> ok | term(). @@ -533,7 +529,7 @@ init([GroupName, Module, Args]) -> group_name = GroupName, module = Module, view = undefined, - pub_count = 0, + pub_count = -1, members_state = undefined, callback_args = Args, confirms = queue:new(), @@ -575,33 +571,39 @@ handle_call({add_on_right, NewMember}, _From, members_state = MembersState, module = Module, callback_args = Args }) -> - Group = record_new_member_in_group( - GroupName, Self, NewMember, - fun (Group1) -> - View1 = group_to_view(Group1), - ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state( - MembersState)}) - end), + {MembersState1, Group} = + record_new_member_in_group( + GroupName, Self, NewMember, + fun (Group1) -> + View1 = group_to_view(Group1), + MembersState1 = remove_erased_members(MembersState, View1), + ok = send_right(NewMember, View1, + {catchup, Self, + prepare_members_state(MembersState1)}), + MembersState1 + end), View2 = group_to_view(Group), - State1 = check_neighbours(State #state { view = View2 }), + State1 = check_neighbours(State #state { view = View2, + members_state = MembersState1 }), Result = callback_view_changed(Args, Module, View, View2), handle_callback_result({Result, {ok, Group}, State1}). handle_cast({?TAG, ReqVer, Msg}, State = #state { view = View, + members_state = MembersState, group_name = GroupName, module = Module, callback_args = Args }) -> {Result, State1} = case needs_view_update(ReqVer, View) of - true -> - View1 = group_to_view(read_group(GroupName)), - {callback_view_changed(Args, Module, View, View1), - check_neighbours(State #state { view = View1 })}; - false -> - {ok, State} + true -> View1 = group_to_view(read_group(GroupName)), + MemberState1 = remove_erased_members(MembersState, View1), + {callback_view_changed(Args, Module, View, View1), + check_neighbours( + State #state { view = View1, + members_state = MemberState1 })}; + false -> {ok, State} end, handle_callback_result( if_callback_success( @@ -665,22 +667,21 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, _ -> View1 = group_to_view(record_dead_member_in_group(Member, GroupName)), - State1 = State #state { view = View1 }, {Result, State2} = case alive_view_members(View1) of [Self] -> - maybe_erase_aliases( - State1 #state { + {Result1, State1} = maybe_erase_aliases(State, View1), + {Result1, State1 #state { members_state = blank_member_state(), - confirms = purge_confirms(Confirms) }); + confirms = purge_confirms(Confirms) }}; _ -> %% here we won't be pointing out any deaths: %% the concern is that there maybe births %% which we'd otherwise miss. {callback_view_changed(Args, Module, View, View1), - State1} + check_neighbours(State #state { view = View1 })} end, - handle_callback_result({Result, check_neighbours(State2)}) + handle_callback_result({Result, State2}) end. @@ -693,9 +694,13 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _State) -> 1; -prioritise_info(_ , _State) -> 0. +prioritise_info(flush, _State) -> + 1; +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, + #state { members_state = MS }) when MS /= undefined -> + 1; +prioritise_info(_, _State) -> + 0. handle_msg(check_neighbours, State) -> @@ -795,8 +800,8 @@ handle_msg({activity, Left, Activity}, State1 = State #state { members_state = MembersState1, confirms = Confirms1 }, Activity3 = activity_finalise(Activity1), - {Result, State2} = maybe_erase_aliases(State1), - ok = maybe_send_activity(Activity3, State2), + ok = maybe_send_activity(Activity3, State1), + {Result, State2} = maybe_erase_aliases(State1, View), if_callback_success( Result, fun activity_true/3, fun activity_false/3, Activity3, State2); @@ -829,13 +834,14 @@ internal_broadcast(Msg, From, State = #state { self = Self, confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer }) -> + PubCount1 = PubCount + 1, Result = Module:handle_msg(Args, get_pid(Self), Msg), - Buffer1 = [{PubCount, Msg} | Buffer], + Buffer1 = [{PubCount1, Msg} | Buffer], Confirms1 = case From of none -> Confirms; - _ -> queue:in({PubCount, From}, Confirms) + _ -> queue:in({PubCount1, From}, Confirms) end, - State1 = State #state { pub_count = PubCount + 1, + State1 = State #state { pub_count = PubCount1, confirms = Confirms1, broadcast_buffer = Buffer1 }, case From =/= none of @@ -850,14 +856,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) -> State; flush_broadcast_buffer(State = #state { self = Self, members_state = MembersState, - broadcast_buffer = Buffer }) -> + broadcast_buffer = Buffer, + pub_count = PubCount }) -> + [{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount Pubs = lists:reverse(Buffer), Activity = activity_cons(Self, Pubs, [], activity_nil()), ok = maybe_send_activity(activity_finalise(Activity), State), MembersState1 = with_member( fun (Member = #member { pending_ack = PA }) -> PA1 = queue:join(PA, queue:from_list(Pubs)), - Member #member { pending_ack = PA1 } + Member #member { pending_ack = PA1, + last_pub = PubCount } end, Self, MembersState), State #state { members_state = MembersState1, broadcast_buffer = [] }. @@ -1052,7 +1061,7 @@ record_dead_member_in_group(Member, GroupName) -> Group. record_new_member_in_group(GroupName, Left, NewMember, Fun) -> - {atomic, Group} = + {atomic, {Result, Group}} = mnesia:sync_transaction( fun () -> [#gm_group { members = Members, version = Ver } = Group1] = @@ -1062,11 +1071,11 @@ record_new_member_in_group(GroupName, Left, NewMember, Fun) -> Members1 = Prefix ++ [Left, NewMember | Suffix], Group2 = Group1 #gm_group { members = Members1, version = Ver + 1 }, - ok = Fun(Group2), + Result = Fun(Group2), mnesia:write(Group2), - Group2 + {Result, Group2} end), - Group. + {Result, Group}. erase_members_in_group(Members, GroupName) -> DeadMembers = [{dead, Id} || Id <- Members], @@ -1089,10 +1098,10 @@ erase_members_in_group(Members, GroupName) -> maybe_erase_aliases(State = #state { self = Self, group_name = GroupName, - view = View, + view = View0, members_state = MembersState, module = Module, - callback_args = Args }) -> + callback_args = Args }, View) -> #view_member { aliases = Aliases } = fetch_view_member(Self, View), {Erasable, MembersState1} = ?SETS:fold( @@ -1107,11 +1116,11 @@ maybe_erase_aliases(State = #state { self = Self, end, {[], MembersState}, Aliases), State1 = State #state { members_state = MembersState1 }, case Erasable of - [] -> {ok, State1}; + [] -> {ok, State1 #state { view = View }}; _ -> View1 = group_to_view( erase_members_in_group(Erasable, GroupName)), - {callback_view_changed(Args, Module, View, View1), - State1 #state { view = View1 }} + {callback_view_changed(Args, Module, View0, View1), + check_neighbours(State1 #state { view = View1 })} end. can_erase_view_member(Self, Self, _LA, _LP) -> false; @@ -1257,6 +1266,12 @@ make_member(GroupName) -> {error, not_found} -> ?VERSION_START end, self()}. +remove_erased_members(MembersState, View) -> + lists:foldl(fun (Id, MembersState1) -> + store_member(Id, find_member_or_blank(Id, MembersState), + MembersState1) + end, blank_member_state(), all_known_members(View)). + get_pid({_Version, Pid}) -> Pid. get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids]. @@ -1287,16 +1302,30 @@ send_right(Right, View, Msg) -> ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> - lists:foldl( - fun ({Id, Pubs, _Acks}, ok) -> - lists:foldl(fun ({_PubNum, Pub}, ok) -> - Module:handle_msg(Args, get_pid(Id), Pub); - (_, Error) -> - Error - end, ok, Pubs); - (_, Error) -> - Error - end, ok, Activity). + Result = + lists:foldl( + fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) -> + lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) -> + case Module2:handle_msg( + Args2, get_pid(Id), Pub) of + ok -> + Acc; + {become, Module3, Args3} -> + {Args3, Module3, ok}; + {stop, _Reason} = Error -> + Error + end; + (_, Error = {stop, _Reason}) -> + Error + end, {Args1, Module1, ok}, Pubs); + (_, Error = {stop, _Reason}) -> + Error + end, {Args, Module, ok}, Activity), + case Result of + {Args, Module, ok} -> ok; + {Args1, Module1, ok} -> {become, Module1, Args1}; + {stop, _Reason} = Error -> Error + end. callback_view_changed(Args, Module, OldView, NewView) -> OldMembers = all_known_members(OldView), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index dafb3f2ee7..49213c9552 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1466,16 +1466,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, %% determined based on which is growing faster. Whichever %% comes second may very well get a quota of 0 if the %% first manages to push out the max number of messages. - S1 -> {_, State2} = - lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> - ReduceFun(QuotaN, StateN) - end, - {S1, State}, - case (AvgAckIngress - AvgAckEgress) > - (AvgIngress - AvgEgress) of - true -> [AckFun, AlphaBetaFun]; - false -> [AlphaBetaFun, AckFun] - end), + S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress)) of + true -> [AckFun, AlphaBetaFun]; + false -> [AlphaBetaFun, AckFun] + end, + {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, {S1, State}, Funs), {true, State2} end, |
