diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 111 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 66 |
2 files changed, 123 insertions, 54 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4ed2bb743a..39dbd8f3f1 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -81,7 +81,8 @@ %% in enqueue messages. Used to ensure ordering of messages send from the %% same process --type msg_header() :: #{delivery_count => non_neg_integer()}. +-type msg_header() :: #{size := msg_size(), + delivery_count => non_neg_integer()}. %% The message header map: %% delivery_count: the number of unsuccessful delivery attempts. %% A non-zero value indicates a previous attempt. @@ -94,7 +95,7 @@ -type indexed_msg() :: {ra_index(), msg()}. --type prefix_msg() :: {'$prefix_msg', msg_size()}. +-type prefix_msg() :: {'$prefix_msg', msg_header()}. -type delivery_msg() :: {msg_id(), msg()}. %% A tuple consisting of the message id and the headered message. @@ -242,8 +243,8 @@ %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msgs = {[], []} :: {Return :: [msg_size()], - PrefixMsgs :: [msg_size()]}, + prefix_msgs = {[], []} :: {Return :: [msg_header()], + PrefixMsgs :: [msg_header()]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), max_length :: maybe(non_neg_integer()), @@ -970,11 +971,9 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> - Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> - State2 = append_to_master_index(RaftIdx, - add_bytes_enqueue(Bytes, State1)), + State2 = append_to_master_index(RaftIdx, State1), {State, ok, Effects} = checkout(Meta, State2, Effects1), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> @@ -991,7 +990,7 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}), State, Effects0), {State, Effects}; - {{'$prefix_msg', Bytes}, State1} -> + {{'$prefix_msg', #{size := Bytes}}, State1} -> State = add_bytes_drop(Bytes, State1), {State, Effects0}; empty -> @@ -1001,12 +1000,14 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> - Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map - State0#state{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + Size = message_size(RawMsg), + Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map + State = add_bytes_enqueue(Size, State0), + State#state{messages = Messages#{NextMsgNum => Msg}, + % this is probably only done to record it when low_msg_num + % is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, #state{ra_indexes = Indexes0} = State0) -> @@ -1088,11 +1089,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, ConsumerId, Con); - ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) - end, {State0, Effects1}, MsgNumMsgs), + {State1, Effects2} = lists:foldl( + fun({'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0, + ConsumerId, Con); + ({MsgNum, Msg}, {S0, E0}) -> + return_one(MsgNum, Msg, S0, E0, + ConsumerId, Con) + end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, Effects2). @@ -1152,9 +1156,8 @@ dead_letter_effects(_Reason, _Discarded, Effects; dead_letter_effects(Reason, Discarded, #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, - % MsgId, MsgIdID, RaftId, Header - Acc) -> [{Reason, Msg} | Acc] + DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc] end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. @@ -1200,24 +1203,44 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) -> - {add_bytes_return(Msg, - State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; +return_one(0, {'$prefix_msg', Header0}, + #state{returns = Returns, + delivery_limit = DeliveryLimit} = State0, Effects0, + ConsumerId, Con) -> + Header = maps:update_with(delivery_count, + fun (C) -> C+1 end, + 1, Header0), + Msg = {'$prefix_msg', Header}, + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + Checked = Con#consumer.checked_out, + {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, + Effects0, State0), + {add_bytes_settle(Msg, State1), Effects}; + _ -> + %% this should not affect the release cursor in any way + {add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}), + Effects0} + end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) -> + delivery_limit = DeliveryLimit} = State0, + Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), + Msg = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), - Checked = maps:without([MsgNum], Con#consumer.checked_out), - {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), + DlMsg = {MsgNum, Msg}, + Effects = dead_letter_effects(rejected, maps:put(none, DlMsg, #{}), + State0, Effects0), + Checked = Con#consumer.checked_out, + {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, + Effects, State0), {add_bytes_settle(RawMsg, State1), Effects1}; _ -> - Msg = {RaftId, {Header, RawMsg}}, %% this should not affect the release cursor in any way {add_bytes_return(RawMsg, State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} @@ -1293,9 +1316,9 @@ append_send_msg_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) -> +take_next_msg(#state{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first - {{'$prefix_msg', Bytes}, + {{'$prefix_msg', Header}, State#state{prefix_msgs = {Rem, P}}}; take_next_msg(#state{returns = Returns, low_msg_num = Low0, @@ -1325,9 +1348,9 @@ take_next_msg(#state{returns = Returns, end end; empty -> - [Bytes | Rem] = P, + [Header | Rem] = P, %% There are prefix msgs - {{'$prefix_msg', Bytes}, + {{'$prefix_msg', Header}, State#state{prefix_msgs = {R, Rem}}} end. @@ -1486,15 +1509,15 @@ dehydrate_state(#state{messages = Messages, returns = Returns, prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> %% TODO: optimise this function as far as possible - PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) -> - [Bytes | Acc]; - ({_, {_, {_, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> + [Header | Acc]; + ({_, {_, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header| Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), @@ -1512,8 +1535,8 @@ dehydrate_state(#state{messages = Messages, dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; - (_, {_, {_, {_, Raw}}}) -> - {'$prefix_msg', message_size(Raw)} + (_, {_, {_, {Header, _}}}) -> + {'$prefix_msg', Header} end, Checked0), Con#consumer{checked_out = Checked}. @@ -1591,7 +1614,7 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); -message_size({'$prefix_msg', B}) -> +message_size({'$prefix_msg', #{size := B}}) -> B; message_size(B) when is_binary(B) -> byte_size(B); diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index b58cc9ced0..dd56659bda 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -5,8 +5,8 @@ -export([ ]). --include_lib("common_test/include/ct.hrl"). -include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). %%%=================================================================== @@ -36,7 +36,9 @@ all_tests() -> scenario12, scenario13, scenario14, - scenario15 + scenario15, + scenario16, + fake_pid ]. groups() -> @@ -251,20 +253,59 @@ scenario15(_Config) -> delivery_limit => 1}, Commands), ok. +scenario16(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + C2 = {<<>>, c:pid(0,882,1)}, + E = c:pid(0,176,1), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E, 1, msg1), + make_checkout(C2, {auto,1,simple_prefetch}), + {down, C1Pid, noproc}, %% msg1 allocated to C2 + make_return(C2, [0]), %% msg1 returned + make_enqueue(E, 2, <<>>), + make_settle(C2, [0]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + delivery_limit => 1}, Commands), + ok. + +fake_pid(_Config) -> + Pid = fake_external_pid(<<"mynode@banana">>), + ?assertNotEqual(node(Pid), node()), + ?assert(is_pid(Pid)), + ok. + +fake_external_pid(Node) when is_binary(Node) -> + ThisNodeSize = size(term_to_binary(node())) + 1, + Pid = spawn(fun () -> ok end), + %% drop the local node data from a local pid + <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), + S = size(Node), + %% replace it with the incoming node binary + Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>, + binary_to_term(Final). + snapshots(_Config) -> run_proper( fun () -> ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, frequency([{10, {0, 0, false, 0}}, - {5, {non_neg_integer(), non_neg_integer(), - boolean(), non_neg_integer()}}]), - ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)), - collect({Length, Bytes}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + boolean(), + oneof([range(1, 3), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), + collect({log_size, length(O)}, snapshots_prop( config(?FUNCTION_NAME, - Length, Bytes, - SingleActiveConsumer, DeliveryLimit), O)))) - end, [], 2000). + Length, + Bytes, + SingleActiveConsumer, + DeliveryLimit), O)))) + end, [], 2500). config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> #{name => Name, @@ -305,7 +346,10 @@ log_gen(Size) -> ]))))). pid_gen() -> - ?LET(_, integer(), spawn(fun () -> ok end)). + ?LET(Node, oneof([atom_to_binary(node(), utf8), + <<"fakenode@fake">>, + <<"fakenode@fake2">> + ]), fake_external_pid(Node)). down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). @@ -493,6 +537,7 @@ run_snapshot_test0(Conf, Commands) -> State = rabbit_fifo:normalize(State0), [begin + % ct:pal("release_cursor: ~b~n", [SnapIdx]), %% drop all entries below and including the snapshot Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false @@ -506,6 +551,7 @@ run_snapshot_test0(Conf, Commands) -> ct:pal("Snapshot tests failed run log:~n" "~p~n from ~n~p~n Entries~n~p~n", [Filtered, SnapState, Entries]), + ct:pal("Expected~n~p~nGot:~n~p", [State, S]), ?assertEqual(State, S) end end || {release_cursor, SnapIdx, SnapState} <- Effects], |
