diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-06-23 19:53:42 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-06-23 19:53:42 +0100 |
| commit | 93f42fbe153e37851c5cd4014e1d752734a4317d (patch) | |
| tree | 1eb6f3452332225ed6d8d82be0074ca6dd1ee213 | |
| parent | 16c680b78c0e6f02cf7b06b37cd762216beb4327 (diff) | |
| download | rabbitmq-server-git-93f42fbe153e37851c5cd4014e1d752734a4317d.tar.gz | |
Add instrumenting for monitors and Mnesia transactions. It actually all seems to work now.
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | src/gm.erl | 16 | ||||
| -rw-r--r-- | src/gm_qc.erl | 143 |
3 files changed, 113 insertions, 48 deletions
@@ -59,7 +59,7 @@ ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_m ifdef INSTRUMENT_FOR_QC ERLC_OPTS += -DINSTR_MOD=gm_qc else -ERLC_OPTS += -DINSTR_MOD=gen_server2 +ERLC_OPTS += -DINSTR_MOD=gm endif include version.mk diff --git a/src/gm.erl b/src/gm.erl index fd8da7da27..7790b8d0a4 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -388,6 +388,9 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/3]). +%% For INSTR_MOD callbacks +-export([call/3, cast/2, monitor/1, demonitor/1]). + -ifndef(use_specs). -export([behaviour_info/1]). -endif. @@ -1192,7 +1195,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) -> ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) -> {RealNeighbour, MRef}; ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> - true = erlang:demonitor(MRef), + true = ?INSTR_MOD:demonitor(MRef), Msg = {?TAG, Ver, check_neighbours}, ok = neighbour_cast(RealNeighbour, Msg), ok = case Neighbour of @@ -1202,7 +1205,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) -> {Neighbour, maybe_monitor(Neighbour, Self)}. maybe_monitor( Self, Self) -> undefined; -maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)). +maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)). check_neighbours(State = #state { self = Self, left = Left, @@ -1461,3 +1464,12 @@ last_pub( [], LP) -> LP; last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List), true = PubNum > LP, %% ASSERTION PubNum. + +%% --------------------------------------------------------------------------- + +%% Uninstrumented versions + +call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout). +cast(Pid, Msg) -> gen_server2:cast(Pid, Msg). +monitor(Pid) -> erlang:monitor(process, Pid). +demonitor(MRef) -> erlang:demonitor(MRef). diff --git a/src/gm_qc.erl b/src/gm_qc.erl index 7760605aaf..4b956196b0 100644 --- a/src/gm_qc.erl +++ b/src/gm_qc.erl @@ -33,14 +33,15 @@ -export([joined/2, members_changed/3, handle_msg/3, terminate/2]). %% Helpers --export([do_join/0, do_leave/1, do_send/1, do_proceed/2]). +-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]). %% For insertion into gm --export([call/3, cast/2]). +-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]). -record(state, {seq, %% symbolic and dynamic instrumented, %% dynamic only outstanding, %% dynamic only + monitors, %% dynamic only all_join, %% for symbolic to_join, %% dynamic only to_leave %% for symbolic @@ -68,6 +69,7 @@ cleanup(S) -> All = gms(S2), %% assertion - none to join check_stale_members(All), [gm:leave(GM) || GM <- All], + drain_proceeding(S2), [await_death(GM) || GM <- All], gm:forget_group(?GROUP), ok. @@ -91,13 +93,13 @@ await_death(P) -> await_death(MRef, P) -> receive - {'DOWN', MRef, process, P, _} -> ok; - {'EXIT', _, normal} -> await_death(MRef, P); - {'EXIT', _, Reason} -> exit(Reason); - {instrumented, From, To, Thing} -> process_msg(From, To, Thing), - await_death(MRef, P); - {joined, _GM} -> await_death(MRef, P); - {left, _GM} -> await_death(MRef, P) + {'DOWN', MRef, process, P, _} -> ok; + {'DOWN', _, _, _, _} -> await_death(MRef, P); + {'EXIT', _, normal} -> await_death(MRef, P); + {'EXIT', _, Reason} -> exit(Reason); + {joined, _GM} -> await_death(MRef, P); + {left, _GM} -> await_death(MRef, P); + Anything -> exit({stray_msg, Anything}) end. %% --------------------------------------------------------------------------- @@ -107,6 +109,7 @@ await_death(MRef, P) -> initial_state() -> #state{seq = 1, outstanding = dict:new(), instrumented = dict:new(), + monitors = dict:new(), all_join = sets:new(), to_join = sets:new(), to_leave = sets:new()}. @@ -115,18 +118,21 @@ command(S) -> case {length(gms_symb_not_left(S)), length(gms_symb(S))} of {0, 0} -> qc_join(S); {0, _} -> frequency([{1, qc_join(S)}, - {5, qc_proceed(S)}]); + {3, qc_proceed1(S)}, + {5, qc_proceed2(S)}]); _ -> frequency([{1, qc_join(S)}, {1, qc_leave(S)}, {10, qc_send(S)}, - {20, qc_proceed(S)}]) + {5, qc_proceed1(S)}, + {15, qc_proceed2(S)}]) end. -qc_join(_S) -> {call,?MODULE,do_join, []}. -qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. -qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. -qc_proceed(S) -> {call,?MODULE,do_proceed, [oneof(gms_symb(S)), - oneof(gms_symb(S))]}. +qc_join(_S) -> {call,?MODULE,do_join, []}. +qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}. +qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}. +qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}. +qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)), + oneof(gms_symb(S))]}. precondition(S, {call, ?MODULE, do_join, []}) -> length(gms_symb(S)) < ?MAX_SIZE; @@ -137,7 +143,10 @@ precondition(_S, {call, ?MODULE, do_leave, [_GM]}) -> precondition(_S, {call, ?MODULE, do_send, [_GM]}) -> true; -precondition(_S, {call, ?MODULE, do_proceed, [GM1, GM2]}) -> +precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) -> + true; + +precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) -> GM1 =/= GM2. postcondition(S, {call, ?MODULE, do_join, []}, _GM) -> @@ -189,19 +198,23 @@ next_state(S = #state{seq = Seq, S end; -next_state(S = #state{instrumented = Msgs}, _Res, - {call, ?MODULE, do_proceed, [From, To]}) -> - Msgs1 = case dict:find({From, To}, Msgs) of - {ok, Q} -> case queue:out(Q) of - {{value, Thing}, Q2} -> - process_msg(From, To, Thing), - dict:store({From, To}, Q2, Msgs); - {empty, _} -> - Msgs - end; - error -> Msgs - end, - S#state{instrumented = Msgs1}. +next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) -> + proceed(Pid, S); + +next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) -> + proceed({From, To}, S). + +proceed(K, S = #state{instrumented = Msgs}) -> + case dict:find(K, Msgs) of + {ok, Q} -> case queue:out(Q) of + {{value, Thing}, Q2} -> + S2 = process_msg(K, Thing, S), + S2#state{instrumented = dict:store(K, Q2, Msgs)}; + {empty, _} -> + S + end; + error -> S + end. %% --------------------------------------------------------------------------- %% GM @@ -222,7 +235,7 @@ terminate(Pid, _Reason) -> Pid ! {left, self()}. do_join() -> {ok, GM} = gm:start_link(?GROUP, ?MODULE, self(), - fun rabbit_misc:execute_mnesia_transaction/1), + fun execute_mnesia_transaction/1), GM. do_leave(GM) -> @@ -232,7 +245,10 @@ do_leave(GM) -> do_send( _GM) -> ok. %% Do the work in next_state -do_proceed(_From, _To) -> +do_proceed1(_Pid) -> + ok. %% Do the work in next_state + +do_proceed2(_From, _To) -> ok. %% Do the work in next_state %% await_death(GM, ToDie, 0) -> @@ -260,18 +276,21 @@ gms_symb_not_left(#state{all_join = AllJoin, drain(S) -> receive Msg -> drain(handle_msg(Msg, S)) - after 0 -> S + after 10 -> S end. drain_proceeding(S0) -> S = #state{instrumented = Msgs} = drain(S0), case dict:size(Msgs) of 0 -> S; - _ -> _ = dict:map(fun ({From, To}, Q) -> - [process_msg(From, To, Msg) || - Msg <- queue:to_list(Q)] - end, Msgs), - drain_proceeding(S#state{instrumented = dict:new()}) + _ -> S1 = dict:fold( + fun (Key, Q, Si) -> + lists:foldl( + fun (Msg, Sij) -> + process_msg(Key, Msg, Sij) + end, Si, queue:to_list(Q)) + end, S, Msgs), + drain_proceeding(S1#state{instrumented = dict:new()}) end. handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> @@ -291,12 +310,12 @@ handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) -> %% Message from GM that has already died. OK. S end; -handle_msg({instrumented, From, To, Thing}, S = #state{instrumented = Msgs}) -> - Q1 = case dict:find({From, To}, Msgs) of +handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) -> + Q1 = case dict:find(Key, Msgs) of {ok, Q} -> queue:in(Thing, Q); error -> queue:from_list([Thing]) end, - S#state{instrumented = dict:store({From, To}, Q1, Msgs)}; + S#state{instrumented = dict:store(Key, Q1, Msgs)}; handle_msg({joined, GM}, S = #state{outstanding = Outstanding, to_join = ToJoin}) -> S#state{outstanding = dict:store(GM, {gb_trees:empty(), gb_sets:empty()}, @@ -307,14 +326,29 @@ handle_msg({left, GM}, S = #state{outstanding = Outstanding, true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin), S#state{outstanding = dict:erase(GM, Outstanding), to_join = sets:del_element(GM, ToJoin)}; +handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) -> + To = dict:fetch(MRef, Mons), + handle_msg({instrumented, {From, To}, {info, Msg}}, + S#state{monitors = dict:erase(MRef, Mons)}); handle_msg({'EXIT', _From, normal}, S) -> S; handle_msg({'EXIT', _From, Reason}, _S) -> %% We just trapped exits to get nicer SASL logging. exit(Reason). -process_msg(From, _To, {call, Ref}) -> From ! {proceed, Ref}; -process_msg(_From, To, {cast, Msg}) -> gen_server2:cast(To, Msg). +process_msg({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S; +process_msg({_From, To}, {info, Msg}, S) -> To ! Msg, S; +process_msg({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S; +process_msg({From, To}, {mon, Ref}, S) -> do_monitor(From, To, Ref, S); +process_msg(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S; +process_msg(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S. + +%% NB From here is To in handle_msg/DOWN above, since the msg is going +%% the other way +do_monitor(From, To, Ref, S = #state{monitors = Mons}) -> + MRef = erlang:monitor(process, To), + From ! {mref, Ref, MRef}, + S#state{monitors = dict:store(MRef, From, Mons)}. assert(S = #state{outstanding = Outstanding}) -> TS = timestamp(), @@ -354,16 +388,35 @@ outstanding_msgs(#state{outstanding = Outstanding}) -> call(Pid, Msg, infinity) -> Ref = make_ref(), - whereis(?MODULE) ! {instrumented, self(), Pid, {call, Ref}}, + whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}}, receive {proceed, Ref} -> ok end, gen_server2:call(Pid, Msg, infinity). cast(Pid, Msg) -> - whereis(?MODULE) ! {instrumented, self(), Pid, {cast, Msg}}, + whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}}, ok. +monitor(Pid) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}}, + receive + {mref, Ref, MRef} -> MRef + end. + +demonitor(MRef) -> + whereis(?MODULE) ! {instrumented, self(), {demon, MRef}}, + true. + +execute_mnesia_transaction(Fun) -> + Ref = make_ref(), + whereis(?MODULE) ! {instrumented, self(), {wait, Ref}}, + receive + {proceed, Ref} -> ok + end, + rabbit_misc:execute_mnesia_transaction(Fun). + timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}). -else. |
