summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-06-23 19:53:42 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-06-23 19:53:42 +0100
commit93f42fbe153e37851c5cd4014e1d752734a4317d (patch)
tree1eb6f3452332225ed6d8d82be0074ca6dd1ee213
parent16c680b78c0e6f02cf7b06b37cd762216beb4327 (diff)
downloadrabbitmq-server-git-93f42fbe153e37851c5cd4014e1d752734a4317d.tar.gz
Add instrumenting for monitors and Mnesia transactions. It actually all seems to work now.
-rw-r--r--Makefile2
-rw-r--r--src/gm.erl16
-rw-r--r--src/gm_qc.erl143
3 files changed, 113 insertions, 48 deletions
diff --git a/Makefile b/Makefile
index 7bfdf44ebb..f3ad1c27ec 100644
--- a/Makefile
+++ b/Makefile
@@ -59,7 +59,7 @@ ERLC_OPTS=-I $(INCLUDE_DIR) -o $(EBIN_DIR) -Wall -v +debug_info $(call boolean_m
ifdef INSTRUMENT_FOR_QC
ERLC_OPTS += -DINSTR_MOD=gm_qc
else
-ERLC_OPTS += -DINSTR_MOD=gen_server2
+ERLC_OPTS += -DINSTR_MOD=gm
endif
include version.mk
diff --git a/src/gm.erl b/src/gm.erl
index fd8da7da27..7790b8d0a4 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -388,6 +388,9 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
+%% For INSTR_MOD callbacks
+-export([call/3, cast/2, monitor/1, demonitor/1]).
+
-ifndef(use_specs).
-export([behaviour_info/1]).
-endif.
@@ -1192,7 +1195,7 @@ ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
- true = erlang:demonitor(MRef),
+ true = ?INSTR_MOD:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = neighbour_cast(RealNeighbour, Msg),
ok = case Neighbour of
@@ -1202,7 +1205,7 @@ ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor( Self, Self) -> undefined;
-maybe_monitor(Other, _Self) -> erlang:monitor(process, get_pid(Other)).
+maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
@@ -1461,3 +1464,12 @@ last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
true = PubNum > LP, %% ASSERTION
PubNum.
+
+%% ---------------------------------------------------------------------------
+
+%% Uninstrumented versions
+
+call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
+cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
+monitor(Pid) -> erlang:monitor(process, Pid).
+demonitor(MRef) -> erlang:demonitor(MRef).
diff --git a/src/gm_qc.erl b/src/gm_qc.erl
index 7760605aaf..4b956196b0 100644
--- a/src/gm_qc.erl
+++ b/src/gm_qc.erl
@@ -33,14 +33,15 @@
-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
%% Helpers
--export([do_join/0, do_leave/1, do_send/1, do_proceed/2]).
+-export([do_join/0, do_leave/1, do_send/1, do_proceed1/1, do_proceed2/2]).
%% For insertion into gm
--export([call/3, cast/2]).
+-export([call/3, cast/2, monitor/1, demonitor/1, execute_mnesia_transaction/1]).
-record(state, {seq, %% symbolic and dynamic
instrumented, %% dynamic only
outstanding, %% dynamic only
+ monitors, %% dynamic only
all_join, %% for symbolic
to_join, %% dynamic only
to_leave %% for symbolic
@@ -68,6 +69,7 @@ cleanup(S) ->
All = gms(S2), %% assertion - none to join
check_stale_members(All),
[gm:leave(GM) || GM <- All],
+ drain_proceeding(S2),
[await_death(GM) || GM <- All],
gm:forget_group(?GROUP),
ok.
@@ -91,13 +93,13 @@ await_death(P) ->
await_death(MRef, P) ->
receive
- {'DOWN', MRef, process, P, _} -> ok;
- {'EXIT', _, normal} -> await_death(MRef, P);
- {'EXIT', _, Reason} -> exit(Reason);
- {instrumented, From, To, Thing} -> process_msg(From, To, Thing),
- await_death(MRef, P);
- {joined, _GM} -> await_death(MRef, P);
- {left, _GM} -> await_death(MRef, P)
+ {'DOWN', MRef, process, P, _} -> ok;
+ {'DOWN', _, _, _, _} -> await_death(MRef, P);
+ {'EXIT', _, normal} -> await_death(MRef, P);
+ {'EXIT', _, Reason} -> exit(Reason);
+ {joined, _GM} -> await_death(MRef, P);
+ {left, _GM} -> await_death(MRef, P);
+ Anything -> exit({stray_msg, Anything})
end.
%% ---------------------------------------------------------------------------
@@ -107,6 +109,7 @@ await_death(MRef, P) ->
initial_state() -> #state{seq = 1,
outstanding = dict:new(),
instrumented = dict:new(),
+ monitors = dict:new(),
all_join = sets:new(),
to_join = sets:new(),
to_leave = sets:new()}.
@@ -115,18 +118,21 @@ command(S) ->
case {length(gms_symb_not_left(S)), length(gms_symb(S))} of
{0, 0} -> qc_join(S);
{0, _} -> frequency([{1, qc_join(S)},
- {5, qc_proceed(S)}]);
+ {3, qc_proceed1(S)},
+ {5, qc_proceed2(S)}]);
_ -> frequency([{1, qc_join(S)},
{1, qc_leave(S)},
{10, qc_send(S)},
- {20, qc_proceed(S)}])
+ {5, qc_proceed1(S)},
+ {15, qc_proceed2(S)}])
end.
-qc_join(_S) -> {call,?MODULE,do_join, []}.
-qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}.
-qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}.
-qc_proceed(S) -> {call,?MODULE,do_proceed, [oneof(gms_symb(S)),
- oneof(gms_symb(S))]}.
+qc_join(_S) -> {call,?MODULE,do_join, []}.
+qc_leave(S) -> {call,?MODULE,do_leave,[oneof(gms_symb_not_left(S))]}.
+qc_send(S) -> {call,?MODULE,do_send, [oneof(gms_symb_not_left(S))]}.
+qc_proceed1(S) -> {call,?MODULE,do_proceed1, [oneof(gms_symb(S))]}.
+qc_proceed2(S) -> {call,?MODULE,do_proceed2, [oneof(gms_symb(S)),
+ oneof(gms_symb(S))]}.
precondition(S, {call, ?MODULE, do_join, []}) ->
length(gms_symb(S)) < ?MAX_SIZE;
@@ -137,7 +143,10 @@ precondition(_S, {call, ?MODULE, do_leave, [_GM]}) ->
precondition(_S, {call, ?MODULE, do_send, [_GM]}) ->
true;
-precondition(_S, {call, ?MODULE, do_proceed, [GM1, GM2]}) ->
+precondition(_S, {call, ?MODULE, do_proceed1, [_GM]}) ->
+ true;
+
+precondition(_S, {call, ?MODULE, do_proceed2, [GM1, GM2]}) ->
GM1 =/= GM2.
postcondition(S, {call, ?MODULE, do_join, []}, _GM) ->
@@ -189,19 +198,23 @@ next_state(S = #state{seq = Seq,
S
end;
-next_state(S = #state{instrumented = Msgs}, _Res,
- {call, ?MODULE, do_proceed, [From, To]}) ->
- Msgs1 = case dict:find({From, To}, Msgs) of
- {ok, Q} -> case queue:out(Q) of
- {{value, Thing}, Q2} ->
- process_msg(From, To, Thing),
- dict:store({From, To}, Q2, Msgs);
- {empty, _} ->
- Msgs
- end;
- error -> Msgs
- end,
- S#state{instrumented = Msgs1}.
+next_state(S, _Res, {call, ?MODULE, do_proceed1, [Pid]}) ->
+ proceed(Pid, S);
+
+next_state(S, _Res, {call, ?MODULE, do_proceed2, [From, To]}) ->
+ proceed({From, To}, S).
+
+proceed(K, S = #state{instrumented = Msgs}) ->
+ case dict:find(K, Msgs) of
+ {ok, Q} -> case queue:out(Q) of
+ {{value, Thing}, Q2} ->
+ S2 = process_msg(K, Thing, S),
+ S2#state{instrumented = dict:store(K, Q2, Msgs)};
+ {empty, _} ->
+ S
+ end;
+ error -> S
+ end.
%% ---------------------------------------------------------------------------
%% GM
@@ -222,7 +235,7 @@ terminate(Pid, _Reason) -> Pid ! {left, self()}.
do_join() ->
{ok, GM} = gm:start_link(?GROUP, ?MODULE, self(),
- fun rabbit_misc:execute_mnesia_transaction/1),
+ fun execute_mnesia_transaction/1),
GM.
do_leave(GM) ->
@@ -232,7 +245,10 @@ do_leave(GM) ->
do_send( _GM) ->
ok. %% Do the work in next_state
-do_proceed(_From, _To) ->
+do_proceed1(_Pid) ->
+ ok. %% Do the work in next_state
+
+do_proceed2(_From, _To) ->
ok. %% Do the work in next_state
%% await_death(GM, ToDie, 0) ->
@@ -260,18 +276,21 @@ gms_symb_not_left(#state{all_join = AllJoin,
drain(S) ->
receive
Msg -> drain(handle_msg(Msg, S))
- after 0 -> S
+ after 10 -> S
end.
drain_proceeding(S0) ->
S = #state{instrumented = Msgs} = drain(S0),
case dict:size(Msgs) of
0 -> S;
- _ -> _ = dict:map(fun ({From, To}, Q) ->
- [process_msg(From, To, Msg) ||
- Msg <- queue:to_list(Q)]
- end, Msgs),
- drain_proceeding(S#state{instrumented = dict:new()})
+ _ -> S1 = dict:fold(
+ fun (Key, Q, Si) ->
+ lists:foldl(
+ fun (Msg, Sij) ->
+ process_msg(Key, Msg, Sij)
+ end, Si, queue:to_list(Q))
+ end, S, Msgs),
+ drain_proceeding(S1#state{instrumented = dict:new()})
end.
handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
@@ -291,12 +310,12 @@ handle_msg({gm, GM, Msg}, S = #state{outstanding = Outstanding}) ->
%% Message from GM that has already died. OK.
S
end;
-handle_msg({instrumented, From, To, Thing}, S = #state{instrumented = Msgs}) ->
- Q1 = case dict:find({From, To}, Msgs) of
+handle_msg({instrumented, Key, Thing}, S = #state{instrumented = Msgs}) ->
+ Q1 = case dict:find(Key, Msgs) of
{ok, Q} -> queue:in(Thing, Q);
error -> queue:from_list([Thing])
end,
- S#state{instrumented = dict:store({From, To}, Q1, Msgs)};
+ S#state{instrumented = dict:store(Key, Q1, Msgs)};
handle_msg({joined, GM}, S = #state{outstanding = Outstanding,
to_join = ToJoin}) ->
S#state{outstanding = dict:store(GM, {gb_trees:empty(), gb_sets:empty()},
@@ -307,14 +326,29 @@ handle_msg({left, GM}, S = #state{outstanding = Outstanding,
true = dict:is_key(GM, Outstanding) orelse sets:is_element(GM, ToJoin),
S#state{outstanding = dict:erase(GM, Outstanding),
to_join = sets:del_element(GM, ToJoin)};
+handle_msg({'DOWN', MRef, _, From, _} = Msg, S = #state{monitors = Mons}) ->
+ To = dict:fetch(MRef, Mons),
+ handle_msg({instrumented, {From, To}, {info, Msg}},
+ S#state{monitors = dict:erase(MRef, Mons)});
handle_msg({'EXIT', _From, normal}, S) ->
S;
handle_msg({'EXIT', _From, Reason}, _S) ->
%% We just trapped exits to get nicer SASL logging.
exit(Reason).
-process_msg(From, _To, {call, Ref}) -> From ! {proceed, Ref};
-process_msg(_From, To, {cast, Msg}) -> gen_server2:cast(To, Msg).
+process_msg({_From, To}, {cast, Msg}, S) -> gen_server2:cast(To, Msg), S;
+process_msg({_From, To}, {info, Msg}, S) -> To ! Msg, S;
+process_msg({From, _To}, {wait, Ref}, S) -> From ! {proceed, Ref}, S;
+process_msg({From, To}, {mon, Ref}, S) -> do_monitor(From, To, Ref, S);
+process_msg(_Pid, {demon, MRef}, S) -> erlang:demonitor(MRef), S;
+process_msg(Pid, {wait, Ref}, S) -> Pid ! {proceed, Ref}, S.
+
+%% NB From here is To in handle_msg/DOWN above, since the msg is going
+%% the other way
+do_monitor(From, To, Ref, S = #state{monitors = Mons}) ->
+ MRef = erlang:monitor(process, To),
+ From ! {mref, Ref, MRef},
+ S#state{monitors = dict:store(MRef, From, Mons)}.
assert(S = #state{outstanding = Outstanding}) ->
TS = timestamp(),
@@ -354,16 +388,35 @@ outstanding_msgs(#state{outstanding = Outstanding}) ->
call(Pid, Msg, infinity) ->
Ref = make_ref(),
- whereis(?MODULE) ! {instrumented, self(), Pid, {call, Ref}},
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {wait, Ref}},
receive
{proceed, Ref} -> ok
end,
gen_server2:call(Pid, Msg, infinity).
cast(Pid, Msg) ->
- whereis(?MODULE) ! {instrumented, self(), Pid, {cast, Msg}},
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {cast, Msg}},
ok.
+monitor(Pid) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, {self(), Pid}, {mon, Ref}},
+ receive
+ {mref, Ref, MRef} -> MRef
+ end.
+
+demonitor(MRef) ->
+ whereis(?MODULE) ! {instrumented, self(), {demon, MRef}},
+ true.
+
+execute_mnesia_transaction(Fun) ->
+ Ref = make_ref(),
+ whereis(?MODULE) ! {instrumented, self(), {wait, Ref}},
+ receive
+ {proceed, Ref} -> ok
+ end,
+ rabbit_misc:execute_mnesia_transaction(Fun).
+
timestamp() -> timer:now_diff(os:timestamp(), {0, 0, 0}).
-else.