summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl11
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl339
2 files changed, 324 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b966dca82e..9da3a6913c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -327,9 +327,16 @@ apply(Meta, {down, Pid, noconnection},
Credit = increase_credit(C0, maps:size(Checked)),
{St, Effs1} = return_all(State0, Effs,
Cid, C0#consumer{credit = Credit}),
- #{Cid := C} = St#?MODULE.consumers,
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?MODULE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
{St#?MODULE{consumers = #{},
- waiting_consumers = Waiting0 ++ [{Cid, C}]},
+ waiting_consumers = Waiting},
Effs1};
_ -> {State0, []}
end,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 0f5fb8bf58..55e39ad8ec 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -8,6 +8,8 @@
-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("ra/include/ra.hrl").
+-include("src/rabbit_fifo.hrl").
%%%===================================================================
%%% Common Test callbacks
@@ -21,6 +23,7 @@ all() ->
all_tests() ->
[
+ test_run_log,
snapshots,
scenario1,
scenario2,
@@ -38,7 +41,14 @@ all_tests() ->
scenario14,
scenario15,
scenario16,
- scenario17
+ scenario17,
+ single_active,
+ single_active_01,
+ single_active_02,
+ single_active_03,
+ single_active_ordering,
+ single_active_ordering_01,
+ single_active_ordering_02
].
groups() ->
@@ -295,6 +305,82 @@ scenario17(_Config) ->
}, Commands),
ok.
+single_active_01(_Config) ->
+ C1Pid = test_util:fake_pid(rabbit@fake_node1),
+ C1 = {<<0>>, C1Pid},
+ C2Pid = test_util:fake_pid(rabbit@fake_node2),
+ C2 = {<<>>, C2Pid},
+ E = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<"one">>),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ {nodeup,rabbit@fake_node1}
+ ],
+ ?assert(
+ single_active_prop(#{name => ?FUNCTION_NAME,
+ single_active_consumer_on => true
+ }, Commands, false)),
+ ok.
+
+single_active_02(_Config) ->
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ C2Pid = test_util:fake_pid(node()),
+ C2 = {<<>>, C2Pid},
+ E = test_util:fake_pid(node()),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<"one">>),
+ {down,E,noconnection},
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_checkout(C2, cancel),
+ {down,E,noconnection}
+ ],
+ Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1),
+ ?assert(single_active_prop(Conf, Commands, false)),
+ ok.
+
+single_active_03(_Config) ->
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ % C2Pid = test_util:fake_pid(rabbit@fake_node2),
+ % C2 = {<<>>, C2Pid},
+ Pid = test_util:fake_pid(node()),
+ E = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E, 1, 0),
+ make_enqueue(E, 2, 1),
+ {down, Pid, noconnection},
+ {nodeup, node()}
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
+test_run_log(_Config) ->
+ Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
+ run_proper(
+ fun () ->
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
+ frequency([{10, {0, 0, false, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ boolean(),
+ oneof([range(1, 3), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)),
+ collect({log_size, length(O)},
+ dump_generated(
+ config(?FUNCTION_NAME,
+ Length,
+ Bytes,
+ SingleActiveConsumer,
+ DeliveryLimit), O))))
+ end, [], 10).
+
snapshots(_Config) ->
run_proper(
fun () ->
@@ -315,6 +401,84 @@ snapshots(_Config) ->
DeliveryLimit), O))))
end, [], 2500).
+single_active(_Config) ->
+ Size = 2000,
+ run_proper(
+ fun () ->
+ ?FORALL({Length, Bytes, DeliveryLimit},
+ frequency([{10, {0, 0, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ oneof([range(1, 3), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)),
+ collect({log_size, length(O)},
+ single_active_prop(
+ config(?FUNCTION_NAME,
+ Length,
+ Bytes,
+ true,
+ DeliveryLimit), O,
+ false))))
+ end, [], Size).
+
+single_active_ordering(_Config) ->
+ Size = 2000,
+ Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
+ run_proper(
+ fun () ->
+ ?FORALL(O, ?LET(Ops, log_gen_ordered(Size), expand(Ops, Fun)),
+ collect({log_size, length(O)},
+ single_active_prop(config(?FUNCTION_NAME,
+ undefined,
+ undefined,
+ true,
+ undefined), O,
+ true)))
+ end, [], Size).
+
+single_active_ordering_01(_Config) ->
+% [{enqueue,<0.145.0>,1,0},
+% {enqueue,<0.145.0>,1,1},
+% {checkout,{<<>>,<0.148.0>},{auto,1,simple_prefetch},#{ack => true,args => [],prefetch => 1,username => <<117,115,101,114>>}}
+% {enqueue,<0.140.0>,1,2},
+% {settle,{<<>>,<0.148.0>},[0]}]
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ E = test_util:fake_pid(rabbit@fake_node2),
+ E2 = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_enqueue(E, 1, 0),
+ make_enqueue(E, 2, 1),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E2, 1, 2),
+ make_settle(C1, [0])
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
+single_active_ordering_02(_Config) ->
+% [{checkout, % {<<>>,<0.177.0>}, % {auto,1,simple_prefetch},
+% {enqueue,<0.172.0>,2,1},
+% {down,<0.172.0>,noproc},
+% {enqueue,<0.172.0>,1,0},
+% {settle,{<<>>,<0.177.0>},[0]}]
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ E = test_util:fake_pid(node()),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E, 2, 1),
+ %% CANNOT HAPPEN
+ {down,E,noproc},
+ make_enqueue(E, 1, 0),
+ make_settle(C1, [0])
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
#{name => Name,
max_length => map_max(Length),
@@ -325,6 +489,66 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
map_max(0) -> undefined;
map_max(N) -> N.
+single_active_prop(Conf0, Commands, ValidateOrder) ->
+ Conf = Conf0#{release_cursor_interval => 100},
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ %% invariant: there can only be one active consumer at any one time
+ %% there can however be multiple cancelled consumers
+ Invariant = fun (#rabbit_fifo{consumers = Consumers}) ->
+ Up = maps:filter(fun (_, #consumer{status = S}) ->
+ S == up
+ end, Consumers),
+ map_size(Up) =< 1
+ end,
+ try run_log(test_init(Conf), Entries, Invariant) of
+ {_State, Effects} when ValidateOrder ->
+ %% validate message ordering
+ Final = lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event},
+ Acc) ->
+ validate_msg_order({Tag, Pid}, Msgs, Acc);
+ (_, Acc) ->
+ Acc
+ end, -1, Effects),
+ ct:pal("Final: ~p~n", [Final]),
+ true;
+ _ ->
+ true
+ catch
+ Err ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
+%% single active consumer ordering invariant:
+%% only redelivered messages can go backwards
+validate_msg_order(_, [], S) ->
+ S;
+validate_msg_order(Cid, [{_, {H, Num}} | Rem], PrevMax) ->
+ Redelivered = maps:is_key(delivery_count, H),
+ case undefined of
+ _ when Num == PrevMax + 1 ->
+ %% forwards case
+ validate_msg_order(Cid, Rem, Num);
+ _ when Redelivered andalso Num =< PrevMax ->
+ %% the seq is lower but this is a redelivery
+ %% when the consumer changed and the next messages has been redelivered
+ %% we may go backwards but keep the highest seen
+ validate_msg_order(Cid, Rem, PrevMax);
+ _ ->
+ ct:pal("out of order ~w Prev ~w Curr ~w Redel ~w",
+ [Cid, PrevMax, Num, Redelivered]),
+ throw({outoforder, Cid, PrevMax, Num})
+ end.
+
+
+
+
+dump_generated(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ true.
+
snapshots_prop(Conf, Commands) ->
try run_snapshot_test(Conf, Commands) of
_ -> true
@@ -336,6 +560,9 @@ snapshots_prop(Conf, Commands) ->
end.
log_gen(Size) ->
+ log_gen(Size, binary()).
+
+log_gen(Size, _Body) ->
Nodes = [node(),
fakenode@fake,
fakenode@fake2
@@ -358,6 +585,31 @@ log_gen(Size) ->
{1, purge}
]))))).
+log_gen_ordered(Size) ->
+ Nodes = [node(),
+ fakenode@fake,
+ fakenode@fake2
+ ],
+ ?LET(EPids, vector(1, pid_gen(Nodes)),
+ ?LET(CPids, vector(5, pid_gen(Nodes)),
+ resize(Size,
+ list(
+ frequency(
+ [{20, enqueue_gen(oneof(EPids), 10, 0)},
+ {40, {input_event,
+ frequency([{10, settle},
+ {2, return},
+ {1, discard},
+ {1, requeue}])}},
+ {2, checkout_gen(oneof(CPids))},
+ {1, checkout_cancel_gen(oneof(CPids))},
+ {1, down_gen(oneof(EPids ++ CPids))},
+ {1, nodeup_gen(Nodes)}
+ ]))))).
+
+monotonic_gen() ->
+ ?LET(_, integer(), erlang:unique_integer([positive, monotonic])).
+
pid_gen(Nodes) ->
?LET(Node, oneof(Nodes),
test_util:fake_pid(atom_to_binary(Node, utf8))).
@@ -369,9 +621,12 @@ nodeup_gen(Nodes) ->
{nodeup, oneof(Nodes)}.
enqueue_gen(Pid) ->
+ enqueue_gen(Pid, 10, 1).
+
+enqueue_gen(Pid, Enq, Del) ->
?LET(E, {enqueue, Pid,
- frequency([{10, enqueue},
- {1, delay}]),
+ frequency([{Enq, enqueue},
+ {Del, delay}]),
binary()}, E).
checkout_cancel_gen(Pid) ->
@@ -390,16 +645,22 @@ checkout_gen(Pid) ->
enqueuers = #{} :: #{pid() => term()},
consumers = #{} :: #{{binary(), pid()} => term()},
effects = queue:new() :: queue:queue(),
+ %% to transform the body
+ enq_body_fun = {0, fun ra_lib:id/1},
log = [] :: list(),
down = #{} :: #{pid() => noproc | noconnection}
}).
expand(Ops) ->
+ expand(Ops, {undefined, fun ra_lib:id/1}).
+
+expand(Ops, EnqFun) ->
+ ct:pal("OPs ~w", [Ops]),
%% execute each command against a rabbit_fifo state and capture all relevant
%% effects
- T = #t{},
+ T = #t{enq_body_fun = EnqFun},
#t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops),
- %% process the remaining effects
+ %% process the remaining effect
#t{log = Log} = lists:foldl(fun do_apply/2,
T1#t{effects = queue:new()},
queue:to_list(Effs)),
@@ -409,6 +670,7 @@ expand(Ops) ->
handle_op({enqueue, Pid, When, Data},
#t{enqueuers = Enqs0,
+ enq_body_fun = {EnqSt0, Fun},
down = Down,
effects = Effs} = T) ->
case Down of
@@ -419,13 +681,18 @@ handle_op({enqueue, Pid, When, Data},
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
+ {EnqSt, Msg} = Fun({EnqSt0, Data}),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Msg),
case When of
enqueue ->
- do_apply(Cmd, T#t{enqueuers = Enqs});
+ % ct:pal("ENQ ~w", [Cmd]),
+ do_apply(Cmd, T#t{enqueuers = Enqs,
+ enq_body_fun = {EnqSt, Fun}});
delay ->
%% just put the command on the effects queue
- T#t{effects = queue:in(Cmd, Effs)}
+ T#t{effects = queue:in(Cmd, Effs),
+ enqueuers = Enqs,
+ enq_body_fun = {EnqSt, Fun}}
end
end;
handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
@@ -485,8 +752,8 @@ handle_op({input_event, Settlement}, #t{effects = Effs,
discard -> rabbit_fifo:make_discard(CId, MsgIds)
end,
do_apply(Cmd, T#t{effects = Q});
- {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
- case maps:is_key(element(2, Cmd), Down) of
+ {{value, {enqueue, Pid, _, _} = Cmd}, Q} ->
+ case maps:is_key(Pid, Down) of
true ->
%% enqueues cannot arrive after down for the same process
%% drop message
@@ -500,21 +767,31 @@ handle_op({input_event, Settlement}, #t{effects = Effs,
handle_op(purge, T) ->
do_apply(rabbit_fifo:make_purge(), T).
-do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
+
+do_apply(Cmd, #t{effects = Effs,
+ index = Index, state = S0,
+ down = Down,
log = Log} = T) ->
- {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
- {S, _, E} when is_list(E) ->
- {S, E};
- {S, _, E} ->
- {S, [E]};
- {S, _} ->
- {S, []}
- end,
-
- T#t{state = St,
- index = Index + 1,
- effects = enq_effs(Effects, Effs),
- log = [Cmd | Log]}.
+ case Cmd of
+ {enqueue, Pid, _, _} when is_map_key(Pid, Down) ->
+ ct:pal("Pid ~w is donw ~w", [Pid, Down]),
+ %% down
+ T;
+ _ ->
+ {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
+ {S, _, E} when is_list(E) ->
+ {S, E};
+ {S, _, E} ->
+ {S, [E]};
+ {S, _} ->
+ {S, []}
+ end,
+
+ T#t{state = St,
+ index = Index + 1,
+ effects = enq_effs(Effects, Effs),
+ log = [Cmd | Log]}
+ end.
enq_effs([], Q) -> Q;
enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
@@ -582,13 +859,27 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
run_log(InitState, Entries) ->
+ run_log(InitState, Entries, fun ra_lib:id/1).
+
+run_log(InitState, Entries, InvariantFun) ->
+ Invariant = fun(E, S) ->
+ case InvariantFun(S) of
+ true -> ok;
+ false ->
+ throw({invariant, E, S})
+ end
+ end,
+
lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
case rabbit_fifo:apply(meta(Idx), E, Acc0) of
{Acc, _, Efx} when is_list(Efx) ->
+ Invariant(E, Acc),
{Acc, Efx0 ++ Efx};
{Acc, _, Efx} ->
+ Invariant(E, Acc),
{Acc, Efx0 ++ [Efx]};
{Acc, _} ->
+ Invariant(E, Acc),
{Acc, Efx0}
end
end, {InitState, []}, Entries).