summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lqueue.erl8
-rw-r--r--src/rabbit_fifo.erl134
-rw-r--r--src/rabbit_fifo_index.erl52
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl348
4 files changed, 468 insertions, 74 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 0652061075..1abe4e0b82 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -21,7 +21,7 @@
%% is an O(1) operation, in contrast with queue:len/1 which is O(n).
-export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2,
- foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]).
+ foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]).
-define(QUEUE, queue).
@@ -32,6 +32,7 @@
-type result() :: 'empty' | {'value', value()}.
-spec new() -> ?MODULE().
+-spec drop(?MODULE()) -> ?MODULE().
-spec is_empty(?MODULE()) -> boolean().
-spec len(?MODULE()) -> non_neg_integer().
-spec in(value(), ?MODULE()) -> ?MODULE().
@@ -48,6 +49,8 @@
new() -> {0, ?QUEUE:new()}.
+drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}.
+
is_empty({0, _Q}) -> true;
is_empty(_) -> false.
@@ -81,7 +84,8 @@ foldr(Fun, Init, Q) ->
{{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1)
end.
-len({L, _Q}) -> L.
+len({L, _}) -> L.
+
peek({ 0, _Q}) -> empty;
peek({_L, Q}) -> ?QUEUE:peek(Q).
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 740c6e202c..00d0db0b8a 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -163,7 +163,7 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = queue:new() :: queue:queue(msg_in_id()),
+ returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -196,7 +196,8 @@
%% it instead takes messages from the `messages' map.
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_count = 0 :: non_neg_integer()
+ prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
+ PrefixMsgs :: non_neg_integer()}
}).
-opaque state() :: #state{}.
@@ -279,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0,
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
- MsgNumMsgs = [M || M <- maps:values(Returned)],
+ MsgNumMsgs = maps:values(Returned),
return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State);
_ ->
{State, Effects0, ok}
@@ -329,8 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
end;
apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
#state{messages = M,
- prefix_msg_count = 0} = State0) when map_size(M) == 0 ->
- %% FIX: also check if there are returned messages
+ prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
+ %% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
@@ -382,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0,
RaftIdx, Indexes,
State1#state{ra_indexes = rabbit_fifo_index:empty(),
messages = #{},
- returns = queue:new(),
+ returns = lqueue:new(),
low_msg_num = undefined}, Effects1),
{State, [garbage_collection | Effects], {purge, Total}};
apply(_, {down, ConsumerPid, noconnection},
@@ -476,7 +477,7 @@ apply(_, {update_state, Conf}, Effects, State) ->
state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
name = Name,
- prefix_msg_count = 0,
+ prefix_msg_counts = {0, 0},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
@@ -489,10 +490,10 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount})
- when PrefixMsgCount =/= 0 ->
+state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {0, 0} ->
%% TODO: remove assertion?
- exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount});
+ exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
@@ -722,9 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg',
- #state{prefix_msg_count = MsgCount} = S0) ->
- S0#state{prefix_msg_count = MsgCount + 1};
+ State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
@@ -826,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
end.
% TODO update message then update messages and returns in single operations
+return_one(0, '$prefix_msg',
+ #state{returns = Returns} = State0) ->
+ State0#state{returns = lqueue:in('$prefix_msg', Returns)};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{messages = Messages,
returns = Returns} = State0) ->
@@ -835,12 +838,11 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = queue:in(MsgNum, Returns)}.
+ returns = lqueue:in(MsgNum, Returns)}.
return_all(State, Checked) ->
- maps:fold(fun (_, '$prefix_msg',
- #state{prefix_msg_count = MsgCount} = S) ->
- S#state{prefix_msg_count = MsgCount + 1};
+ maps:fold(fun (_, '$prefix_msg', S) ->
+ return_one(0, '$prefix_msg', S);
(_, {MsgNum, Msg}, S) ->
return_one(MsgNum, Msg, S)
end, State, Checked).
@@ -869,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
+next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
+ when PReturns > 0 ->
+ %% there are prefix returns, these should be served first
+ {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
next_checkout_message(#state{returns = Returns,
low_msg_num = Low0,
+ prefix_msg_counts = {R, P},
next_msg_num = NextMsgNum} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
- case queue:peek(Returns) of
- empty ->
+ case lqueue:peek(Returns) of
+ {value, Next} ->
+ {Next, State#state{returns = lqueue:drop(Returns)}};
+ empty when P == 0 ->
case Low0 of
undefined ->
{undefined, State};
@@ -888,25 +897,32 @@ next_checkout_message(#state{returns = Returns,
{Low0, State#state{low_msg_num = Low}}
end
end;
- {value, Next} ->
- {Next, State#state{returns = queue:drop(Returns)}}
+ empty ->
+ %% There are prefix msgs
+ {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
end.
-take_next_msg(#state{prefix_msg_count = 0,
- messages = Messages0} = State0) ->
- {NextMsgInId, State} = next_checkout_message(State0),
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages, 0};
- error ->
- error
- end;
-take_next_msg(#state{prefix_msg_count = MsgCount,
- messages = Messages} = State) ->
- %% there is still a prefix message count for the consumer
- %% "fake" a '$prefix_msg' message
- {'$prefix_msg', State, Messages, MsgCount - 1}.
+%% next message is determined as follows:
+%% First we check if there are prefix returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{messages = Messages0} = State0) ->
+ case next_checkout_message(State0) of
+ {'$prefix_msg', State} ->
+ {'$prefix_msg', State, Messages0};
+ {NextMsgInId, State} ->
+ %% messages are available
+ case maps:take(NextMsgInId, Messages0) of
+ {IdxMsg, Messages} ->
+ {{NextMsgInId, IdxMsg}, State, Messages};
+ error ->
+ error
+ end
+ end.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
@@ -917,7 +933,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages, PrefMsgC} ->
+ {ConsumerMsg, State0, Messages} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -943,7 +959,6 @@ checkout_one(#state{service_queue = SQ0,
Cons0, SQ1, []),
State = State0#state{service_queue = SQ,
messages = Messages,
- prefix_msg_count = PrefMsgC,
consumers = Cons},
Msg = case ConsumerMsg of
'$prefix_msg' -> '$prefix_msg';
@@ -1034,19 +1049,26 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
%% creates a dehydrated version of the current state to be cached and
%% potentially used for a snapshot at a later point
-dehydrate_state(#state{messages = Messages0,
+dehydrate_state(#state{messages = Messages,
consumers = Consumers,
- prefix_msg_count = MsgCount} = State) ->
+ returns = Returns,
+ prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
+ %% TODO: optimise to avoid having to iterate the queue to get the number
+ %% of current returned messages
+ RetLen = lqueue:len(Returns), % O(1)
+ CurReturns = length([R || R <- lqueue:to_list(Returns),
+ R =/= '$prefix_msg']),
+ PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
- % C#consumer{checked_out = #{}}
end, Consumers),
- returns = queue:new(),
+ returns = lqueue:new(),
%% messages include returns
- prefix_msg_count = maps:size(Messages0) + MsgCount}.
+ prefix_msg_counts = {RetLen + PrefRetCnt,
+ PrefixMsgCnt}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
@@ -1358,13 +1380,13 @@ down_with_noconnection_returns_unack_test() ->
Cid = {<<"down_with_noconnect">>, Pid},
{State0, _} = enq(1, 1, second, test_init(test)),
?assertEqual(1, maps:size(State0#state.messages)),
- ?assertEqual(0, queue:len(State0#state.returns)),
+ ?assertEqual(0, lqueue:len(State0#state.returns)),
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
?assertEqual(0, maps:size(State1#state.messages)),
- ?assertEqual(0, queue:len(State1#state.returns)),
+ ?assertEqual(0, lqueue:len(State1#state.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
?assertEqual(1, maps:size(State2a#state.messages)),
- ?assertEqual(1, queue:len(State2a#state.returns)),
+ ?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test() ->
@@ -1579,6 +1601,26 @@ enq_check_settle_duplicate_test() ->
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
+
+multi_return_snapshot_test() ->
+ %% this was discovered using property testing
+ C1 = {<<>>, c:pid(0,6723,1)},
+ C2 = {<<0>>,c:pid(0,6723,1)},
+ E = c:pid(0,6720,1),
+ Commands = [
+ {checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg},
+ {enqueue,E,2,msg},
+ {checkout,cancel,C1}, %% both on returns queue
+ {checkout,{auto,1,simple_prefetch},C2}, %% one on returns, one on C2
+ {return,[0],C2}, %% E1 in returns, E2 with C2
+ {return,[1],C2}, %% E2 in returns E1 with C2
+ {settle,[2],C2} %% E2 with C2
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+
run_snapshot_test(Name, Commands) ->
%% create every incremental permutation of the command list
%% and run the snapshot tests against that
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index e1848862fe..345a99a03c 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -15,83 +15,83 @@
-include_lib("ra/include/ra.hrl").
-compile({no_auto_import, [size/1]}).
--record(state, {data = #{} :: #{integer() => term()},
- smallest :: undefined | non_neg_integer(),
- largest :: undefined | non_neg_integer()
- }).
+-record(?MODULE, {data = #{} :: #{integer() => term()},
+ smallest :: undefined | non_neg_integer(),
+ largest :: undefined | non_neg_integer()
+ }).
--opaque state() :: #state{}.
+-opaque state() :: #?MODULE{}.
-export_type([state/0]).
-spec empty() -> state().
empty() ->
- #state{}.
+ #?MODULE{}.
-spec fetch(integer(), state()) -> undefined | term().
-fetch(Key, #state{data = Data}) ->
+fetch(Key, #?MODULE{data = Data}) ->
maps:get(Key, Data, undefined).
% only integer keys are supported
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
- #state{data = Data,
+ #?MODULE{data = Data,
smallest = Smallest,
largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = ra_lib:default(Smallest, Key),
largest = Key}.
-spec return(integer(), term(), state()) -> state().
-return(Key, Value, #state{data = Data, smallest = Smallest} = State)
+return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
when is_integer(Key) andalso Key < Smallest ->
% TODO: this could potentially result in very large gaps which would
% result in poor performance of smallest/1
% We could try to persist a linked list of "smallests" to make it quicker
% to skip from one to the other - needs measurement
- State#state{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, Value, Data),
smallest = Key};
-return(Key, Value, #state{data = Data} = State)
+return(Key, Value, #?MODULE{data = Data} = State)
when is_integer(Key) ->
- State#state{data = maps:put(Key, Value, Data)}.
+ State#?MODULE{data = maps:put(Key, Value, Data)}.
-spec delete(integer(), state()) -> state().
-delete(Smallest, #state{data = Data0,
+delete(Smallest, #?MODULE{data = Data0,
largest = Largest,
smallest = Smallest} = State) ->
Data = maps:remove(Smallest, Data0),
case find_next(Smallest + 1, Largest, Data) of
undefined ->
- State#state{data = Data,
+ State#?MODULE{data = Data,
smallest = undefined,
largest = undefined};
Next ->
- State#state{data = Data, smallest = Next}
+ State#?MODULE{data = Data, smallest = Next}
end;
-delete(Key, #state{data = Data} = State) ->
- State#state{data = maps:remove(Key, Data)}.
+delete(Key, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:remove(Key, Data)}.
-spec size(state()) -> non_neg_integer().
-size(#state{data = Data}) ->
+size(#?MODULE{data = Data}) ->
maps:size(Data).
-spec smallest(state()) -> undefined | {integer(), term()}.
-smallest(#state{smallest = undefined}) ->
+smallest(#?MODULE{smallest = undefined}) ->
undefined;
-smallest(#state{smallest = Smallest, data = Data}) ->
+smallest(#?MODULE{smallest = Smallest, data = Data}) ->
{Smallest, maps:get(Smallest, Data)}.
-spec next_key_after(non_neg_integer(), state()) -> undefined | integer().
-next_key_after(_Idx, #state{smallest = undefined}) ->
+next_key_after(_Idx, #?MODULE{smallest = undefined}) ->
% map must be empty
undefined;
-next_key_after(Idx, #state{smallest = Smallest,
+next_key_after(Idx, #?MODULE{smallest = Smallest,
largest = Largest})
when Idx+1 < Smallest orelse Idx+1 > Largest ->
undefined;
-next_key_after(Idx, #state{data = Data} = State) ->
+next_key_after(Idx, #?MODULE{data = Data} = State) ->
Next = Idx+1,
case maps:is_key(Next, Data) of
true ->
@@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) ->
end.
-spec map(fun(), state()) -> state().
-map(F, #state{data = Data} = State) ->
- State#state{data = maps:map(F, Data)}.
+map(F, #?MODULE{data = Data} = State) ->
+ State#?MODULE{data = maps:map(F, Data)}.
%% internal
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
new file mode 100644
index 0000000000..48e7b9aa7f
--- /dev/null
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -0,0 +1,348 @@
+-module(rabbit_fifo_prop_SUITE).
+
+-compile(export_all).
+
+-export([
+ ]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+
+all_tests() ->
+ [
+ snapshots,
+ scenario1,
+ scenario2,
+ scenario3,
+ scenario4
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+init_per_group(_Group, Config) ->
+ Config.
+
+end_per_group(_Group, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+% -type log_op() ::
+% {enqueue, pid(), maybe(msg_seqno()), Msg :: raw_msg()}.
+
+scenario1(_Config) ->
+ C1 = {<<>>, c:pid(0,6723,1)},
+ C2 = {<<0>>,c:pid(0,6723,1)},
+ E = c:pid(0,6720,1),
+
+ Commands = [
+ {checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {enqueue,E,2,msg2},
+ {checkout,cancel,C1}, %% both on returns queue
+ {checkout,{auto,1,simple_prefetch},C2}, %% one on returns, one on C2
+ {return,[0],C2}, %% E1 in returns, E2 with C2
+ {return,[1],C2}, %% E2 in returns E1 with C2
+ {settle,[2],C2} %% E2 with C2
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario2(_Config) ->
+ C1 = {<<>>, c:pid(0,346,1)},
+ C2 = {<<>>,c:pid(0,379,1)},
+ E = c:pid(0,327,1),
+ Commands = [{checkout,{auto,1,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {checkout,cancel,C1},
+ {enqueue,E,2,msg2},
+ {checkout,{auto,1,simple_prefetch},C2},
+ {settle,[0],C1},
+ {settle,[0],C2}
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario3(_Config) ->
+ C1 = {<<>>, c:pid(0,179,1)},
+ E = c:pid(0,176,1),
+ Commands = [{checkout,{auto,2,simple_prefetch},C1},
+ {enqueue,E,1,msg1},
+ {return,[0],C1},
+ {enqueue,E,2,msg2},
+ {enqueue,E,3,msg3},
+ {settle,[1],C1},
+ {settle,[2],C1}],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+scenario4(_Config) ->
+ C1 = {<<>>, c:pid(0,179,1)},
+ E = c:pid(0,176,1),
+Commands = [{checkout,{auto,1,simple_prefetch},C1},
+ {enqueue,E,1,msg},
+ {settle,[0],C1}],
+ run_snapshot_test(?FUNCTION_NAME, Commands),
+ ok.
+
+snapshots(_Config) ->
+ run_proper(
+ fun () ->
+ ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)),
+ test1_prop(O))
+ end, [], 1000).
+
+test1_prop(Commands) ->
+ ct:pal("Commands: ~p~n", [Commands]),
+ try run_snapshot_test(?FUNCTION_NAME, Commands) of
+ _ -> true
+ catch
+ Err ->
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
+log_gen() ->
+ ?LET(EPids, vector(2, pid_gen()),
+ ?LET(CPids, vector(2, pid_gen()),
+ resize(200,
+ list(
+ frequency(
+ [{20, enqueue_gen(oneof(EPids))},
+ {40, {input_event,
+ frequency([{10, settle},
+ {2, return},
+ {1, discard},
+ {1, requeue}])}},
+ {2, checkout_gen(oneof(CPids))},
+ {1, checkout_cancel_gen(oneof(CPids))},
+ {1, down_gen(oneof(EPids ++ CPids))},
+ {1, purge}
+ ]))))).
+
+pid_gen() ->
+ ?LET(_, integer(), spawn(fun () -> ok end)).
+
+down_gen(Pid) ->
+ ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
+
+enqueue_gen(Pid) ->
+ ?LET(E, {enqueue, Pid, frequency([{10, enqueue},
+ {1, delay}])}, E).
+
+checkout_cancel_gen(Pid) ->
+ {checkout, Pid, cancel}.
+
+checkout_gen(Pid) ->
+ %% pid, tag, prefetch
+ ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C).
+
+
+-record(t, {state = rabbit_fifo:init(#{name => proper,
+ shadow_copy_interval => 1})
+ :: rabbit_fifo:state(),
+ index = 1 :: non_neg_integer(), %% raft index
+ enqueuers = #{} :: #{pid() => term()},
+ consumers = #{} :: #{{binary(), pid()} => term()},
+ effects = queue:new() :: queue:queue(),
+ log = [] :: list(),
+ down = #{} :: #{pid() => noproc | noconnection}
+ }).
+
+expand(Ops) ->
+ %% execute each command against a rabbit_fifo state and capture all relevant
+ %% effects
+ T = #t{},
+ #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops),
+ %% process the remaining effects
+ #t{log = Log} = lists:foldl(fun do_apply/2,
+ T1#t{effects = queue:new()},
+ queue:to_list(Effs)),
+
+ lists:reverse(Log).
+
+
+handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
+ down = Down,
+ effects = Effs} = T) ->
+ case Down of
+ #{Pid := noproc} ->
+ %% if it's a noproc then it cannot exist - can it?
+ %% drop operation
+ T;
+ _ ->
+ Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
+ MsgSeq = maps:get(Pid, Enqs),
+ Cmd = {enqueue, Pid, MsgSeq, msg},
+ case When of
+ enqueue ->
+ do_apply(Cmd, T#t{enqueuers = Enqs});
+ delay ->
+ %% just put the command on the effects queue
+ ct:pal("delaying ~w", [Cmd]),
+ T#t{effects = queue:in(Cmd, Effs)}
+ end
+ end;
+handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
+ case maps:keys(
+ maps:filter(fun ({_, P}, _) when P == Pid -> true;
+ (_, _) -> false
+ end, Cons0)) of
+ [CId | _] ->
+ Cons = maps:remove(CId, Cons0),
+ Cmd = {checkout, cancel, CId},
+ do_apply(Cmd, T#t{consumers = Cons});
+ _ ->
+ T
+ end;
+handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) ->
+ case Cons0 of
+ #{CId := _} ->
+ %% ignore if it already exists
+ T;
+ _ ->
+ Cons = maps:put(CId, ok, Cons0),
+ Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId},
+ do_apply(Cmd, T#t{consumers = Cons})
+ end;
+handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) ->
+ case Down of
+ #{Pid := noproc} ->
+ %% if it is permanently down, cannot upgrade
+ T;
+ _ ->
+ %% it is either not down or down with noconnection
+ do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)})
+ end;
+handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
+ %% this simulates certain settlements arriving out of order
+ case queue:out(Effs) of
+ {{value, Cmd}, Q} ->
+ T#t{effects = queue:in(Cmd, Q)};
+ _ ->
+ T
+ end;
+handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
+ case queue:out(Effs) of
+ {{value, {settle, MsgIds, CId}}, Q} ->
+ do_apply({Settlement, MsgIds, CId}, T#t{effects = Q});
+ {{value, {enqueue, _, _, _} = Cmd}, Q} ->
+ do_apply(Cmd, T#t{effects = Q});
+ _ ->
+ T
+ end;
+handle_op(purge, T) ->
+ do_apply(purge, T).
+
+do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
+ log = Log} = T) ->
+ {S, Effects, _} = rabbit_fifo:apply(#{index => Index}, Cmd, [], S0),
+ T#t{state = S,
+ index = Index + 1,
+ effects = enq_effs(Effects, Effs),
+ log = [Cmd | Log]}.
+
+enq_effs([], Q) -> Q;
+enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
+ MsgIds = [I || {I, _} <- Msgs],
+ %% always make settle commands by default
+ %% they can be changed depending on the input event later
+ Cmd = {settle, MsgIds, {CTag, P}},
+ enq_effs(Rem, queue:in(Cmd, Q));
+enq_effs([_ | Rem], Q) ->
+ % ct:pal("enq_effs dropping ~w~n", [E]),
+ enq_effs(Rem, Q).
+
+
+%% Utility
+run_proper(Fun, Args, NumTests) ->
+ ?assertEqual(
+ true,
+ proper:counterexample(
+ erlang:apply(Fun, Args),
+ [{numtests, NumTests},
+ {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines
+ (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A)
+ end}])).
+
+run_snapshot_test(Name, Commands) ->
+ %% create every incremental permutation of the command list
+ %% and run the snapshot tests against that
+ [begin
+ % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
+ run_snapshot_test0(Name, C)
+ end || C <- prefixes(Commands, 1, [])].
+
+run_snapshot_test0(Name, Commands) ->
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, Effects} = run_log(test_init(Name), Entries),
+
+ [begin
+ Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
+ (_) -> false
+ end, Entries),
+ % L = case Filtered of
+ % [] -> undefined;
+ % _ ->lists:last(Filtered)
+ % end,
+
+ % ct:pal("running from snapshot: ~b to ~w"
+ % "~n~p~n",
+ % [SnapIdx, L, SnapState]),
+ {S, _} = run_log(SnapState, Filtered),
+ % assert log can be restored from any release cursor index
+ % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
+ % [Name, SnapIdx, S, State, SnapState, Filtered]),
+ ?assertEqual(State, S)
+ end || {release_cursor, SnapIdx, SnapState} <- Effects],
+ ok.
+
+prefixes(Source, N, Acc) when N > length(Source) ->
+ lists:reverse(Acc);
+prefixes(Source, N, Acc) ->
+ {X, _} = lists:split(N, Source),
+ prefixes(Source, N+1, [X | Acc]).
+
+run_log(InitState, Entries) ->
+ lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
+ case rabbit_fifo:apply(meta(Idx), E, Efx0, Acc0) of
+ {Acc, Efx, _} ->
+ {Acc, Efx}
+ end
+ end, {InitState, []}, Entries).
+
+test_init(Name) ->
+ rabbit_fifo:init(#{name => Name,
+ shadow_copy_interval => 0,
+ metrics_handler => {?MODULE, metrics_handler, []}}).
+meta(Idx) ->
+ #{index => Idx, term => 1}.