summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-08-15 13:05:48 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-08-15 13:05:48 +0100
commit43b0f15fc6ade39b98c31298fa9393ce5142b4af (patch)
tree3ec5bf712f2d8c7c9a66744926b1edb7790e64fa
parent0f883e515008a52dd6ebf61eb428632c581d1423 (diff)
downloadrabbitmq-server-git-43b0f15fc6ade39b98c31298fa9393ce5142b4af.tar.gz
Update property for requeue order
Store sequence id with message to maintain ordering Record acks in a list - complex data structures suffer from corruption when contained symbolic values are evaluated
-rw-r--r--src/rabbit_backing_queue_qc.erl113
1 files changed, 62 insertions, 51 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index d358a041ef..1203fb454b 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -36,19 +36,21 @@
-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
-record(state, {bqstate,
- len, %% int
- messages, %% queue of {msg_props, basic_msg}
- acks, %% dict of acktag => {msg_props, basic_msg}
- confirms}). %% set of msgid
+ len, %% int
+ next_seq_id, %% int
+ messages, %% gb_trees of seqid => {msg_props, basic_msg}
+ acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
+ confirms}). %% set of msgid
%% Initialise model
initial_state() ->
- #state{bqstate = qc_variable_queue_init(qc_test_queue()),
- len = 0,
- messages = queue:new(),
- acks = orddict:new(),
- confirms = gb_sets:new()}.
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ len = 0,
+ next_seq_id = 0,
+ messages = gb_trees:empty(),
+ acks = [],
+ confirms = gb_sets:new()}.
%% Property
@@ -122,11 +124,11 @@ qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
+ {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, requeue,
- [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -154,10 +156,10 @@ qc_purge(#state{bqstate = BQ}) ->
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
- orddict:size(Acks) > 0;
+ length(Acks) > 0;
precondition(#state{messages = Messages},
- {call, ?BQMOD, publish_delivered, _Arg}) ->
- queue:is_empty(Messages);
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ gb_trees:is_empty(Messages);
precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true;
precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
@@ -168,14 +170,18 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
- #state{len = Len, messages = Messages, confirms = Confirms} = S,
+ #state{len = Len,
+ messages = Messages,
+ confirms = Confirms,
+ next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
{call, erlang, element,
[?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ,
len = Len + 1,
- messages = queue:in({MsgProps, Msg}, Messages),
+ next_seq_id = NextSeq + 1,
+ messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
@@ -183,17 +189,19 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
#state{len = Len, messages = Messages} = S,
- Messages1 = repeat(Messages, fun(Msgs) ->
- queue:in({MsgProps, Msg}, Msgs)
- end, Count),
- S#state{bqstate = BQ,
- len = Len + Count,
- messages = Messages1};
+ {S1, Msgs1} = repeat({S, Messages},
+ fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
+ {State #state { next_seq_id = NextSeq + 1},
+ gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
+ end, Count),
+ S1#state{bqstate = BQ,
+ len = Len + Count,
+ messages = Msgs1};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
[AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
- #state{confirms = Confirms, acks = Acks} = S,
+ #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
@@ -201,12 +209,13 @@ next_state(S, Res,
{call, erlang, element,
[?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ1,
+ next_seq_id = NextSeq + 1,
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
acks = case AckReq of
- true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
+ true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
false -> Acks
end
};
@@ -217,34 +226,36 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
BQ1 = {call, erlang, element, [2, Res]},
AckTag = {call, erlang, element, [3, ResultInfo]},
S1 = S#state{bqstate = BQ1},
- case queue:out(Messages) of
- {empty, _M2} ->
- S1;
- {{value, MsgProp_Msg}, M2} ->
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
- false ->
- S2
- end
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
end;
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1,
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
- #state{len = Len, messages = Messages, acks = AcksState} = S,
+ #state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
- RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) ||
- Key <- AcksArg]),
+ Messages1 = lists:foldl(fun (AckTag, Msgs) ->
+ {SeqId, MsgPropsMsg} =
+ proplists:get_value(AckTag, AcksState),
+ gb_trees:insert(SeqId, MsgPropsMsg, Msgs)
+ end, Messages, AcksArg),
S#state{bqstate = BQ1,
- len = Len + length(RequeueMsgs),
- messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ len = gb_trees:size(Messages1),
+ messages = Messages1,
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
S#state{bqstate = BQ};
@@ -259,8 +270,8 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
#state{messages = Messages} = S,
- Messages1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1};
+ Msgs1 = drop_messages(Messages),
+ S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
@@ -270,7 +281,7 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = queue:new()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
%% Postconditions
@@ -278,9 +289,9 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
{{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
- {_MsgProps, Msg} = queue:head(Messages),
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
- not orddict:is_key(AckTag, Acks) andalso
+ not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
RemainingLen =:= Len - 1;
{empty, _BQ} ->
@@ -289,7 +300,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
#state{acks = Acks, confirms = Confrms} = S,
- not orddict:is_key(AckTag, Acks) andalso
+ not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms);
postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
@@ -378,10 +389,10 @@ dropfun(Props) ->
Expiry =/= 0.
drop_messages(Messages) ->
- case queue:out(Messages) of
- {empty, _} ->
+ case gb_trees:is_empty(Messages) of
+ true ->
Messages;
- {{value, MsgProps_Msg}, M2} ->
+ false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages),
MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
case dropfun(MsgProps) of
true -> drop_messages(M2);