diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 11 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 339 |
2 files changed, 324 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b966dca82e..9da3a6913c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -327,9 +327,16 @@ apply(Meta, {down, Pid, noconnection}, Credit = increase_credit(C0, maps:size(Checked)), {St, Effs1} = return_all(State0, Effs, Cid, C0#consumer{credit = Credit}), - #{Cid := C} = St#?MODULE.consumers, + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, {St#?MODULE{consumers = #{}, - waiting_consumers = Waiting0 ++ [{Cid, C}]}, + waiting_consumers = Waiting}, Effs1}; _ -> {State0, []} end, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 0f5fb8bf58..55e39ad8ec 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -8,6 +8,8 @@ -include_lib("proper/include/proper.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("ra/include/ra.hrl"). +-include("src/rabbit_fifo.hrl"). %%%=================================================================== %%% Common Test callbacks @@ -21,6 +23,7 @@ all() -> all_tests() -> [ + test_run_log, snapshots, scenario1, scenario2, @@ -38,7 +41,14 @@ all_tests() -> scenario14, scenario15, scenario16, - scenario17 + scenario17, + single_active, + single_active_01, + single_active_02, + single_active_03, + single_active_ordering, + single_active_ordering_01, + single_active_ordering_02 ]. groups() -> @@ -295,6 +305,82 @@ scenario17(_Config) -> }, Commands), ok. +single_active_01(_Config) -> + C1Pid = test_util:fake_pid(rabbit@fake_node1), + C1 = {<<0>>, C1Pid}, + C2Pid = test_util:fake_pid(rabbit@fake_node2), + C2 = {<<>>, C2Pid}, + E = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<"one">>), + make_checkout(C2, {auto,1,simple_prefetch}), + make_checkout(C1, cancel), + {nodeup,rabbit@fake_node1} + ], + ?assert( + single_active_prop(#{name => ?FUNCTION_NAME, + single_active_consumer_on => true + }, Commands, false)), + ok. + +single_active_02(_Config) -> + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + C2Pid = test_util:fake_pid(node()), + C2 = {<<>>, C2Pid}, + E = test_util:fake_pid(node()), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<"one">>), + {down,E,noconnection}, + make_checkout(C2, {auto,1,simple_prefetch}), + make_checkout(C2, cancel), + {down,E,noconnection} + ], + Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1), + ?assert(single_active_prop(Conf, Commands, false)), + ok. + +single_active_03(_Config) -> + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + % C2Pid = test_util:fake_pid(rabbit@fake_node2), + % C2 = {<<>>, C2Pid}, + Pid = test_util:fake_pid(node()), + E = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E, 1, 0), + make_enqueue(E, 2, 1), + {down, Pid, noconnection}, + {nodeup, node()} + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + ?assert(single_active_prop(Conf, Commands, true)), + ok. + +test_run_log(_Config) -> + Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, + run_proper( + fun () -> + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, + frequency([{10, {0, 0, false, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + boolean(), + oneof([range(1, 3), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)), + collect({log_size, length(O)}, + dump_generated( + config(?FUNCTION_NAME, + Length, + Bytes, + SingleActiveConsumer, + DeliveryLimit), O)))) + end, [], 10). + snapshots(_Config) -> run_proper( fun () -> @@ -315,6 +401,84 @@ snapshots(_Config) -> DeliveryLimit), O)))) end, [], 2500). +single_active(_Config) -> + Size = 2000, + run_proper( + fun () -> + ?FORALL({Length, Bytes, DeliveryLimit}, + frequency([{10, {0, 0, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + oneof([range(1, 3), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)), + collect({log_size, length(O)}, + single_active_prop( + config(?FUNCTION_NAME, + Length, + Bytes, + true, + DeliveryLimit), O, + false)))) + end, [], Size). + +single_active_ordering(_Config) -> + Size = 2000, + Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, + run_proper( + fun () -> + ?FORALL(O, ?LET(Ops, log_gen_ordered(Size), expand(Ops, Fun)), + collect({log_size, length(O)}, + single_active_prop(config(?FUNCTION_NAME, + undefined, + undefined, + true, + undefined), O, + true))) + end, [], Size). + +single_active_ordering_01(_Config) -> +% [{enqueue,<0.145.0>,1,0}, +% {enqueue,<0.145.0>,1,1}, +% {checkout,{<<>>,<0.148.0>},{auto,1,simple_prefetch},#{ack => true,args => [],prefetch => 1,username => <<117,115,101,114>>}} +% {enqueue,<0.140.0>,1,2}, +% {settle,{<<>>,<0.148.0>},[0]}] + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + E = test_util:fake_pid(rabbit@fake_node2), + E2 = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_enqueue(E, 1, 0), + make_enqueue(E, 2, 1), + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E2, 1, 2), + make_settle(C1, [0]) + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + ?assert(single_active_prop(Conf, Commands, true)), + ok. + +single_active_ordering_02(_Config) -> +% [{checkout, % {<<>>,<0.177.0>}, % {auto,1,simple_prefetch}, +% {enqueue,<0.172.0>,2,1}, +% {down,<0.172.0>,noproc}, +% {enqueue,<0.172.0>,1,0}, +% {settle,{<<>>,<0.177.0>},[0]}] + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + E = test_util:fake_pid(node()), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E, 2, 1), + %% CANNOT HAPPEN + {down,E,noproc}, + make_enqueue(E, 1, 0), + make_settle(C1, [0]) + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + ?assert(single_active_prop(Conf, Commands, true)), + ok. + config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> #{name => Name, max_length => map_max(Length), @@ -325,6 +489,66 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> map_max(0) -> undefined; map_max(N) -> N. +single_active_prop(Conf0, Commands, ValidateOrder) -> + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + %% invariant: there can only be one active consumer at any one time + %% there can however be multiple cancelled consumers + Invariant = fun (#rabbit_fifo{consumers = Consumers}) -> + Up = maps:filter(fun (_, #consumer{status = S}) -> + S == up + end, Consumers), + map_size(Up) =< 1 + end, + try run_log(test_init(Conf), Entries, Invariant) of + {_State, Effects} when ValidateOrder -> + %% validate message ordering + Final = lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event}, + Acc) -> + validate_msg_order({Tag, Pid}, Msgs, Acc); + (_, Acc) -> + Acc + end, -1, Effects), + ct:pal("Final: ~p~n", [Final]), + true; + _ -> + true + catch + Err -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + ct:pal("Err: ~p~n", [Err]), + false + end. + +%% single active consumer ordering invariant: +%% only redelivered messages can go backwards +validate_msg_order(_, [], S) -> + S; +validate_msg_order(Cid, [{_, {H, Num}} | Rem], PrevMax) -> + Redelivered = maps:is_key(delivery_count, H), + case undefined of + _ when Num == PrevMax + 1 -> + %% forwards case + validate_msg_order(Cid, Rem, Num); + _ when Redelivered andalso Num =< PrevMax -> + %% the seq is lower but this is a redelivery + %% when the consumer changed and the next messages has been redelivered + %% we may go backwards but keep the highest seen + validate_msg_order(Cid, Rem, PrevMax); + _ -> + ct:pal("out of order ~w Prev ~w Curr ~w Redel ~w", + [Cid, PrevMax, Num, Redelivered]), + throw({outoforder, Cid, PrevMax, Num}) + end. + + + + +dump_generated(Conf, Commands) -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + true. + snapshots_prop(Conf, Commands) -> try run_snapshot_test(Conf, Commands) of _ -> true @@ -336,6 +560,9 @@ snapshots_prop(Conf, Commands) -> end. log_gen(Size) -> + log_gen(Size, binary()). + +log_gen(Size, _Body) -> Nodes = [node(), fakenode@fake, fakenode@fake2 @@ -358,6 +585,31 @@ log_gen(Size) -> {1, purge} ]))))). +log_gen_ordered(Size) -> + Nodes = [node(), + fakenode@fake, + fakenode@fake2 + ], + ?LET(EPids, vector(1, pid_gen(Nodes)), + ?LET(CPids, vector(5, pid_gen(Nodes)), + resize(Size, + list( + frequency( + [{20, enqueue_gen(oneof(EPids), 10, 0)}, + {40, {input_event, + frequency([{10, settle}, + {2, return}, + {1, discard}, + {1, requeue}])}}, + {2, checkout_gen(oneof(CPids))}, + {1, checkout_cancel_gen(oneof(CPids))}, + {1, down_gen(oneof(EPids ++ CPids))}, + {1, nodeup_gen(Nodes)} + ]))))). + +monotonic_gen() -> + ?LET(_, integer(), erlang:unique_integer([positive, monotonic])). + pid_gen(Nodes) -> ?LET(Node, oneof(Nodes), test_util:fake_pid(atom_to_binary(Node, utf8))). @@ -369,9 +621,12 @@ nodeup_gen(Nodes) -> {nodeup, oneof(Nodes)}. enqueue_gen(Pid) -> + enqueue_gen(Pid, 10, 1). + +enqueue_gen(Pid, Enq, Del) -> ?LET(E, {enqueue, Pid, - frequency([{10, enqueue}, - {1, delay}]), + frequency([{Enq, enqueue}, + {Del, delay}]), binary()}, E). checkout_cancel_gen(Pid) -> @@ -390,16 +645,22 @@ checkout_gen(Pid) -> enqueuers = #{} :: #{pid() => term()}, consumers = #{} :: #{{binary(), pid()} => term()}, effects = queue:new() :: queue:queue(), + %% to transform the body + enq_body_fun = {0, fun ra_lib:id/1}, log = [] :: list(), down = #{} :: #{pid() => noproc | noconnection} }). expand(Ops) -> + expand(Ops, {undefined, fun ra_lib:id/1}). + +expand(Ops, EnqFun) -> + ct:pal("OPs ~w", [Ops]), %% execute each command against a rabbit_fifo state and capture all relevant %% effects - T = #t{}, + T = #t{enq_body_fun = EnqFun}, #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), - %% process the remaining effects + %% process the remaining effect #t{log = Log} = lists:foldl(fun do_apply/2, T1#t{effects = queue:new()}, queue:to_list(Effs)), @@ -409,6 +670,7 @@ expand(Ops) -> handle_op({enqueue, Pid, When, Data}, #t{enqueuers = Enqs0, + enq_body_fun = {EnqSt0, Fun}, down = Down, effects = Effs} = T) -> case Down of @@ -419,13 +681,18 @@ handle_op({enqueue, Pid, When, Data}, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data), + {EnqSt, Msg} = Fun({EnqSt0, Data}), + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Msg), case When of enqueue -> - do_apply(Cmd, T#t{enqueuers = Enqs}); + % ct:pal("ENQ ~w", [Cmd]), + do_apply(Cmd, T#t{enqueuers = Enqs, + enq_body_fun = {EnqSt, Fun}}); delay -> %% just put the command on the effects queue - T#t{effects = queue:in(Cmd, Effs)} + T#t{effects = queue:in(Cmd, Effs), + enqueuers = Enqs, + enq_body_fun = {EnqSt, Fun}} end end; handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> @@ -485,8 +752,8 @@ handle_op({input_event, Settlement}, #t{effects = Effs, discard -> rabbit_fifo:make_discard(CId, MsgIds) end, do_apply(Cmd, T#t{effects = Q}); - {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> - case maps:is_key(element(2, Cmd), Down) of + {{value, {enqueue, Pid, _, _} = Cmd}, Q} -> + case maps:is_key(Pid, Down) of true -> %% enqueues cannot arrive after down for the same process %% drop message @@ -500,21 +767,31 @@ handle_op({input_event, Settlement}, #t{effects = Effs, handle_op(purge, T) -> do_apply(rabbit_fifo:make_purge(), T). -do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, + +do_apply(Cmd, #t{effects = Effs, + index = Index, state = S0, + down = Down, log = Log} = T) -> - {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of - {S, _, E} when is_list(E) -> - {S, E}; - {S, _, E} -> - {S, [E]}; - {S, _} -> - {S, []} - end, - - T#t{state = St, - index = Index + 1, - effects = enq_effs(Effects, Effs), - log = [Cmd | Log]}. + case Cmd of + {enqueue, Pid, _, _} when is_map_key(Pid, Down) -> + ct:pal("Pid ~w is donw ~w", [Pid, Down]), + %% down + T; + _ -> + {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of + {S, _, E} when is_list(E) -> + {S, E}; + {S, _, E} -> + {S, [E]}; + {S, _} -> + {S, []} + end, + + T#t{state = St, + index = Index + 1, + effects = enq_effs(Effects, Effs), + log = [Cmd | Log]} + end. enq_effs([], Q) -> Q; enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> @@ -582,13 +859,27 @@ prefixes(Source, N, Acc) -> prefixes(Source, N+1, [X | Acc]). run_log(InitState, Entries) -> + run_log(InitState, Entries, fun ra_lib:id/1). + +run_log(InitState, Entries, InvariantFun) -> + Invariant = fun(E, S) -> + case InvariantFun(S) of + true -> ok; + false -> + throw({invariant, E, S}) + end + end, + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> case rabbit_fifo:apply(meta(Idx), E, Acc0) of {Acc, _, Efx} when is_list(Efx) -> + Invariant(E, Acc), {Acc, Efx0 ++ Efx}; {Acc, _, Efx} -> + Invariant(E, Acc), {Acc, Efx0 ++ [Efx]}; {Acc, _} -> + Invariant(E, Acc), {Acc, Efx0} end end, {InitState, []}, Entries). |
