diff options
| -rw-r--r-- | src/lqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 52 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 348 |
4 files changed, 468 insertions, 74 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 0652061075..1abe4e0b82 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -21,7 +21,7 @@ %% is an O(1) operation, in contrast with queue:len/1 which is O(n). -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). + foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]). -define(QUEUE, queue). @@ -32,6 +32,7 @@ -type result() :: 'empty' | {'value', value()}. -spec new() -> ?MODULE(). +-spec drop(?MODULE()) -> ?MODULE(). -spec is_empty(?MODULE()) -> boolean(). -spec len(?MODULE()) -> non_neg_integer(). -spec in(value(), ?MODULE()) -> ?MODULE(). @@ -48,6 +49,8 @@ new() -> {0, ?QUEUE:new()}. +drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}. + is_empty({0, _Q}) -> true; is_empty(_) -> false. @@ -81,7 +84,8 @@ foldr(Fun, Init, Q) -> {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) end. -len({L, _Q}) -> L. +len({L, _}) -> L. + peek({ 0, _Q}) -> empty; peek({_L, Q}) -> ?QUEUE:peek(Q). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 740c6e202c..00d0db0b8a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -163,7 +163,7 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = queue:new() :: queue:queue(msg_in_id()), + returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -196,7 +196,8 @@ %% it instead takes messages from the `messages' map. %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msg_count = 0 :: non_neg_integer() + prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), + PrefixMsgs :: non_neg_integer()} }). -opaque state() :: #state{}. @@ -279,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0, #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = [M || M <- maps:values(Returned)], + MsgNumMsgs = maps:values(Returned), return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State); _ -> {State, Effects0, ok} @@ -329,8 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, end; apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, #state{messages = M, - prefix_msg_count = 0} = State0) when map_size(M) == 0 -> - %% FIX: also check if there are returned messages + prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> + %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, @@ -382,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0, RaftIdx, Indexes, State1#state{ra_indexes = rabbit_fifo_index:empty(), messages = #{}, - returns = queue:new(), + returns = lqueue:new(), low_msg_num = undefined}, Effects1), {State, [garbage_collection | Effects], {purge, Total}}; apply(_, {down, ConsumerPid, noconnection}, @@ -476,7 +477,7 @@ apply(_, {update_state, Conf}, Effects, State) -> state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, name = Name, - prefix_msg_count = 0, + prefix_msg_counts = {0, 0}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), @@ -489,10 +490,10 @@ state_enter(leader, #state{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount}) - when PrefixMsgCount =/= 0 -> +state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) + when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? - exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount}); + exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; @@ -722,9 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg', - #state{prefix_msg_count = MsgCount} = S0) -> - S0#state{prefix_msg_count = MsgCount + 1}; + State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> + return_one(0, Msg, S0); ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) end, State0, MsgNumMsgs), @@ -826,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, end. % TODO update message then update messages and returns in single operations +return_one(0, '$prefix_msg', + #state{returns = Returns} = State0) -> + State0#state{returns = lqueue:in('$prefix_msg', Returns)}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{messages = Messages, returns = Returns} = State0) -> @@ -835,12 +838,11 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = queue:in(MsgNum, Returns)}. + returns = lqueue:in(MsgNum, Returns)}. return_all(State, Checked) -> - maps:fold(fun (_, '$prefix_msg', - #state{prefix_msg_count = MsgCount} = S) -> - S#state{prefix_msg_count = MsgCount + 1}; + maps:fold(fun (_, '$prefix_msg', S) -> + return_one(0, '$prefix_msg', S); (_, {MsgNum, Msg}, S) -> return_one(MsgNum, Msg, S) end, State, Checked). @@ -869,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. +next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State) + when PReturns > 0 -> + %% there are prefix returns, these should be served first + {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}}; next_checkout_message(#state{returns = Returns, low_msg_num = Low0, + prefix_msg_counts = {R, P}, next_msg_num = NextMsgNum} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue - case queue:peek(Returns) of - empty -> + case lqueue:peek(Returns) of + {value, Next} -> + {Next, State#state{returns = lqueue:drop(Returns)}}; + empty when P == 0 -> case Low0 of undefined -> {undefined, State}; @@ -888,25 +897,32 @@ next_checkout_message(#state{returns = Returns, {Low0, State#state{low_msg_num = Low}} end end; - {value, Next} -> - {Next, State#state{returns = queue:drop(Returns)}} + empty -> + %% There are prefix msgs + {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}} end. -take_next_msg(#state{prefix_msg_count = 0, - messages = Messages0} = State0) -> - {NextMsgInId, State} = next_checkout_message(State0), - %% messages are available - case maps:take(NextMsgInId, Messages0) of - {IdxMsg, Messages} -> - {{NextMsgInId, IdxMsg}, State, Messages, 0}; - error -> - error - end; -take_next_msg(#state{prefix_msg_count = MsgCount, - messages = Messages} = State) -> - %% there is still a prefix message count for the consumer - %% "fake" a '$prefix_msg' message - {'$prefix_msg', State, Messages, MsgCount - 1}. +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#state{messages = Messages0} = State0) -> + case next_checkout_message(State0) of + {'$prefix_msg', State} -> + {'$prefix_msg', State, Messages0}; + {NextMsgInId, State} -> + %% messages are available + case maps:take(NextMsgInId, Messages0) of + {IdxMsg, Messages} -> + {{NextMsgInId, IdxMsg}, State, Messages}; + error -> + error + end + end. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. @@ -917,7 +933,7 @@ checkout_one(#state{service_queue = SQ0, case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of - {ConsumerMsg, State0, Messages, PrefMsgC} -> + {ConsumerMsg, State0, Messages} -> SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout @@ -943,7 +959,6 @@ checkout_one(#state{service_queue = SQ0, Cons0, SQ1, []), State = State0#state{service_queue = SQ, messages = Messages, - prefix_msg_count = PrefMsgC, consumers = Cons}, Msg = case ConsumerMsg of '$prefix_msg' -> '$prefix_msg'; @@ -1034,19 +1049,26 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#state{messages = Messages0, +dehydrate_state(#state{messages = Messages, consumers = Consumers, - prefix_msg_count = MsgCount} = State) -> + returns = Returns, + prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) -> + %% TODO: optimise to avoid having to iterate the queue to get the number + %% of current returned messages + RetLen = lqueue:len(Returns), % O(1) + CurReturns = length([R || R <- lqueue:to_list(Returns), + R =/= '$prefix_msg']), + PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns, State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) - % C#consumer{checked_out = #{}} end, Consumers), - returns = queue:new(), + returns = lqueue:new(), %% messages include returns - prefix_msg_count = maps:size(Messages0) + MsgCount}. + prefix_msg_counts = {RetLen + PrefRetCnt, + PrefixMsgCnt}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), @@ -1358,13 +1380,13 @@ down_with_noconnection_returns_unack_test() -> Cid = {<<"down_with_noconnect">>, Pid}, {State0, _} = enq(1, 1, second, test_init(test)), ?assertEqual(1, maps:size(State0#state.messages)), - ?assertEqual(0, queue:len(State0#state.returns)), + ?assertEqual(0, lqueue:len(State0#state.returns)), {State1, {_, _}} = deq(2, Cid, unsettled, State0), ?assertEqual(0, maps:size(State1#state.messages)), - ?assertEqual(0, queue:len(State1#state.returns)), + ?assertEqual(0, lqueue:len(State1#state.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), ?assertEqual(1, maps:size(State2a#state.messages)), - ?assertEqual(1, queue:len(State2a#state.returns)), + ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. down_with_noproc_enqueuer_is_cleaned_up_test() -> @@ -1579,6 +1601,26 @@ enq_check_settle_duplicate_test() -> % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). + +multi_return_snapshot_test() -> + %% this was discovered using property testing + C1 = {<<>>, c:pid(0,6723,1)}, + C2 = {<<0>>,c:pid(0,6723,1)}, + E = c:pid(0,6720,1), + Commands = [ + {checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg}, + {enqueue,E,2,msg}, + {checkout,cancel,C1}, %% both on returns queue + {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 + {return,[0],C2}, %% E1 in returns, E2 with C2 + {return,[1],C2}, %% E2 in returns E1 with C2 + {settle,[2],C2} %% E2 with C2 + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + + run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index e1848862fe..345a99a03c 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -15,83 +15,83 @@ -include_lib("ra/include/ra.hrl"). -compile({no_auto_import, [size/1]}). --record(state, {data = #{} :: #{integer() => term()}, - smallest :: undefined | non_neg_integer(), - largest :: undefined | non_neg_integer() - }). +-record(?MODULE, {data = #{} :: #{integer() => term()}, + smallest :: undefined | non_neg_integer(), + largest :: undefined | non_neg_integer() + }). --opaque state() :: #state{}. +-opaque state() :: #?MODULE{}. -export_type([state/0]). -spec empty() -> state(). empty() -> - #state{}. + #?MODULE{}. -spec fetch(integer(), state()) -> undefined | term(). -fetch(Key, #state{data = Data}) -> +fetch(Key, #?MODULE{data = Data}) -> maps:get(Key, Data, undefined). % only integer keys are supported -spec append(integer(), term(), state()) -> state(). append(Key, Value, - #state{data = Data, + #?MODULE{data = Data, smallest = Smallest, largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = ra_lib:default(Smallest, Key), largest = Key}. -spec return(integer(), term(), state()) -> state(). -return(Key, Value, #state{data = Data, smallest = Smallest} = State) +return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) when is_integer(Key) andalso Key < Smallest -> % TODO: this could potentially result in very large gaps which would % result in poor performance of smallest/1 % We could try to persist a linked list of "smallests" to make it quicker % to skip from one to the other - needs measurement - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = Key}; -return(Key, Value, #state{data = Data} = State) +return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> - State#state{data = maps:put(Key, Value, Data)}. + State#?MODULE{data = maps:put(Key, Value, Data)}. -spec delete(integer(), state()) -> state(). -delete(Smallest, #state{data = Data0, +delete(Smallest, #?MODULE{data = Data0, largest = Largest, smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> - State#state{data = Data, + State#?MODULE{data = Data, smallest = undefined, largest = undefined}; Next -> - State#state{data = Data, smallest = Next} + State#?MODULE{data = Data, smallest = Next} end; -delete(Key, #state{data = Data} = State) -> - State#state{data = maps:remove(Key, Data)}. +delete(Key, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:remove(Key, Data)}. -spec size(state()) -> non_neg_integer(). -size(#state{data = Data}) -> +size(#?MODULE{data = Data}) -> maps:size(Data). -spec smallest(state()) -> undefined | {integer(), term()}. -smallest(#state{smallest = undefined}) -> +smallest(#?MODULE{smallest = undefined}) -> undefined; -smallest(#state{smallest = Smallest, data = Data}) -> +smallest(#?MODULE{smallest = Smallest, data = Data}) -> {Smallest, maps:get(Smallest, Data)}. -spec next_key_after(non_neg_integer(), state()) -> undefined | integer(). -next_key_after(_Idx, #state{smallest = undefined}) -> +next_key_after(_Idx, #?MODULE{smallest = undefined}) -> % map must be empty undefined; -next_key_after(Idx, #state{smallest = Smallest, +next_key_after(Idx, #?MODULE{smallest = Smallest, largest = Largest}) when Idx+1 < Smallest orelse Idx+1 > Largest -> undefined; -next_key_after(Idx, #state{data = Data} = State) -> +next_key_after(Idx, #?MODULE{data = Data} = State) -> Next = Idx+1, case maps:is_key(Next, Data) of true -> @@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) -> end. -spec map(fun(), state()) -> state(). -map(F, #state{data = Data} = State) -> - State#state{data = maps:map(F, Data)}. +map(F, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:map(F, Data)}. %% internal diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl new file mode 100644 index 0000000000..48e7b9aa7f --- /dev/null +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -0,0 +1,348 @@ +-module(rabbit_fifo_prop_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + snapshots, + scenario1, + scenario2, + scenario3, + scenario4 + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +% -type log_op() :: +% {enqueue, pid(), maybe(msg_seqno()), Msg :: raw_msg()}. + +scenario1(_Config) -> + C1 = {<<>>, c:pid(0,6723,1)}, + C2 = {<<0>>,c:pid(0,6723,1)}, + E = c:pid(0,6720,1), + + Commands = [ + {checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {enqueue,E,2,msg2}, + {checkout,cancel,C1}, %% both on returns queue + {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 + {return,[0],C2}, %% E1 in returns, E2 with C2 + {return,[1],C2}, %% E2 in returns E1 with C2 + {settle,[2],C2} %% E2 with C2 + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario2(_Config) -> + C1 = {<<>>, c:pid(0,346,1)}, + C2 = {<<>>,c:pid(0,379,1)}, + E = c:pid(0,327,1), + Commands = [{checkout,{auto,1,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {checkout,cancel,C1}, + {enqueue,E,2,msg2}, + {checkout,{auto,1,simple_prefetch},C2}, + {settle,[0],C1}, + {settle,[0],C2} + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario3(_Config) -> + C1 = {<<>>, c:pid(0,179,1)}, + E = c:pid(0,176,1), + Commands = [{checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {return,[0],C1}, + {enqueue,E,2,msg2}, + {enqueue,E,3,msg3}, + {settle,[1],C1}, + {settle,[2],C1}], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario4(_Config) -> + C1 = {<<>>, c:pid(0,179,1)}, + E = c:pid(0,176,1), +Commands = [{checkout,{auto,1,simple_prefetch},C1}, + {enqueue,E,1,msg}, + {settle,[0],C1}], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +snapshots(_Config) -> + run_proper( + fun () -> + ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)), + test1_prop(O)) + end, [], 1000). + +test1_prop(Commands) -> + ct:pal("Commands: ~p~n", [Commands]), + try run_snapshot_test(?FUNCTION_NAME, Commands) of + _ -> true + catch + Err -> + ct:pal("Err: ~p~n", [Err]), + false + end. + +log_gen() -> + ?LET(EPids, vector(2, pid_gen()), + ?LET(CPids, vector(2, pid_gen()), + resize(200, + list( + frequency( + [{20, enqueue_gen(oneof(EPids))}, + {40, {input_event, + frequency([{10, settle}, + {2, return}, + {1, discard}, + {1, requeue}])}}, + {2, checkout_gen(oneof(CPids))}, + {1, checkout_cancel_gen(oneof(CPids))}, + {1, down_gen(oneof(EPids ++ CPids))}, + {1, purge} + ]))))). + +pid_gen() -> + ?LET(_, integer(), spawn(fun () -> ok end)). + +down_gen(Pid) -> + ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). + +enqueue_gen(Pid) -> + ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, + {1, delay}])}, E). + +checkout_cancel_gen(Pid) -> + {checkout, Pid, cancel}. + +checkout_gen(Pid) -> + %% pid, tag, prefetch + ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C). + + +-record(t, {state = rabbit_fifo:init(#{name => proper, + shadow_copy_interval => 1}) + :: rabbit_fifo:state(), + index = 1 :: non_neg_integer(), %% raft index + enqueuers = #{} :: #{pid() => term()}, + consumers = #{} :: #{{binary(), pid()} => term()}, + effects = queue:new() :: queue:queue(), + log = [] :: list(), + down = #{} :: #{pid() => noproc | noconnection} + }). + +expand(Ops) -> + %% execute each command against a rabbit_fifo state and capture all releavant + %% effects + T = #t{}, + #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), + %% process the remaining effects + #t{log = Log} = lists:foldl(fun do_apply/2, + T1#t{effects = queue:new()}, + queue:to_list(Effs)), + + lists:reverse(Log). + + +handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, + down = Down, + effects = Effs} = T) -> + case Down of + #{Pid := noproc} -> + %% if it's a noproc then it cannot exist - can it? + %% drop operation + T; + _ -> + Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), + MsgSeq = maps:get(Pid, Enqs), + Cmd = {enqueue, Pid, MsgSeq, msg}, + case When of + enqueue -> + do_apply(Cmd, T#t{enqueuers = Enqs}); + delay -> + %% just put the command on the effects queue + ct:pal("delaying ~w", [Cmd]), + T#t{effects = queue:in(Cmd, Effs)} + end + end; +handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> + case maps:keys( + maps:filter(fun ({_, P}, _) when P == Pid -> true; + (_, _) -> false + end, Cons0)) of + [CId | _] -> + Cons = maps:remove(CId, Cons0), + Cmd = {checkout, cancel, CId}, + do_apply(Cmd, T#t{consumers = Cons}); + _ -> + T + end; +handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) -> + case Cons0 of + #{CId := _} -> + %% ignore if it already exists + T; + _ -> + Cons = maps:put(CId, ok, Cons0), + Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId}, + do_apply(Cmd, T#t{consumers = Cons}) + end; +handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) -> + case Down of + #{Pid := noproc} -> + %% it it permanently down, cannot upgrade + T; + _ -> + %% it is either not down or down with noconnection + do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)}) + end; +handle_op({input_event, requeue}, #t{effects = Effs} = T) -> + %% this simulates certain settlements arriving out of order + case queue:out(Effs) of + {{value, Cmd}, Q} -> + T#t{effects = queue:in(Cmd, Q)}; + _ -> + T + end; +handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> + case queue:out(Effs) of + {{value, {settle, MsgIds, CId}}, Q} -> + do_apply({Settlement, MsgIds, CId}, T#t{effects = Q}); + {{value, {enqueue, _, _, _} = Cmd}, Q} -> + do_apply(Cmd, T#t{effects = Q}); + _ -> + T + end; +handle_op(purge, T) -> + do_apply(purge, T). + +do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, + log = Log} = T) -> + {S, Effects, _} = rabbit_fifo:apply(#{index => Index}, Cmd, [], S0), + T#t{state = S, + index = Index + 1, + effects = enq_effs(Effects, Effs), + log = [Cmd | Log]}. + +enq_effs([], Q) -> Q; +enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> + MsgIds = [I || {I, _} <- Msgs], + %% always make settle commands by default + %% they can be changed depending on the input event later + Cmd = {settle, MsgIds, {CTag, P}}, + enq_effs(Rem, queue:in(Cmd, Q)); +enq_effs([_ | Rem], Q) -> + % ct:pal("enq_effs dropping ~w~n", [E]), + enq_effs(Rem, Q). + + +%% Utility +run_proper(Fun, Args, NumTests) -> + ?assertEqual( + true, + proper:counterexample( + erlang:apply(Fun, Args), + [{numtests, NumTests}, + {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines + (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) + end}])). + +run_snapshot_test(Name, Commands) -> + %% create every incremental permuation of the commands lists + %% and run the snapshot tests against that + [begin + % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), + run_snapshot_test0(Name, C) + end || C <- prefixes(Commands, 1, [])]. + +run_snapshot_test0(Name, Commands) -> + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, Effects} = run_log(test_init(Name), Entries), + + [begin + Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; + (_) -> false + end, Entries), + % L = case Filtered of + % [] -> undefined; + % _ ->lists:last(Filtered) + % end, + + % ct:pal("running from snapshot: ~b to ~w" + % "~n~p~n", + % [SnapIdx, L, SnapState]), + {S, _} = run_log(SnapState, Filtered), + % assert log can be restored from any release cursor index + % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", + % [Name, SnapIdx, S, State, SnapState, Filtered]), + ?assertEqual(State, S) + end || {release_cursor, SnapIdx, SnapState} <- Effects], + ok. + +prefixes(Source, N, Acc) when N > length(Source) -> + lists:reverse(Acc); +prefixes(Source, N, Acc) -> + {X, _} = lists:split(N, Source), + prefixes(Source, N+1, [X | Acc]). + +run_log(InitState, Entries) -> + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> + case rabbit_fifo:apply(meta(Idx), E, Efx0, Acc0) of + {Acc, Efx, _} -> + {Acc, Efx} + end + end, {InitState, []}, Entries). + +test_init(Name) -> + rabbit_fifo:init(#{name => Name, + shadow_copy_interval => 0, + metrics_handler => {?MODULE, metrics_handler, []}}). +meta(Idx) -> + #{index => Idx, term => 1}. |
