diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 113 |
1 files changed, 62 insertions, 51 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index d358a041ef..1203fb454b 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -36,19 +36,21 @@ -export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). -record(state, {bqstate, - len, %% int - messages, %% queue of {msg_props, basic_msg} - acks, %% dict of acktag => {msg_props, basic_msg} - confirms}). %% set of msgid + len, %% int + next_seq_id, %% int + messages, %% gb_trees of seqid => {msg_props, basic_msg} + acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] + confirms}). %% set of msgid %% Initialise model initial_state() -> - #state{bqstate = qc_variable_queue_init(qc_test_queue()), - len = 0, - messages = queue:new(), - acks = orddict:new(), - confirms = gb_sets:new()}. + #state{bqstate = qc_variable_queue_init(qc_test_queue()), + len = 0, + next_seq_id = 0, + messages = gb_trees:empty(), + acks = [], + confirms = gb_sets:new()}. %% Property @@ -122,11 +124,11 @@ qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. qc_ack(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}. + {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, requeue, - [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -154,10 +156,10 @@ qc_purge(#state{bqstate = BQ}) -> precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> - orddict:size(Acks) > 0; + length(Acks) > 0; precondition(#state{messages = Messages}, - {call, ?BQMOD, publish_delivered, _Arg}) -> - queue:is_empty(Messages); + {call, ?BQMOD, publish_delivered, _Arg}) -> + gb_trees:is_empty(Messages); precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> true; precondition(_S, {call, ?MODULE, timeout, _Arg}) -> @@ -168,14 +170,18 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> %% Model updates next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> - #state{len = Len, messages = Messages, confirms = Confirms} = S, + #state{len = Len, + messages = Messages, + confirms = Confirms, + next_seq_id = NextSeq} = S, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, NeedsConfirm = {call, erlang, element, [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, S#state{bqstate = BQ, len = Len + 1, - messages = queue:in({MsgProps, Msg}, Messages), + next_seq_id = NextSeq + 1, + messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms @@ -183,17 +189,19 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> #state{len = Len, messages = Messages} = S, - Messages1 = repeat(Messages, fun(Msgs) -> - queue:in({MsgProps, Msg}, Msgs) - end, Count), - S#state{bqstate = BQ, - len = Len + Count, - messages = Messages1}; + {S1, Msgs1} = repeat({S, Messages}, + fun ({#state{next_seq_id = NextSeq} = State, Msgs}) -> + {State #state { next_seq_id = NextSeq + 1}, + gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)} + end, Count), + S1#state{bqstate = BQ, + len = Len + Count, + messages = Msgs1}; next_state(S, Res, {call, ?BQMOD, publish_delivered, [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> - #state{confirms = Confirms, acks = Acks} = S, + #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S, AckTag = {call, erlang, element, [1, Res]}, BQ1 = {call, erlang, element, [2, Res]}, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, @@ -201,12 +209,13 @@ next_state(S, Res, {call, erlang, element, [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, S#state{bqstate = BQ1, + next_seq_id = NextSeq + 1, confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end, acks = case AckReq of - true -> orddict:append(AckTag, {MsgProps, Msg}, Acks); + true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]; false -> Acks end }; @@ -217,34 +226,36 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> BQ1 = {call, erlang, element, [2, Res]}, AckTag = {call, erlang, element, [3, ResultInfo]}, S1 = S#state{bqstate = BQ1}, - case queue:out(Messages) of - {empty, _M2} -> - S1; - {{value, MsgProp_Msg}, M2} -> - S2 = S1#state{len = Len - 1, messages = M2}, - case AckReq of - true -> - S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)}; - false -> - S2 - end + case gb_trees:is_empty(Messages) of + true -> S1; + false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; + false -> + S2 + end end; next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> #state{acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1, - acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> - #state{len = Len, messages = Messages, acks = AcksState} = S, + #state{messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, - RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || - Key <- AcksArg]), + Messages1 = lists:foldl(fun (AckTag, Msgs) -> + {SeqId, MsgPropsMsg} = + proplists:get_value(AckTag, AcksState), + gb_trees:insert(SeqId, MsgPropsMsg, Msgs) + end, Messages, AcksArg), S#state{bqstate = BQ1, - len = Len + length(RequeueMsgs), - messages = queue:join(Messages, queue:from_list(RequeueMsgs)), - acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + len = gb_trees:size(Messages1), + messages = Messages1, + acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> S#state{bqstate = BQ}; @@ -259,8 +270,8 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> #state{messages = Messages} = S, - Messages1 = drop_messages(Messages), - S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1}; + Msgs1 = drop_messages(Messages), + S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1}; next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> S; @@ -270,7 +281,7 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = queue:new()}. + S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}. %% Postconditions @@ -278,9 +289,9 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> - {_MsgProps, Msg} = queue:head(Messages), + {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgFetched =:= Msg andalso - not orddict:is_key(AckTag, Acks) andalso + not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso RemainingLen =:= Len - 1; {empty, _BQ} -> @@ -289,7 +300,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> #state{acks = Acks, confirms = Confrms} = S, - not orddict:is_key(AckTag, Acks) andalso + not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms); postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> @@ -378,10 +389,10 @@ dropfun(Props) -> Expiry =/= 0. drop_messages(Messages) -> - case queue:out(Messages) of - {empty, _} -> + case gb_trees:is_empty(Messages) of + true -> Messages; - {{value, MsgProps_Msg}, M2} -> + false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages), MsgProps = {call, erlang, element, [1, MsgProps_Msg]}, case dropfun(MsgProps) of true -> drop_messages(M2); |
