summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue_qc.erl187
1 files changed, 133 insertions, 54 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index f5cd7f95b1..83e7da5ec6 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -23,26 +23,50 @@
-define(BQMOD, rabbit_variable_queue).
+-define(RECORD_INDEX(Key, Record),
+ erlang:element(2, proplists:lookup(Key, lists:zip(
+ record_info(fields, Record), lists:seq(2, record_info(size, Record)))))).
+
-export([initial_state/0, command/1, precondition/2, postcondition/3,
next_state/3]).
-export([prop_backing_queue_test/0]).
-
-record(state, {bqstate,
- messages,
- acks}).
+ messages, %% queue of {msg_props, basic_msg}
+ acks, %% list of {acktag, {message_props, basic_msg}}
+ confirms}). %% set of msgid
+%% Initialise model
initial_state() ->
- VQ = qc_variable_queue_init(qc_test_queue()),
- #state{bqstate=VQ, messages = queue:new(), acks = []}.
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ messages = queue:new(),
+ acks = [],
+ confirms = gb_sets:new()}.
+
+%% Property
prop_backing_queue_test() ->
?FORALL(Cmds, commands(?MODULE, initial_state()),
begin
- {_H, #state{bqstate = VQ}, Res} = run_commands(?MODULE, Cmds),
- rabbit_variable_queue:delete_and_terminate(shutdown, VQ),
+ {ok, FileSizeLimit} =
+ application:get_env(rabbit, msg_store_file_size_limit),
+ application:set_env(rabbit, msg_store_file_size_limit, 512,
+ infinity),
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128,
+ infinity),
+
+ {_H, #state{bqstate = BQ}, Res} = run_commands(?MODULE, Cmds),
+
+ application:set_env(rabbit, msg_store_file_size_limit,
+ FileSizeLimit, infinity),
+ application:set_env(rabbit, queue_index_max_journal_entries,
+ MaxJournal, infinity),
+
+ rabbit_variable_queue:delete_and_terminate(shutdown, BQ),
?WHENFAIL(
io:format("Result: ~p~n", [Res]),
aggregate(command_names(Cmds), Res =:= ok))
@@ -50,32 +74,46 @@ prop_backing_queue_test() ->
%% Commands
-command(#state{bqstate = VQ} = S) ->
+command(S) ->
?SIZED(Size,
- frequency([{Size, qc_publish(S)},
- {Size, qc_fetch(S)},
- {Size, qc_ack(S)},
- {Size, qc_requeue(S)},
- {Size, qc_ram(S)},
- {1, {call, ?BQMOD, purge, [VQ]}}])).
-
-qc_publish(#state{bqstate = VQ}) ->
+ frequency([{Size, qc_publish(S)},
+ {Size, qc_fetch(S)},
+ {Size, qc_ack(S)},
+ {Size, qc_requeue(S)},
+ {Size, qc_ram(S)},
+ {Size, qc_drain_confirmed(S)},
+ {Size, qc_dropwhile(S)},
+ {1, qc_purge(S)}])).
+
+qc_publish(#state{bqstate = BQ}) ->
{call, ?BQMOD, publish,
- [qc_message(), #message_properties{}, self(), VQ]}.
+ [qc_message(),
+ #message_properties{needs_confirming = frequency([{1, true},
+ {20, false}]),
+ expiry = choose(0, 10)},
+ self(), BQ]}.
-qc_fetch(#state{bqstate = VQ}) ->
- {call, ?BQMOD, fetch, [boolean(), VQ]}.
+qc_fetch(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, fetch, [boolean(), BQ]}.
-qc_ack(#state{bqstate = VQ, acks = Acks}) ->
- {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), VQ]}.
+qc_ack(#state{bqstate = BQ, acks = Acks}) ->
+ {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
-qc_requeue(#state{bqstate = VQ, acks = Acks}) ->
+qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, requeue,
- [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, VQ]}.
+ [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+
+qc_ram(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, set_ram_duration_target, [oneof([0, infinity]), BQ]}.
+
+qc_drain_confirmed(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, drain_confirmed, [BQ]}.
-qc_ram(#state{bqstate = VQ}) ->
- {call, ?BQMOD, set_ram_duration_target,
- [oneof([0, infinity]), VQ]}.
+qc_dropwhile(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+
+qc_purge(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, purge, [BQ]}.
%% Preconditions
@@ -85,68 +123,92 @@ precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true.
-%% Next state
-
-next_state(S, VQ, {call, ?BQMOD, publish, [Msg, _MsgProps, _Pid, _VQ]}) ->
- #state{messages = Messages} = S,
- S#state{bqstate = VQ, messages = queue:in(Msg, Messages)};
-
-next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _VQ]}) ->
+%% Model updates
+
+next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{messages = Messages, confirms = Confirms} = S,
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ Confirms1 = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end,
+ S#state{bqstate = BQ,
+ messages = queue:in({MsgProps, Msg}, Messages),
+ confirms = Confirms1};
+
+next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
#state{messages = Messages, acks = Acks} = S,
ResultInfo = {call, erlang, element, [1, Res]},
- VQ1 = {call, erlang, element, [2, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
AckTag = {call, erlang, element, [3, ResultInfo]},
- S1 = S#state{bqstate = VQ1},
+ S1 = S#state{bqstate = BQ1},
case queue:out(Messages) of
{empty, _M2} ->
S1;
- {{value, Msg}, M2} ->
+ {{value, MsgProp_Msg}, M2} ->
S2 = S1#state{messages = M2},
case AckReq of
- true -> S2#state{acks = Acks ++ [{AckTag, Msg}]};
+ true -> S2#state{acks = Acks ++ [{AckTag, MsgProp_Msg}]};
false -> S2
end
end;
-next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _VQ]}) ->
+next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
- VQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = VQ1,
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1,
acks = propvals_by_keys(AcksState, AcksArg)};
next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
#state{messages = Messages, acks = AcksState} = S,
- VQ1 = {call, erlang, element, [2, Res]},
- RequeueMsgs = [proplists:get_value(Key, AcksState) || Key <- AcksArg ],
- S#state{bqstate = VQ1,
+ BQ1 = {call, erlang, element, [2, Res]},
+ RequeueMsgs = [proplists:get_value(Key, AcksState) || Key <- AcksArg],
+ S#state{bqstate = BQ1,
messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
acks = propvals_by_keys(AcksState, AcksArg)};
-next_state(S, VQ, {call, ?BQMOD, set_ram_duration_target, _A}) ->
- S#state{bqstate = VQ};
+next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
+ S#state{bqstate = BQ};
-next_state(S, Res, {call, ?BQMOD, purge, _A}) ->
- VQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = VQ1, messages = queue:new()}.
+next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1};
+
+next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+ #state{messages = Messages} = S,
+ S#state{bqstate = BQ1, messages = drop_messages(Messages)};
+
+next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
+ BQ1 = {call, erlang, element, [2, Res]},
+ S#state{bqstate = BQ1, messages = queue:new()}.
%% Postconditions
postcondition(#state{messages = Messages}, {call, ?BQMOD, fetch, _Args}, Res) ->
case Res of
- {{MsgFetched, _IsDelivered, _AckTag, _Remaining_Len}, _VQ} ->
- MsgFetched =:= queue:head(Messages);
- {empty, _VQ} ->
+ {{MsgFetched, _IsDelivered, _AckTag, _Remaining_Len}, _BQ} ->
+ {_MsgProps, Msg} = queue:head(Messages),
+ MsgFetched =:= Msg;
+ {empty, _BQ} ->
queue:len(Messages) =:= 0
end;
postcondition(#state{messages = Messages}, {call, ?BQMOD, purge, _Args}, Res) ->
- {PurgeCount, _VQ} = Res,
+ {PurgeCount, _BQ} = Res,
queue:len(Messages) =:= PurgeCount;
-postcondition(#state{bqstate = VQ,
+postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
+ #state{confirms = Confirms} = S,
+ {ReportedConfirmed, _BQ} = Res,
+ lists:all(fun (M) -> lists:member(M, Confirms) end, ReportedConfirmed);
+
+postcondition(#state{bqstate = BQ,
messages = Messages},
{call, ?BQMOD, _Fun, _Args}, _Res) ->
- ?BQMOD:len(VQ) =:= queue:len(Messages).
+ ?BQMOD:len(BQ) =:= queue:len(Messages).
%% Helpers
@@ -201,3 +263,20 @@ rand_choice(List) ->
[] -> [];
_ -> [lists:nth(random:uniform(length(List)), List)]
end.
+
+dropfun(Props) ->
+ Expiry = eval({call, erlang, element,
+ [?RECORD_INDEX(expiry, message_properties), Props]}),
+ Expiry =/= 0.
+
+drop_messages(Messages) ->
+ case queue:out(Messages) of
+ {empty, _} ->
+ Messages;
+ {{value, MsgProps_Msg}, M2} ->
+ MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
+ case dropfun(MsgProps) of
+ true -> drop_messages(M2);
+ false -> Messages
+ end
+ end.