diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-19 00:54:51 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-19 00:54:51 +0000 |
| commit | 7494ba44566e5b32a56c7b51f1bfce2f5acf8c56 (patch) | |
| tree | 4370fe866e129b59bc63e695ed18ab934e5b5319 /src | |
| parent | d18ca8b0da9f16a47d19a0f8371cf5c84ca8a7e9 (diff) | |
| download | rabbitmq-server-git-7494ba44566e5b32a56c7b51f1bfce2f5acf8c56.tar.gz | |
Fixed a bug caused by revision 454fbb9127bd in rabbit-ha from where gm.erl came; Make gm_tests more robust; Avoid creating endless funs all the time for every message (substantial performance gain). Abstract use of dicts - expected use case is relatively small groups, thus orddict would normally be more appropriate
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 118 | ||||
| -rw-r--r-- | src/gm_test.erl | 10 |
2 files changed, 66 insertions, 62 deletions
diff --git a/src/gm.erl b/src/gm.erl index 0a6e346a22..6a2c9c4876 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -387,6 +387,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). -define(SETS, ordsets). +-define(DICT, orddict). -record(state, { self, @@ -574,10 +575,7 @@ handle_cast({?TAG, ReqVer, Msg}, end, handle_callback_result( if_callback_success( - Result, - fun (_Result1, State2) -> handle_msg(Msg, State2) end, - fun (Result1, State2) -> {Result1, State2} end, - State1)); + Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1)); handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) -> noreply(State); @@ -617,6 +615,9 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, left = Left, right = Right, group_name = GroupName, + view = View, + module = Module, + callback_args = Args, confirms = Confirms }) -> Member = case {Left, Right} of {{Member1, MRef}, _} -> Member1; @@ -638,7 +639,11 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason}, members_state = blank_member_state(), confirms = purge_confirms(Confirms) }); _ -> - {ok, State1} + %% here we won't be pointing out any deaths: + %% the concern is that there maybe births + %% which we'd otherwise miss. + {callback_view_changed(Args, Module, View, View1), + State1} end, handle_callback_result({Result, check_neighbours(State2)}) end. @@ -674,8 +679,8 @@ handle_msg({catchup, Left, MembersStateLeft}, members_state = MembersState }) when MembersState =/= undefined -> MembersStateLeft1 = build_members_state(MembersStateLeft), - AllMembers = lists:usort(dict:fetch_keys(MembersState) ++ - dict:fetch_keys(MembersStateLeft1)), + AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ + ?DICT:fetch_keys(MembersStateLeft1)), {MembersState1, Activity} = lists:foldl( fun (Id, MembersStateActivity) -> @@ -709,11 +714,9 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) -> handle_msg({activity, Left, Activity}, State = #state { self = Self, left = {Left, _MRefL}, - module = Module, view = View, members_state = MembersState, - confirms = Confirms, - callback_args = Args }) + confirms = Confirms }) when MembersState =/= undefined -> {MembersState1, {Confirms1, Activity1}} = lists:foldl( @@ -755,31 +758,18 @@ handle_msg({activity, Left, Activity}, {Result, State2} = maybe_erase_aliases(State1), ok = maybe_send_activity(Activity3, State2), if_callback_success( - Result, - fun (_Result1, State3) -> {callback(Args, Module, Activity3), State3} end, - fun (Result1, State3) -> {Result1, State3} end, - State2); + Result, fun activity_true/3, fun activity_false/3, Activity3, State2); handle_msg({activity, _NotLeft, _Activity}, State) -> {ok, State}. noreply(State) -> - ok = a(State), {noreply, State, hibernate}. reply(Reply, State) -> - ok = a(State), {reply, Reply, State, hibernate}. -a(#state { view = undefined }) -> - ok; -a(#state { self = Self, - left = {Left, _MRefL}, - view = View }) -> - #view_member { left = Left } = fetch_view_member(Self, View), - ok. - internal_broadcast(Msg, From, State = #state { self = Self, pub_count = PubCount, members_state = MembersState, @@ -826,25 +816,25 @@ is_member_alias(Member, Self, View) -> dead_member_id({dead, Member}) -> Member. store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> - {Ver, dict:store(Id, VMember, View)}. + {Ver, ?DICT:store(Id, VMember, View)}. with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). fetch_view_member(Id, {_Ver, View}) -> - dict:fetch(Id, View). + ?DICT:fetch(Id, View). find_view_member(Id, {_Ver, View}) -> - dict:find(Id, View). + ?DICT:find(Id, View). blank_view(Ver) -> - {Ver, dict:new()}. + {Ver, ?DICT:new()}. alive_view_members({_Ver, View}) -> - dict:fetch_keys(View). + ?DICT:fetch_keys(View). all_known_members({_Ver, View}) -> - dict:fold( + ?DICT:fold( fun (Member, #view_member { aliases = Aliases }, Acc) -> ?SETS:to_list(Aliases) ++ [Member | Acc] end, [], View). @@ -1155,28 +1145,28 @@ with_member_acc(Fun, Id, {MembersState, Acc}) -> {store_member(Id, MemberState, MembersState), Acc1}. find_member_or_blank(Id, MembersState) -> - case dict:find(Id, MembersState) of + case ?DICT:find(Id, MembersState) of {ok, Result} -> Result; error -> blank_member() end. erase_member(Id, MembersState) -> - dict:erase(Id, MembersState). + ?DICT:erase(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. blank_member_state() -> - dict:new(). + ?DICT:new(). store_member(Id, MemberState, MembersState) -> - dict:store(Id, MemberState, MembersState). + ?DICT:store(Id, MemberState, MembersState). prepare_members_state(MembersState) -> - dict:to_list(MembersState). + ?DICT:to_list(MembersState). build_members_state(MembersStateList) -> - dict:from_list(MembersStateList). + ?DICT:from_list(MembersStateList). %% --------------------------------------------------------------------------- @@ -1228,24 +1218,34 @@ callback_view_changed(Args, Module, OldView, NewView) -> handle_callback_result({Result, State}) -> if_callback_success( - Result, - fun (_Result, State1) -> noreply(State1) end, - fun ({stop, Reason}, State1) -> {stop, Reason, State1} end, - State); + Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State); handle_callback_result({Result, Reply, State}) -> if_callback_success( - Result, - fun (_Result, State1) -> reply(Reply, State1) end, - fun ({stop, Reason}, State1) -> {stop, Reason, Reply, State1} end, - State). - -if_callback_success(ok, True, _False, State) -> - True(ok, State); -if_callback_success({become, Module, Args} = Result, True, _False, State) -> - True(Result, State #state { module = Module, - callback_args = Args }); -if_callback_success({stop, _Reason} = Result, _True, False, State) -> - False(Result, State). + Result, fun reply_true/3, fun reply_false/3, Reply, State). + +no_reply_true (_Result, _Undefined, State) -> noreply(State). +no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}. + +reply_true (_Result, Reply, State) -> reply(Reply, State). +reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}. + +handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State). +handle_msg_false(Result, _Msg, State) -> {Result, State}. + +activity_true(_Result, Activity, State = #state { module = Module, + callback_args = Args }) -> + {callback(Args, Module, Activity), State}. +activity_false(Result, _Activity, State) -> + {Result, State}. + +if_callback_success(ok, True, _False, Arg, State) -> + True(ok, Arg, State); +if_callback_success( + {become, Module, Args} = Result, True, _False, Arg, State) -> + True(Result, Arg, State #state { module = Module, + callback_args = Args }); +if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) -> + False(Result, Arg, State). maybe_confirm(_Self, _Id, Confirms, []) -> Confirms; @@ -1282,14 +1282,12 @@ queue_from_pubs(Pubs) -> apply_acks([], Pubs) -> Pubs; -apply_acks([PubNum | Acks], Pubs) -> - {{value, {PubNum, _Msg}}, Pubs1} = queue:out(Pubs), - apply_acks(Acks, Pubs1). - -join_pubs(Q, []) -> - Q; -join_pubs(Q, Pubs) -> - queue:join(Q, queue_from_pubs(Pubs)). +apply_acks(List, Pubs) -> + {_, Pubs1} = queue:split(length(List), Pubs), + Pubs1. + +join_pubs(Q, []) -> Q; +join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)). last_ack([], LA) -> LA; diff --git a/src/gm_test.erl b/src/gm_test.erl index aebfbb69ee..e8f2859832 100644 --- a/src/gm_test.erl +++ b/src/gm_test.erl @@ -59,7 +59,7 @@ members_changed([], Births, Deaths) -> lists:foldl( fun (Died, StateN) -> true = dict:is_key(Died, StateN), - dict:erase(Died, StateN) + dict:store(Died, died, StateN) end, State1, Deaths) end), ok. @@ -69,6 +69,9 @@ handle_msg([], From, {test_msg, Num}) -> with_state( fun (State) -> ok = case dict:find(From, State) of + {ok, died} -> + exit({{from, From}, + {received_posthumous_delivery, Num}}); {ok, empty} -> ok; {ok, Num} -> ok; {ok, Num1} when Num < Num1 -> @@ -78,7 +81,10 @@ handle_msg([], From, {test_msg, Num}) -> {ok, Num1} -> exit({{from, From}, {missing_delivery_of, Num}, - {received_early, Num1}}) + {received_early, Num1}}); + error -> + exit({{from, From}, + {received_premature_delivery, Num}}) end, dict:store(From, Num + 1, State) end), |
