summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue_qc.erl49
1 files changed, 44 insertions, 5 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index c0938be331..94e1ef3a04 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -78,12 +78,14 @@ prop_backing_queue_test() ->
command(S) ->
?SIZED(Size,
frequency([{Size, qc_publish(S)},
+ {Size, qc_publish_delivered(S)},
{Size, qc_fetch(S)},
{Size, qc_ack(S)},
{Size, qc_requeue(S)},
{Size, qc_ram(S)},
{Size, qc_drain_confirmed(S)},
{Size, qc_dropwhile(S)},
+ {Size, qc_is_empty(S)},
{1, qc_purge(S)}])).
qc_publish(#state{bqstate = BQ}) ->
@@ -94,6 +96,10 @@ qc_publish(#state{bqstate = BQ}) ->
expiry = choose(0, 10)},
self(), BQ]}.
+qc_publish_delivered(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, publish_delivered,
+ [boolean(), qc_message(), #message_properties{}, self(), BQ]}.
+
qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
@@ -113,6 +119,9 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
qc_dropwhile(#state{bqstate = BQ}) ->
{call, ?BQMOD, dropwhile, [fun dropfun/1, BQ]}.
+qc_is_empty(#state{bqstate = BQ}) ->
+ {call, ?BQMOD, is_empty, [BQ]}.
+
qc_purge(#state{bqstate = BQ}) ->
{call, ?BQMOD, purge, [BQ]}.
@@ -121,6 +130,9 @@ qc_purge(#state{bqstate = BQ}) ->
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
length(Acks) > 0;
+precondition(#state{messages = Messages},
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ queue:is_empty(Messages);
precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true.
@@ -132,13 +144,33 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
NeedsConfirm =
{call, erlang, element,
[?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
- Confirms1 = case eval(NeedsConfirm) of
- true -> gb_sets:add(MsgId, Confirms);
- _ -> Confirms
- end,
S#state{bqstate = BQ,
messages = queue:in({MsgProps, Msg}, Messages),
- confirms = Confirms1};
+ confirms = Confirms1 = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end};
+
+next_state(S, Res,
+ {call, ?BQMOD, publish_delivered,
+ [AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
+ #state{confirms = Confirms, acks = Acks} = S,
+ AckTag = {call, erlang, element, [1, Res]},
+ BQ1 = {call, erlang, element, [2, Res]},
+ MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
+ NeedsConfirm =
+ {call, erlang, element,
+ [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
+ S#state{bqstate = BQ1,
+ confirms = case eval(NeedsConfirm) of
+ true -> gb_sets:add(MsgId, Confirms);
+ _ -> Confirms
+ end,
+ acks = Acks ++ case AckReq of
+ true -> [{AckTag, {MsgProps, Msg}}];
+ false -> []
+ end
+ };
next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
#state{messages = Messages, acks = Acks} = S,
@@ -182,6 +214,9 @@ next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
#state{messages = Messages} = S,
S#state{bqstate = BQ1, messages = drop_messages(Messages)};
+next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
+ S;
+
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1, messages = queue:new()}.
@@ -201,6 +236,10 @@ postcondition(#state{messages = Messages}, {call, ?BQMOD, purge, _Args}, Res) ->
{PurgeCount, _BQ} = Res,
queue:len(Messages) =:= PurgeCount;
+postcondition(#state{messages = Messages},
+ {call, ?BQMOD, is_empty, _Args}, Res) ->
+ (queue:len(Messages) =:= 0) =:= Res;
+
postcondition(S, {call, ?BQMOD, drain_confirmed, _Args}, Res) ->
#state{confirms = Confirms} = S,
{ReportedConfirmed, _BQ} = Res,