summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-19 00:54:51 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-19 00:54:51 +0000
commit7494ba44566e5b32a56c7b51f1bfce2f5acf8c56 (patch)
tree4370fe866e129b59bc63e695ed18ab934e5b5319 /src
parentd18ca8b0da9f16a47d19a0f8371cf5c84ca8a7e9 (diff)
downloadrabbitmq-server-git-7494ba44566e5b32a56c7b51f1bfce2f5acf8c56.tar.gz
Fixed a bug caused by revision 454fbb9127bd in rabbit-ha from where gm.erl came; Make gm_tests more robust; Avoid creating endless funs all the time for every message (substantial performance gain). Abstract use of dicts - expected use case is relatively small groups, thus orddict would normally be more appropriate
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl118
-rw-r--r--src/gm_test.erl10
2 files changed, 66 insertions, 62 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 0a6e346a22..6a2c9c4876 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -387,6 +387,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
-define(SETS, ordsets).
+-define(DICT, orddict).
-record(state,
{ self,
@@ -574,10 +575,7 @@ handle_cast({?TAG, ReqVer, Msg},
end,
handle_callback_result(
if_callback_success(
- Result,
- fun (_Result1, State2) -> handle_msg(Msg, State2) end,
- fun (Result1, State2) -> {Result1, State2} end,
- State1));
+ Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1));
handle_cast({broadcast, _Msg}, State = #state { members_state = undefined }) ->
noreply(State);
@@ -617,6 +615,9 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
left = Left,
right = Right,
group_name = GroupName,
+ view = View,
+ module = Module,
+ callback_args = Args,
confirms = Confirms }) ->
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
@@ -638,7 +639,11 @@ handle_info({'DOWN', MRef, process, _Pid, _Reason},
members_state = blank_member_state(),
confirms = purge_confirms(Confirms) });
_ ->
- {ok, State1}
+ %% here we won't be pointing out any deaths:
+ %% the concern is that there maybe births
+ %% which we'd otherwise miss.
+ {callback_view_changed(Args, Module, View, View1),
+ State1}
end,
handle_callback_result({Result, check_neighbours(State2)})
end.
@@ -674,8 +679,8 @@ handle_msg({catchup, Left, MembersStateLeft},
members_state = MembersState })
when MembersState =/= undefined ->
MembersStateLeft1 = build_members_state(MembersStateLeft),
- AllMembers = lists:usort(dict:fetch_keys(MembersState) ++
- dict:fetch_keys(MembersStateLeft1)),
+ AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++
+ ?DICT:fetch_keys(MembersStateLeft1)),
{MembersState1, Activity} =
lists:foldl(
fun (Id, MembersStateActivity) ->
@@ -709,11 +714,9 @@ handle_msg({catchup, _NotLeft, _MembersState}, State) ->
handle_msg({activity, Left, Activity},
State = #state { self = Self,
left = {Left, _MRefL},
- module = Module,
view = View,
members_state = MembersState,
- confirms = Confirms,
- callback_args = Args })
+ confirms = Confirms })
when MembersState =/= undefined ->
{MembersState1, {Confirms1, Activity1}} =
lists:foldl(
@@ -755,31 +758,18 @@ handle_msg({activity, Left, Activity},
{Result, State2} = maybe_erase_aliases(State1),
ok = maybe_send_activity(Activity3, State2),
if_callback_success(
- Result,
- fun (_Result1, State3) -> {callback(Args, Module, Activity3), State3} end,
- fun (Result1, State3) -> {Result1, State3} end,
- State2);
+ Result, fun activity_true/3, fun activity_false/3, Activity3, State2);
handle_msg({activity, _NotLeft, _Activity}, State) ->
{ok, State}.
noreply(State) ->
- ok = a(State),
{noreply, State, hibernate}.
reply(Reply, State) ->
- ok = a(State),
{reply, Reply, State, hibernate}.
-a(#state { view = undefined }) ->
- ok;
-a(#state { self = Self,
- left = {Left, _MRefL},
- view = View }) ->
- #view_member { left = Left } = fetch_view_member(Self, View),
- ok.
-
internal_broadcast(Msg, From, State = #state { self = Self,
pub_count = PubCount,
members_state = MembersState,
@@ -826,25 +816,25 @@ is_member_alias(Member, Self, View) ->
dead_member_id({dead, Member}) -> Member.
store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
- {Ver, dict:store(Id, VMember, View)}.
+ {Ver, ?DICT:store(Id, VMember, View)}.
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
fetch_view_member(Id, {_Ver, View}) ->
- dict:fetch(Id, View).
+ ?DICT:fetch(Id, View).
find_view_member(Id, {_Ver, View}) ->
- dict:find(Id, View).
+ ?DICT:find(Id, View).
blank_view(Ver) ->
- {Ver, dict:new()}.
+ {Ver, ?DICT:new()}.
alive_view_members({_Ver, View}) ->
- dict:fetch_keys(View).
+ ?DICT:fetch_keys(View).
all_known_members({_Ver, View}) ->
- dict:fold(
+ ?DICT:fold(
fun (Member, #view_member { aliases = Aliases }, Acc) ->
?SETS:to_list(Aliases) ++ [Member | Acc]
end, [], View).
@@ -1155,28 +1145,28 @@ with_member_acc(Fun, Id, {MembersState, Acc}) ->
{store_member(Id, MemberState, MembersState), Acc1}.
find_member_or_blank(Id, MembersState) ->
- case dict:find(Id, MembersState) of
+ case ?DICT:find(Id, MembersState) of
{ok, Result} -> Result;
error -> blank_member()
end.
erase_member(Id, MembersState) ->
- dict:erase(Id, MembersState).
+ ?DICT:erase(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
blank_member_state() ->
- dict:new().
+ ?DICT:new().
store_member(Id, MemberState, MembersState) ->
- dict:store(Id, MemberState, MembersState).
+ ?DICT:store(Id, MemberState, MembersState).
prepare_members_state(MembersState) ->
- dict:to_list(MembersState).
+ ?DICT:to_list(MembersState).
build_members_state(MembersStateList) ->
- dict:from_list(MembersStateList).
+ ?DICT:from_list(MembersStateList).
%% ---------------------------------------------------------------------------
@@ -1228,24 +1218,34 @@ callback_view_changed(Args, Module, OldView, NewView) ->
handle_callback_result({Result, State}) ->
if_callback_success(
- Result,
- fun (_Result, State1) -> noreply(State1) end,
- fun ({stop, Reason}, State1) -> {stop, Reason, State1} end,
- State);
+ Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
if_callback_success(
- Result,
- fun (_Result, State1) -> reply(Reply, State1) end,
- fun ({stop, Reason}, State1) -> {stop, Reason, Reply, State1} end,
- State).
-
-if_callback_success(ok, True, _False, State) ->
- True(ok, State);
-if_callback_success({become, Module, Args} = Result, True, _False, State) ->
- True(Result, State #state { module = Module,
- callback_args = Args });
-if_callback_success({stop, _Reason} = Result, _True, False, State) ->
- False(Result, State).
+ Result, fun reply_true/3, fun reply_false/3, Reply, State).
+
+no_reply_true (_Result, _Undefined, State) -> noreply(State).
+no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
+
+reply_true (_Result, Reply, State) -> reply(Reply, State).
+reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
+
+handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
+handle_msg_false(Result, _Msg, State) -> {Result, State}.
+
+activity_true(_Result, Activity, State = #state { module = Module,
+ callback_args = Args }) ->
+ {callback(Args, Module, Activity), State}.
+activity_false(Result, _Activity, State) ->
+ {Result, State}.
+
+if_callback_success(ok, True, _False, Arg, State) ->
+ True(ok, Arg, State);
+if_callback_success(
+ {become, Module, Args} = Result, True, _False, Arg, State) ->
+ True(Result, Arg, State #state { module = Module,
+ callback_args = Args });
+if_callback_success({stop, _Reason} = Result, _True, False, Arg, State) ->
+ False(Result, Arg, State).
maybe_confirm(_Self, _Id, Confirms, []) ->
Confirms;
@@ -1282,14 +1282,12 @@ queue_from_pubs(Pubs) ->
apply_acks([], Pubs) ->
Pubs;
-apply_acks([PubNum | Acks], Pubs) ->
- {{value, {PubNum, _Msg}}, Pubs1} = queue:out(Pubs),
- apply_acks(Acks, Pubs1).
-
-join_pubs(Q, []) ->
- Q;
-join_pubs(Q, Pubs) ->
- queue:join(Q, queue_from_pubs(Pubs)).
+apply_acks(List, Pubs) ->
+ {_, Pubs1} = queue:split(length(List), Pubs),
+ Pubs1.
+
+join_pubs(Q, []) -> Q;
+join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
last_ack([], LA) ->
LA;
diff --git a/src/gm_test.erl b/src/gm_test.erl
index aebfbb69ee..e8f2859832 100644
--- a/src/gm_test.erl
+++ b/src/gm_test.erl
@@ -59,7 +59,7 @@ members_changed([], Births, Deaths) ->
lists:foldl(
fun (Died, StateN) ->
true = dict:is_key(Died, StateN),
- dict:erase(Died, StateN)
+ dict:store(Died, died, StateN)
end, State1, Deaths)
end),
ok.
@@ -69,6 +69,9 @@ handle_msg([], From, {test_msg, Num}) ->
with_state(
fun (State) ->
ok = case dict:find(From, State) of
+ {ok, died} ->
+ exit({{from, From},
+ {received_posthumous_delivery, Num}});
{ok, empty} -> ok;
{ok, Num} -> ok;
{ok, Num1} when Num < Num1 ->
@@ -78,7 +81,10 @@ handle_msg([], From, {test_msg, Num}) ->
{ok, Num1} ->
exit({{from, From},
{missing_delivery_of, Num},
- {received_early, Num1}})
+ {received_early, Num1}});
+ error ->
+ exit({{from, From},
+ {received_premature_delivery, Num}})
end,
dict:store(From, Num + 1, State)
end),