diff options
| -rwxr-xr-x | quickcheck | 5 | ||||
| -rw-r--r-- | src/gm.erl | 97 | ||||
| -rw-r--r-- | src/gm_qc.erl | 255 |
3 files changed, 226 insertions, 131 deletions
diff --git a/quickcheck b/quickcheck index 40f130919f..565b19304c 100755 --- a/quickcheck +++ b/quickcheck @@ -15,7 +15,10 @@ main([NodeStr, ModStr, TrialsStr]) -> case rpc:call(Node, code, ensure_loaded, [proper]) of {module, proper} -> case rpc:call(Node, proper, module, - [Mod] ++ [[{numtests, Trials}, {constraint_tries, 200}]]) of + [Mod] ++ [[{numtests, Trials}, + {constraint_tries, 200}, + {start_size, 5}, + long_result]]) of [] -> ok; R -> io:format("~p.~n", [R]), quit(1) diff --git a/src/gm.erl b/src/gm.erl index 7a2f5835b0..4065b17182 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -382,7 +382,8 @@ -behaviour(gen_server2). --export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3, +-export([create_tables/0, start_link/4, start_link/6, + leave/1, broadcast/2, broadcast/3, confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -417,7 +418,9 @@ broadcast_buffer, broadcast_buffer_sz, broadcast_timer, - txn_executor + txn_executor, + call, + cast }). -record(gm_group, { name, version, members }). @@ -519,7 +522,14 @@ table_definitions() -> [{Name, [?TABLE_MATCH | Attributes]}]. start_link(GroupName, Module, Args, TxnFun) -> - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). + start_link(GroupName, Module, Args, TxnFun, + fun gen_server2:call/3, fun gen_server2:cast/2). + +%% For testing we can instrument certain "interesting" cases where we +%% call and cast to other GMs. +start_link(GroupName, Module, Args, TxnFun, Call, Cast) -> + gen_server2:start_link( + ?MODULE, [GroupName, Module, Args, TxnFun, Call, Cast], []). leave(Server) -> gen_server2:cast(Server, leave). @@ -545,7 +555,7 @@ forget_group(GroupName) -> end), ok. -init([GroupName, Module, Args, TxnFun]) -> +init([GroupName, Module, Args, TxnFun, Call, Cast]) -> put(process_name, {?MODULE, GroupName}), {MegaSecs, Secs, MicroSecs} = now(), random:seed(MegaSecs, Secs, MicroSecs), @@ -564,7 +574,9 @@ init([GroupName, Module, Args, TxnFun]) -> broadcast_buffer = [], broadcast_buffer_sz = 0, broadcast_timer = undefined, - txn_executor = TxnFun }, hibernate, + txn_executor = TxnFun, + call = Call, + cast = Cast}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -606,12 +618,14 @@ handle_call({add_on_right, NewMember}, _From, State = #state { self = Self, group_name = GroupName, members_state = MembersState, - txn_executor = TxnFun }) -> + txn_executor = TxnFun, + cast = Cast}) -> Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun), View1 = group_to_view(Group), MembersState1 = remove_erased_members(MembersState, View1), ok = send_right(NewMember, View1, - {catchup, Self, prepare_members_state(MembersState1)}), + {catchup, Self, prepare_members_state(MembersState1)}, + Cast), {Result, State1} = change_view(View1, State #state { members_state = MembersState1 }), handle_callback_result({Result, {ok, Group}, State1}). @@ -653,8 +667,9 @@ handle_cast(join, State = #state { self = Self, members_state = undefined, module = Module, callback_args = Args, - txn_executor = TxnFun }) -> - View = join_group(Self, GroupName, TxnFun), + txn_executor = TxnFun, + call = Call}) -> + View = join_group(Self, GroupName, TxnFun, Call), MembersState = case alive_view_members(View) of [Self] -> blank_member_state(); @@ -761,8 +776,9 @@ handle_msg({catchup, Left, MembersStateLeft}, left = {Left, _MRefL}, right = {Right, _MRefR}, view = View, - members_state = undefined }) -> - ok = send_right(Right, View, {catchup, Self, MembersStateLeft}), + members_state = undefined, + cast = Cast}) -> + ok = send_right(Right, View, {catchup, Self, MembersStateLeft}, Cast), MembersStateLeft1 = build_members_state(MembersStateLeft), {ok, State #state { members_state = MembersStateLeft1 }}; @@ -1033,15 +1049,17 @@ ensure_alive_suffix1(MembersQ) -> %% View modification %% --------------------------------------------------------------------------- -join_group(Self, GroupName, TxnFun) -> - join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun). +join_group(Self, GroupName, TxnFun, Call) -> + join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun, Call). -join_group(Self, GroupName, {error, not_found}, TxnFun) -> +join_group(Self, GroupName, {error, not_found}, TxnFun, Call) -> join_group(Self, GroupName, - prune_or_create_group(Self, GroupName, TxnFun), TxnFun); -join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) -> + prune_or_create_group(Self, GroupName, TxnFun), TxnFun, Call); +join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, + _TxnFun, _Call) -> group_to_view(Group); -join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> +join_group(Self, GroupName, #gm_group { members = Members } = Group, + TxnFun, Call) -> case lists:member(Self, Members) of true -> group_to_view(Group); @@ -1050,7 +1068,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> [] -> join_group(Self, GroupName, prune_or_create_group(Self, GroupName, TxnFun), - TxnFun); + TxnFun, Call); Alive -> Left = lists:nth(random:uniform(length(Alive)), Alive), Handler = @@ -1059,13 +1077,14 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) -> Self, GroupName, record_dead_member_in_group( Left, GroupName, TxnFun), - TxnFun) + TxnFun, Call) end, try - case gen_server2:call( + case Call( get_pid(Left), {add_on_right, Self}, infinity) of {ok, Group1} -> group_to_view(Group1); - not_ready -> join_group(Self, GroupName, TxnFun) + not_ready -> join_group( + Self, GroupName, TxnFun, Call) end catch exit:{R, _} @@ -1183,21 +1202,20 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false. %% View monitoring and maintanence %% --------------------------------------------------------------------------- -ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> +ensure_neighbour(_Ver, Self, {Self, undefined}, Self, _Cast) -> {Self, undefined}; -ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> - ok = gen_server2:cast(get_pid(RealNeighbour), - {?TAG, Ver, check_neighbours}), +ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour, Cast) -> + ok = Cast(get_pid(RealNeighbour), {?TAG, Ver, check_neighbours}), {RealNeighbour, maybe_monitor(RealNeighbour, Self)}; -ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> +ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour, _Cast) -> {RealNeighbour, MRef}; -ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> +ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour, Cast) -> true = erlang:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, - ok = gen_server2:cast(get_pid(RealNeighbour), Msg), + ok = Cast(get_pid(RealNeighbour), Msg), ok = case Neighbour of Self -> ok; - _ -> gen_server2:cast(get_pid(Neighbour), Msg) + _ -> Cast(get_pid(Neighbour), Msg) end, {Neighbour, maybe_monitor(Neighbour, Self)}. @@ -1208,12 +1226,13 @@ check_neighbours(State = #state { self = Self, left = Left, right = Right, view = View, - broadcast_buffer = Buffer }) -> + broadcast_buffer = Buffer, + cast = Cast}) -> #view_member { left = VLeft, right = VRight } = fetch_view_member(Self, View), Ver = view_version(View), - Left1 = ensure_neighbour(Ver, Self, Left, VLeft), - Right1 = ensure_neighbour(Ver, Self, Right, VRight), + Left1 = ensure_neighbour(Ver, Self, Left, VLeft, Cast), + Right1 = ensure_neighbour(Ver, Self, Right, VRight, Cast), Buffer1 = case Right1 of {Self, undefined} -> []; _ -> Buffer @@ -1233,9 +1252,10 @@ maybe_send_catchup(_Right, #state { members_state = undefined }) -> maybe_send_catchup(_Right, #state { self = Self, right = {Right, _MRef}, view = View, - members_state = MembersState }) -> + members_state = MembersState, + cast = Cast}) -> send_right(Right, View, - {catchup, Self, prepare_members_state(MembersState)}). + {catchup, Self, prepare_members_state(MembersState)}, Cast). %% --------------------------------------------------------------------------- @@ -1338,11 +1358,12 @@ maybe_send_activity([], _State) -> ok; maybe_send_activity(Activity, #state { self = Self, right = {Right, _MRefR}, - view = View }) -> - send_right(Right, View, {activity, Self, Activity}). + view = View, + cast = Cast}) -> + send_right(Right, View, {activity, Self, Activity}, Cast). -send_right(Right, View, Msg) -> - ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}). +send_right(Right, View, Msg, Cast) -> + ok = Cast(get_pid(Right), {?TAG, view_version(View), Msg}). callback(Args, Module, Activity) -> Result = diff --git a/src/gm_qc.erl b/src/gm_qc.erl index 2382f7f17a..6a09c286c2 100644 --- a/src/gm_qc.erl +++ b/src/gm_qc.erl @@ -33,15 +33,18 @@ -export([joined/2, members_changed/3, handle_msg/3, terminate/2]). %% Helpers --export([do_join/0, do_leave/1, do_send/2]). +-export([do_join/0, do_leave/1, do_send/2, do_proceed/2]). --record(state, {seq, msgs}). +-record(state, {seq, instrumented, outstanding}). prop_gm_test() -> - ?FORALL(Cmds, commands(?MODULE, initial_state()), - gm_test(Cmds)). + ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)). gm_test(Cmds) -> + %% Give some feedback on how long our sequences are, since it + %% seems easy to end up with lots of tiny sequences and think + %% you're doing something. + io:format("~p", [length(Cmds)]), {_H, State, Res} = run_commands(?MODULE, Cmds), cleanup(State), ?WHENFAIL( @@ -49,8 +52,8 @@ gm_test(Cmds) -> aggregate(command_names(Cmds), Res =:= ok)). cleanup(S) -> - #state{msgs = Msgs} = ensure_outstanding_msgs_received(S), - All = gms(Msgs), + S2 = ensure_outstanding_msgs_received(drain_proceeding(S)), + All = gms(S2), [gm:leave(GM) || GM <- All], [await_death(GM) || GM <- All], ok. @@ -65,79 +68,105 @@ await_death(P) -> %% proper_statem %% --------------------------------------------------------------------------- -initial_state() -> #state{seq = 1, - msgs = dict:new()}. +initial_state() -> #state{seq = 1, + outstanding = dict:new(), + instrumented = dict:new()}. -command(S = #state{msgs = Msgs}) -> - case dict:size(Msgs) of +command(S = #state{outstanding = Outstanding}) -> + case dict:size(Outstanding) of 0 -> qc_join(S); _ -> frequency([{1, qc_join(S)}, {1, qc_leave(S)}, - {10, qc_send(S)}]) + {10, qc_send(S)}, + {20, qc_proceed(S)}]) end. -qc_join(_S) -> {call,?MODULE,do_join, []}. -qc_leave(#state{msgs = Msgs}) -> {call,?MODULE,do_leave,[random(gms(Msgs))]}. -qc_send(#state{seq = N, - msgs = Msgs}) -> {call,?MODULE,do_send, [N, random(gms(Msgs))]}. +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[random(gms(S))]}. +qc_send(S = #state{seq = N}) -> {call,?MODULE,do_send, [N, random(gms(S))]}. +qc_proceed(S) -> {call,?MODULE,do_proceed, [random(gms(S)), + random(gms(S))]}. random([]) -> will_fail_precondition; random(L) -> lists:nth(random:uniform(length(L)), L). -precondition(#state{msgs = Msgs}, {call, ?MODULE, do_join, []}) -> - dict:size(Msgs) < ?MAX_SIZE; +precondition(S, {call, ?MODULE, do_join, []}) -> + length(gms(S)) < ?MAX_SIZE; + +precondition(S, {call, ?MODULE, do_leave, [_GM]}) -> + length(gms(S)) > 0; -precondition(#state{msgs = Msgs}, {call, ?MODULE, do_leave, [_GM]}) -> - dict:size(Msgs) > 0; +precondition(S, {call, ?MODULE, do_send, [_N, _GM]}) -> + length(gms(S)) > 0; -precondition(#state{msgs = Msgs}, {call, ?MODULE, do_send, [_N, _GM]}) -> - dict:size(Msgs) > 0. +precondition(S, {call, ?MODULE, do_proceed, [GM1, GM2]}) -> + length(gms(S)) > 0 andalso GM1 =/= GM2. -postcondition(S = #state{msgs = Msgs}, {call, ?MODULE, do_join, []}, GM) -> - [begin - gm:broadcast(Existing, heartbeat), - receive - {birth, Existing, GM} -> ok - after 1000 -> - exit({birth_timeout, Existing, did_not_announce, GM}) - end - end || Existing <- gms(Msgs) -- [GM]], +postcondition(S, {call, ?MODULE, do_join, []}, _GM) -> + %% TODO figure out how to test birth announcements again + %% [begin + %% gm:broadcast(Existing, heartbeat), + %% receive + %% {birth, Existing, GM} -> ok + %% after 1000 -> + %% exit({birth_timeout, Existing, did_not_announce, GM}) + %% end + %% end || Existing <- gms(S) -- [GM]], assert(S); -postcondition(S = #state{msgs = Msgs}, +postcondition(S = #state{outstanding = Outstanding}, {call, ?MODULE, do_leave, [Dead]}, _Res) -> - [await_death(Existing, Dead, 5) || Existing <- gms(Msgs) -- [Dead]], - assert(S#state{msgs = dict:erase(Dead, Msgs)}); + %% TODO figure out how to test death announcements again + %%[await_death(Existing, Dead, 5) || Existing <- gms(S) -- [Dead]], + assert(S#state{outstanding = dict:erase(Dead, Outstanding)}); postcondition(S = #state{}, {call, _M, _F, _A}, _Res) -> assert(S). -next_state(S = #state{msgs = Msgs}, GM, {call, ?MODULE, do_join, []}) -> - S#state{msgs = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, Msgs)}; +next_state(S = #state{outstanding = Outstanding}, GM, + {call, ?MODULE, do_join, []}) -> + S#state{outstanding = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, + Outstanding)}; -next_state(S = #state{msgs = Msgs}, _GM, {call, ?MODULE, do_leave, [GM]}) -> - true = dict:is_key(GM, Msgs), - S#state{msgs = dict:erase(GM, Msgs)}; +next_state(S = #state{outstanding = Outstanding}, _Res, + {call, ?MODULE, do_leave, [GM]}) -> + true = dict:is_key(GM, Outstanding), + S#state{outstanding = dict:erase(GM, Outstanding)}; -next_state(S = #state{seq = Seq, - msgs = Msgs}, Msg, {call, ?MODULE, do_send, [_, _]}) -> +next_state(S = #state{seq = Seq, + outstanding = Outstanding}, Msg, + {call, ?MODULE, do_send, [_, _]}) -> TS = timestamp(), - Msgs1 = dict:map(fun (_GM, {Tree, Set}) -> - {gb_trees:insert(Msg, TS, Tree), - gb_sets:add_element({TS, Msg}, Set)} - end, Msgs), - drain(S#state{seq = Seq + 1, - msgs = Msgs1}). + Outstanding1 = dict:map(fun (_GM, {Tree, Set}) -> + {gb_trees:insert(Msg, TS, Tree), + gb_sets:add_element({TS, Msg}, Set)} + end, Outstanding), + drain(S#state{seq = Seq + 1, + outstanding = Outstanding1}); + +next_state(S = #state{instrumented = Msgs}, _Res, + {call, ?MODULE, do_proceed, [From, To]}) -> + Msgs1 = case dict:find({From, To}, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + process_msg(From, To, Thing), + dict:store({From, To}, Q2, Msgs); + {empty, _} -> + Msgs + end; + error -> Msgs + end, + S#state{instrumented = Msgs1}. %% --------------------------------------------------------------------------- %% GM %% --------------------------------------------------------------------------- -joined(Pid, _Members) -> Pid ! {joined, self()}, ok. -members_changed(Pid, Bs, Ds) -> [Pid ! {birth, self(), B} || B <- Bs], - [Pid ! {death, self(), D} || D <- Ds], +joined(_Pid, _Members) -> ok. +members_changed(_Pid, _Bs, _Ds) -> %%[Pid ! {birth, self(), B} || B <- Bs], + %%[Pid ! {death, self(), D} || D <- Ds], ok. -handle_msg(_Pid, _From, heartbeat) -> ok; +%%handle_msg(_Pid, _From, heartbeat) -> ok; handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok. terminate(_Pid, _Reason) -> ok. @@ -146,11 +175,15 @@ terminate(_Pid, _Reason) -> ok. %% --------------------------------------------------------------------------- do_join() -> + {Call, Cast} = instrumented_funs(), {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), - receive - {joined, GM} -> ok - end, + fun rabbit_misc:execute_mnesia_transaction/1, + Call, Cast), + %% TODO do we need to test the joined callback? What is the joined + %% callback actually for? + %% receive + %% {joined, GM} -> ok + %% end, GM. do_leave(GM) -> @@ -162,44 +195,67 @@ do_send(Seq, GM) -> gm:broadcast(GM, Msg), Msg. -await_death(GM, ToDie, 0) -> - exit({death_msg_timeout, GM, ToDie}); -await_death(GM, ToDie, N) -> - gm:broadcast(GM, heartbeat), - receive - {death, GM, ToDie} -> ok - after 100 -> - await_death(GM, ToDie, N - 1) - end. +do_proceed(_From, _To) -> + ok. %% Do the work in next_state + +%% await_death(GM, ToDie, 0) -> +%% exit({death_msg_timeout, GM, ToDie}); +%% await_death(GM, ToDie, N) -> +%% gm:broadcast(GM, heartbeat), +%% receive +%% {death, GM, ToDie} -> ok +%% after 100 -> +%% await_death(GM, ToDie, N - 1) +%% end. -gms(Msgs) -> dict:fetch_keys(Msgs). +gms(#state{outstanding = Outstanding}) -> dict:fetch_keys(Outstanding). -drain(S = #state{msgs = Msgs}) -> +drain(S) -> receive - {gm, GM, Msg} -> - case dict:find(GM, Msgs) of - {ok, {Tree, Set}} -> - case gb_trees:lookup(Msg, Tree) of - {value, TS} -> - Msgs1 = dict:store( - GM, {gb_trees:delete(Msg, Tree), - gb_sets:del_element({TS, Msg}, Set)}, - Msgs), - drain(S#state{msgs = Msgs1}); - none -> - %% Message from GM that joined after we - %% broadcast the message. OK. - drain(S) - end; - error -> - %% Message from GM that has already died. OK. - drain(S) - end - after 0 -> - S + Msg -> drain(handle_msg(Msg, S)) + after 0 -> S end. -assert(#state{msgs = Msgs}) -> +drain_proceeding(S0) -> + timer:sleep(100), + S = #state{instrumented = Msgs} = drain(S0), + case dict:size(Msgs) of + 0 -> S; + _ -> _ = dict:map(fun ({From, To}, Q) -> + [process_msg(From, To, Msg) || + Msg <- queue:to_list(Q)] + end, Msgs), + drain_proceeding(S#state{instrumented = dict:new()}) + end. + +handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> + case dict:find(GM, Outstanding) of + {ok, {Tree, Set}} -> + case gb_trees:lookup(Msg, Tree) of + {value, TS} -> + TreeSet = {gb_trees:delete(Msg, Tree), + gb_sets:del_element({TS, Msg}, Set)}, + S#state{outstanding = dict:store(GM, TreeSet, Outstanding)}; + none -> + %% Message from GM that joined after we + %% broadcast the message. OK. + S + end; + error -> + %% Message from GM that has already died. OK. + S + end; +handle_msg({instrumented, From, To, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find({From, To}, Msgs) of + {ok, Q} -> queue:in(Thing, Q); + error -> queue:from_list([Thing]) + end, + S#state{instrumented = dict:store({From, To}, Q1, Msgs)}. + +process_msg(From, _To, {call, Ref}) -> From ! {proceed, Ref}; +process_msg(_From, To, {cast, Msg}) -> gen_server2:cast(To, Msg). + +assert(S = #state{outstanding = Outstanding}) -> TS = timestamp(), dict:fold(fun (GM, {_Tree, Set}, none) -> case gb_sets:size(Set) of @@ -209,27 +265,42 @@ assert(#state{msgs = Msgs}) -> true -> exit({msg_timeout, [{msg, Msg}, {gm, GM}, - {all, gms(Msgs)}]}); + {all, gms(S)}]}); false -> ok end end, none - end, none, Msgs), + end, none, Outstanding), true. ensure_outstanding_msgs_received(S) -> case outstanding_msgs(S) of false -> S; true -> timer:sleep(100), - S2 = drain(S), + S2 = drain_proceeding(S), assert(S2), ensure_outstanding_msgs_received(S2) end. -outstanding_msgs(#state{msgs = Msgs}) -> +outstanding_msgs(#state{outstanding = Outstanding}) -> dict:fold(fun (_GM, {_Tree, Set}, false) -> not gb_sets:is_empty(Set); (_GM, {_Tree, _Set}, true) -> true - end, false, Msgs). + end, false, Outstanding). + +instrumented_funs() -> + Test = self(), + {fun (Pid, Msg, infinity) -> + Ref = make_ref(), + Test ! {instrumented, self(), Pid, {call, Ref}}, + receive + {proceed, Ref} -> ok + end, + gen_server2:call(Pid, Msg, infinity) + end, + fun (Pid, Msg) -> + Test ! {instrumented, self(), Pid, {cast, Msg}}, + ok + end}. timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}). |
