summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-05-29 16:00:37 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-05-29 16:00:37 +0100
commit1e1ea54173ddcd546b770bb12c0cd77de01fc649 (patch)
tree8d28699028ffe9d57263b2f28dede4d1d34c0c85 /src
parentf51c08553dda5fabb2fcb01e07dc6143d82bcbbf (diff)
downloadrabbitmq-server-git-1e1ea54173ddcd546b770bb12c0cd77de01fc649.tar.gz
Instrument certain calls to gs2:call/3 and gs2:cast/2, and allow PropEr to determine when they proceed. That makes things a lot more deterministic (although not fully) and lets us reproduce bug 26171 (small whoop).
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl97
-rw-r--r--src/gm_qc.erl255
2 files changed, 222 insertions, 130 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 7a2f5835b0..4065b17182 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -382,7 +382,8 @@
-behaviour(gen_server2).
--export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
+-export([create_tables/0, start_link/4, start_link/6,
+ leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -417,7 +418,9 @@
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
- txn_executor
+ txn_executor,
+ call,
+ cast
}).
-record(gm_group, { name, version, members }).
@@ -519,7 +522,14 @@ table_definitions() ->
[{Name, [?TABLE_MATCH | Attributes]}].
start_link(GroupName, Module, Args, TxnFun) ->
- gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []).
+ start_link(GroupName, Module, Args, TxnFun,
+ fun gen_server2:call/3, fun gen_server2:cast/2).
+
+%% For testing we can instrument certain "interesting" cases where we
+%% call and cast to other GMs.
+start_link(GroupName, Module, Args, TxnFun, Call, Cast) ->
+ gen_server2:start_link(
+ ?MODULE, [GroupName, Module, Args, TxnFun, Call, Cast], []).
leave(Server) ->
gen_server2:cast(Server, leave).
@@ -545,7 +555,7 @@ forget_group(GroupName) ->
end),
ok.
-init([GroupName, Module, Args, TxnFun]) ->
+init([GroupName, Module, Args, TxnFun, Call, Cast]) ->
put(process_name, {?MODULE, GroupName}),
{MegaSecs, Secs, MicroSecs} = now(),
random:seed(MegaSecs, Secs, MicroSecs),
@@ -564,7 +574,9 @@ init([GroupName, Module, Args, TxnFun]) ->
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
- txn_executor = TxnFun }, hibernate,
+ txn_executor = TxnFun,
+ call = Call,
+ cast = Cast}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -606,12 +618,14 @@ handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
members_state = MembersState,
- txn_executor = TxnFun }) ->
+ txn_executor = TxnFun,
+ cast = Cast}) ->
Group = record_new_member_in_group(NewMember, Self, GroupName, TxnFun),
View1 = group_to_view(Group),
MembersState1 = remove_erased_members(MembersState, View1),
ok = send_right(NewMember, View1,
- {catchup, Self, prepare_members_state(MembersState1)}),
+ {catchup, Self, prepare_members_state(MembersState1)},
+ Cast),
{Result, State1} = change_view(View1, State #state {
members_state = MembersState1 }),
handle_callback_result({Result, {ok, Group}, State1}).
@@ -653,8 +667,9 @@ handle_cast(join, State = #state { self = Self,
members_state = undefined,
module = Module,
callback_args = Args,
- txn_executor = TxnFun }) ->
- View = join_group(Self, GroupName, TxnFun),
+ txn_executor = TxnFun,
+ call = Call}) ->
+ View = join_group(Self, GroupName, TxnFun, Call),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
@@ -761,8 +776,9 @@ handle_msg({catchup, Left, MembersStateLeft},
left = {Left, _MRefL},
right = {Right, _MRefR},
view = View,
- members_state = undefined }) ->
- ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
+ members_state = undefined,
+ cast = Cast}) ->
+ ok = send_right(Right, View, {catchup, Self, MembersStateLeft}, Cast),
MembersStateLeft1 = build_members_state(MembersStateLeft),
{ok, State #state { members_state = MembersStateLeft1 }};
@@ -1033,15 +1049,17 @@ ensure_alive_suffix1(MembersQ) ->
%% View modification
%% ---------------------------------------------------------------------------
-join_group(Self, GroupName, TxnFun) ->
- join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).
+join_group(Self, GroupName, TxnFun, Call) ->
+ join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun, Call).
-join_group(Self, GroupName, {error, not_found}, TxnFun) ->
+join_group(Self, GroupName, {error, not_found}, TxnFun, Call) ->
join_group(Self, GroupName,
- prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
-join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
+ prune_or_create_group(Self, GroupName, TxnFun), TxnFun, Call);
+join_group(Self, _GroupName, #gm_group { members = [Self] } = Group,
+ _TxnFun, _Call) ->
group_to_view(Group);
-join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
+join_group(Self, GroupName, #gm_group { members = Members } = Group,
+ TxnFun, Call) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
@@ -1050,7 +1068,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
[] ->
join_group(Self, GroupName,
prune_or_create_group(Self, GroupName, TxnFun),
- TxnFun);
+ TxnFun, Call);
Alive ->
Left = lists:nth(random:uniform(length(Alive)), Alive),
Handler =
@@ -1059,13 +1077,14 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
Self, GroupName,
record_dead_member_in_group(
Left, GroupName, TxnFun),
- TxnFun)
+ TxnFun, Call)
end,
try
- case gen_server2:call(
+ case Call(
get_pid(Left), {add_on_right, Self}, infinity) of
{ok, Group1} -> group_to_view(Group1);
- not_ready -> join_group(Self, GroupName, TxnFun)
+ not_ready -> join_group(
+ Self, GroupName, TxnFun, Call)
end
catch
exit:{R, _}
@@ -1183,21 +1202,20 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
%% View monitoring and maintanence
%% ---------------------------------------------------------------------------
-ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
+ensure_neighbour(_Ver, Self, {Self, undefined}, Self, _Cast) ->
{Self, undefined};
-ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
- ok = gen_server2:cast(get_pid(RealNeighbour),
- {?TAG, Ver, check_neighbours}),
+ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour, Cast) ->
+ ok = Cast(get_pid(RealNeighbour), {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
-ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
+ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour, _Cast) ->
{RealNeighbour, MRef};
-ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
+ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour, Cast) ->
true = erlang:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
- ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
+ ok = Cast(get_pid(RealNeighbour), Msg),
ok = case Neighbour of
Self -> ok;
- _ -> gen_server2:cast(get_pid(Neighbour), Msg)
+ _ -> Cast(get_pid(Neighbour), Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
@@ -1208,12 +1226,13 @@ check_neighbours(State = #state { self = Self,
left = Left,
right = Right,
view = View,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ cast = Cast}) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
- Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
- Right1 = ensure_neighbour(Ver, Self, Right, VRight),
+ Left1 = ensure_neighbour(Ver, Self, Left, VLeft, Cast),
+ Right1 = ensure_neighbour(Ver, Self, Right, VRight, Cast),
Buffer1 = case Right1 of
{Self, undefined} -> [];
_ -> Buffer
@@ -1233,9 +1252,10 @@ maybe_send_catchup(_Right, #state { members_state = undefined }) ->
maybe_send_catchup(_Right, #state { self = Self,
right = {Right, _MRef},
view = View,
- members_state = MembersState }) ->
+ members_state = MembersState,
+ cast = Cast}) ->
send_right(Right, View,
- {catchup, Self, prepare_members_state(MembersState)}).
+ {catchup, Self, prepare_members_state(MembersState)}, Cast).
%% ---------------------------------------------------------------------------
@@ -1338,11 +1358,12 @@ maybe_send_activity([], _State) ->
ok;
maybe_send_activity(Activity, #state { self = Self,
right = {Right, _MRefR},
- view = View }) ->
- send_right(Right, View, {activity, Self, Activity}).
+ view = View,
+ cast = Cast}) ->
+ send_right(Right, View, {activity, Self, Activity}, Cast).
-send_right(Right, View, Msg) ->
- ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
+send_right(Right, View, Msg, Cast) ->
+ ok = Cast(get_pid(Right), {?TAG, view_version(View), Msg}).
callback(Args, Module, Activity) ->
Result =
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
index 2382f7f17a..6a09c286c2 100644
--- a/src/gm_qc.erl
+++ b/src/gm_qc.erl
@@ -33,15 +33,18 @@
-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
%% Helpers
--export([do_join/0, do_leave/1, do_send/2]).
+-export([do_join/0, do_leave/1, do_send/2, do_proceed/2]).
--record(state, {seq, msgs}).
+-record(state, {seq, instrumented, outstanding}).
prop_gm_test() ->
- ?FORALL(Cmds, commands(?MODULE, initial_state()),
- gm_test(Cmds)).
+ ?FORALL(Cmds, commands(?MODULE), gm_test(Cmds)).
gm_test(Cmds) ->
+ %% Give some feedback on how long our sequences are, since it
+ %% seems easy to end up with lots of tiny sequences and think
+ %% you're doing something.
+ io:format("~p", [length(Cmds)]),
{_H, State, Res} = run_commands(?MODULE, Cmds),
cleanup(State),
?WHENFAIL(
@@ -49,8 +52,8 @@ gm_test(Cmds) ->
aggregate(command_names(Cmds), Res =:= ok)).
cleanup(S) ->
- #state{msgs = Msgs} = ensure_outstanding_msgs_received(S),
- All = gms(Msgs),
+ S2 = ensure_outstanding_msgs_received(drain_proceeding(S)),
+ All = gms(S2),
[gm:leave(GM) || GM <- All],
[await_death(GM) || GM <- All],
ok.
@@ -65,79 +68,105 @@ await_death(P) ->
%% proper_statem
%% ---------------------------------------------------------------------------
-initial_state() -> #state{seq = 1,
- msgs = dict:new()}.
+initial_state() -> #state{seq = 1,
+ outstanding = dict:new(),
+ instrumented = dict:new()}.
-command(S = #state{msgs = Msgs}) ->
- case dict:size(Msgs) of
+command(S = #state{outstanding = Outstanding}) ->
+ case dict:size(Outstanding) of
0 -> qc_join(S);
_ -> frequency([{1, qc_join(S)},
{1, qc_leave(S)},
- {10, qc_send(S)}])
+ {10, qc_send(S)},
+ {20, qc_proceed(S)}])
end.
-qc_join(_S) -> {call,?MODULE,do_join, []}.
-qc_leave(#state{msgs = Msgs}) -> {call,?MODULE,do_leave,[random(gms(Msgs))]}.
-qc_send(#state{seq = N,
- msgs = Msgs}) -> {call,?MODULE,do_send, [N, random(gms(Msgs))]}.
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(S) -> {call,?MODULE,do_leave,[random(gms(S))]}.
+qc_send(S = #state{seq = N}) -> {call,?MODULE,do_send, [N, random(gms(S))]}.
+qc_proceed(S) -> {call,?MODULE,do_proceed, [random(gms(S)),
+ random(gms(S))]}.
random([]) -> will_fail_precondition;
random(L) -> lists:nth(random:uniform(length(L)), L).
-precondition(#state{msgs = Msgs}, {call, ?MODULE, do_join, []}) ->
- dict:size(Msgs) < ?MAX_SIZE;
+precondition(S, {call, ?MODULE, do_join, []}) ->
+ length(gms(S)) < ?MAX_SIZE;
+
+precondition(S, {call, ?MODULE, do_leave, [_GM]}) ->
+ length(gms(S)) > 0;
-precondition(#state{msgs = Msgs}, {call, ?MODULE, do_leave, [_GM]}) ->
- dict:size(Msgs) > 0;
+precondition(S, {call, ?MODULE, do_send, [_N, _GM]}) ->
+ length(gms(S)) > 0;
-precondition(#state{msgs = Msgs}, {call, ?MODULE, do_send, [_N, _GM]}) ->
- dict:size(Msgs) > 0.
+precondition(S, {call, ?MODULE, do_proceed, [GM1, GM2]}) ->
+ length(gms(S)) > 0 andalso GM1 =/= GM2.
-postcondition(S = #state{msgs = Msgs}, {call, ?MODULE, do_join, []}, GM) ->
- [begin
- gm:broadcast(Existing, heartbeat),
- receive
- {birth, Existing, GM} -> ok
- after 1000 ->
- exit({birth_timeout, Existing, did_not_announce, GM})
- end
- end || Existing <- gms(Msgs) -- [GM]],
+postcondition(S, {call, ?MODULE, do_join, []}, _GM) ->
+ %% TODO figure out how to test birth announcements again
+ %% [begin
+ %% gm:broadcast(Existing, heartbeat),
+ %% receive
+ %% {birth, Existing, GM} -> ok
+ %% after 1000 ->
+ %% exit({birth_timeout, Existing, did_not_announce, GM})
+ %% end
+ %% end || Existing <- gms(S) -- [GM]],
assert(S);
-postcondition(S = #state{msgs = Msgs},
+postcondition(S = #state{outstanding = Outstanding},
{call, ?MODULE, do_leave, [Dead]}, _Res) ->
- [await_death(Existing, Dead, 5) || Existing <- gms(Msgs) -- [Dead]],
- assert(S#state{msgs = dict:erase(Dead, Msgs)});
+ %% TODO figure out how to test death announcements again
+ %%[await_death(Existing, Dead, 5) || Existing <- gms(S) -- [Dead]],
+ assert(S#state{outstanding = dict:erase(Dead, Outstanding)});
postcondition(S = #state{}, {call, _M, _F, _A}, _Res) ->
assert(S).
-next_state(S = #state{msgs = Msgs}, GM, {call, ?MODULE, do_join, []}) ->
- S#state{msgs = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, Msgs)};
+next_state(S = #state{outstanding = Outstanding}, GM,
+ {call, ?MODULE, do_join, []}) ->
+ S#state{outstanding = dict:store(GM, {gb_trees:empty(), gb_sets:empty()},
+ Outstanding)};
-next_state(S = #state{msgs = Msgs}, _GM, {call, ?MODULE, do_leave, [GM]}) ->
- true = dict:is_key(GM, Msgs),
- S#state{msgs = dict:erase(GM, Msgs)};
+next_state(S = #state{outstanding = Outstanding}, _Res,
+ {call, ?MODULE, do_leave, [GM]}) ->
+ true = dict:is_key(GM, Outstanding),
+ S#state{outstanding = dict:erase(GM, Outstanding)};
-next_state(S = #state{seq = Seq,
- msgs = Msgs}, Msg, {call, ?MODULE, do_send, [_, _]}) ->
+next_state(S = #state{seq = Seq,
+ outstanding = Outstanding}, Msg,
+ {call, ?MODULE, do_send, [_, _]}) ->
TS = timestamp(),
- Msgs1 = dict:map(fun (_GM, {Tree, Set}) ->
- {gb_trees:insert(Msg, TS, Tree),
- gb_sets:add_element({TS, Msg}, Set)}
- end, Msgs),
- drain(S#state{seq = Seq + 1,
- msgs = Msgs1}).
+ Outstanding1 = dict:map(fun (_GM, {Tree, Set}) ->
+ {gb_trees:insert(Msg, TS, Tree),
+ gb_sets:add_element({TS, Msg}, Set)}
+ end, Outstanding),
+ drain(S#state{seq = Seq + 1,
+ outstanding = Outstanding1});
+
+next_state(S = #state{instrumented = Msgs}, _Res,
+ {call, ?MODULE, do_proceed, [From, To]}) ->
+ Msgs1 = case dict:find({From, To}, Msgs) of
+ {ok, Q} -> case queue:out(Q) of
+ {{value, Thing}, Q2} ->
+ process_msg(From, To, Thing),
+ dict:store({From, To}, Q2, Msgs);
+ {empty, _} ->
+ Msgs
+ end;
+ error -> Msgs
+ end,
+ S#state{instrumented = Msgs1}.
%% ---------------------------------------------------------------------------
%% GM
%% ---------------------------------------------------------------------------
-joined(Pid, _Members) -> Pid ! {joined, self()}, ok.
-members_changed(Pid, Bs, Ds) -> [Pid ! {birth, self(), B} || B <- Bs],
- [Pid ! {death, self(), D} || D <- Ds],
+joined(_Pid, _Members) -> ok.
+members_changed(_Pid, _Bs, _Ds) -> %%[Pid ! {birth, self(), B} || B <- Bs],
+ %%[Pid ! {death, self(), D} || D <- Ds],
ok.
-handle_msg(_Pid, _From, heartbeat) -> ok;
+%%handle_msg(_Pid, _From, heartbeat) -> ok;
handle_msg(Pid, _From, Msg) -> Pid ! {gm, self(), Msg}, ok.
terminate(_Pid, _Reason) -> ok.
@@ -146,11 +175,15 @@ terminate(_Pid, _Reason) -> ok.
%% ---------------------------------------------------------------------------
do_join() ->
+ {Call, Cast} = instrumented_funs(),
{ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
- fun rabbit_misc:execute_mnesia_transaction/1),
- receive
- {joined, GM} -> ok
- end,
+ fun rabbit_misc:execute_mnesia_transaction/1,
+ Call, Cast),
+ %% TODO do we need to test the joined callback? What is the joined
+ %% callback actually for?
+ %% receive
+ %% {joined, GM} -> ok
+ %% end,
GM.
do_leave(GM) ->
@@ -162,44 +195,67 @@ do_send(Seq, GM) ->
gm:broadcast(GM, Msg),
Msg.
-await_death(GM, ToDie, 0) ->
- exit({death_msg_timeout, GM, ToDie});
-await_death(GM, ToDie, N) ->
- gm:broadcast(GM, heartbeat),
- receive
- {death, GM, ToDie} -> ok
- after 100 ->
- await_death(GM, ToDie, N - 1)
- end.
+do_proceed(_From, _To) ->
+ ok. %% Do the work in next_state
+
+%% await_death(GM, ToDie, 0) ->
+%% exit({death_msg_timeout, GM, ToDie});
+%% await_death(GM, ToDie, N) ->
+%% gm:broadcast(GM, heartbeat),
+%% receive
+%% {death, GM, ToDie} -> ok
+%% after 100 ->
+%% await_death(GM, ToDie, N - 1)
+%% end.
-gms(Msgs) -> dict:fetch_keys(Msgs).
+gms(#state{outstanding = Outstanding}) -> dict:fetch_keys(Outstanding).
-drain(S = #state{msgs = Msgs}) ->
+drain(S) ->
receive
- {gm, GM, Msg} ->
- case dict:find(GM, Msgs) of
- {ok, {Tree, Set}} ->
- case gb_trees:lookup(Msg, Tree) of
- {value, TS} ->
- Msgs1 = dict:store(
- GM, {gb_trees:delete(Msg, Tree),
- gb_sets:del_element({TS, Msg}, Set)},
- Msgs),
- drain(S#state{msgs = Msgs1});
- none ->
- %% Message from GM that joined after we
- %% broadcast the message. OK.
- drain(S)
- end;
- error ->
- %% Message from GM that has already died. OK.
- drain(S)
- end
- after 0 ->
- S
+ Msg -> drain(handle_msg(Msg, S))
+ after 0 -> S
end.
-assert(#state{msgs = Msgs}) ->
+drain_proceeding(S0) ->
+ timer:sleep(100),
+ S = #state{instrumented = Msgs} = drain(S0),
+ case dict:size(Msgs) of
+ 0 -> S;
+ _ -> _ = dict:map(fun ({From, To}, Q) ->
+ [process_msg(From, To, Msg) ||
+ Msg <- queue:to_list(Q)]
+ end, Msgs),
+ drain_proceeding(S#state{instrumented = dict:new()})
+ end.
+
+handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
+ case dict:find(GM, Outstanding) of
+ {ok, {Tree, Set}} ->
+ case gb_trees:lookup(Msg, Tree) of
+ {value, TS} ->
+ TreeSet = {gb_trees:delete(Msg, Tree),
+ gb_sets:del_element({TS, Msg}, Set)},
+ S#state{outstanding = dict:store(GM, TreeSet, Outstanding)};
+ none ->
+ %% Message from GM that joined after we
+ %% broadcast the message. OK.
+ S
+ end;
+ error ->
+ %% Message from GM that has already died. OK.
+ S
+ end;
+handle_msg({instrumented, From, To, Thing}, S = #state{instrumented = Msgs}) ->
+ Q1 = case dict:find({From, To}, Msgs) of
+ {ok, Q} -> queue:in(Thing, Q);
+ error -> queue:from_list([Thing])
+ end,
+ S#state{instrumented = dict:store({From, To}, Q1, Msgs)}.
+
+process_msg(From, _To, {call, Ref}) -> From ! {proceed, Ref};
+process_msg(_From, To, {cast, Msg}) -> gen_server2:cast(To, Msg).
+
+assert(S = #state{outstanding = Outstanding}) ->
TS = timestamp(),
dict:fold(fun (GM, {_Tree, Set}, none) ->
case gb_sets:size(Set) of
@@ -209,27 +265,42 @@ assert(#state{msgs = Msgs}) ->
true -> exit({msg_timeout,
[{msg, Msg},
{gm, GM},
- {all, gms(Msgs)}]});
+ {all, gms(S)}]});
false -> ok
end
end,
none
- end, none, Msgs),
+ end, none, Outstanding),
true.
ensure_outstanding_msgs_received(S) ->
case outstanding_msgs(S) of
false -> S;
true -> timer:sleep(100),
- S2 = drain(S),
+ S2 = drain_proceeding(S),
assert(S2),
ensure_outstanding_msgs_received(S2)
end.
-outstanding_msgs(#state{msgs = Msgs}) ->
+outstanding_msgs(#state{outstanding = Outstanding}) ->
dict:fold(fun (_GM, {_Tree, Set}, false) -> not gb_sets:is_empty(Set);
(_GM, {_Tree, _Set}, true) -> true
- end, false, Msgs).
+ end, false, Outstanding).
+
+instrumented_funs() ->
+ Test = self(),
+ {fun (Pid, Msg, infinity) ->
+ Ref = make_ref(),
+ Test ! {instrumented, self(), Pid, {call, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ gen_server2:call(Pid, Msg, infinity)
+ end,
+ fun (Pid, Msg) ->
+ Test ! {instrumented, self(), Pid, {cast, Msg}},
+ ok
+ end}.
timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}).