summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl111
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl66
2 files changed, 123 insertions, 54 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 4ed2bb743a..39dbd8f3f1 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -81,7 +81,8 @@
%% in enqueue messages. Used to ensure ordering of messages send from the
%% same process
--type msg_header() :: #{delivery_count => non_neg_integer()}.
+-type msg_header() :: #{size := msg_size(),
+ delivery_count => non_neg_integer()}.
%% The message header map:
%% delivery_count: the number of unsuccessful delivery attempts.
%% A non-zero value indicates a previous attempt.
@@ -94,7 +95,7 @@
-type indexed_msg() :: {ra_index(), msg()}.
--type prefix_msg() :: {'$prefix_msg', msg_size()}.
+-type prefix_msg() :: {'$prefix_msg', msg_header()}.
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -242,8 +243,8 @@
%% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msgs = {[], []} :: {Return :: [msg_size()],
- PrefixMsgs :: [msg_size()]},
+ prefix_msgs = {[], []} :: {Return :: [msg_header()],
+ PrefixMsgs :: [msg_header()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
max_length :: maybe(non_neg_integer()),
@@ -970,11 +971,9 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
- Bytes = message_size(RawMsg),
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
{ok, State1, Effects1} ->
- State2 = append_to_master_index(RaftIdx,
- add_bytes_enqueue(Bytes, State1)),
+ State2 = append_to_master_index(RaftIdx, State1),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
{maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
{duplicate, State, Effects} ->
@@ -991,7 +990,7 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}),
State, Effects0),
{State, Effects};
- {{'$prefix_msg', Bytes}, State1} ->
+ {{'$prefix_msg', #{size := Bytes}}, State1} ->
State = add_bytes_drop(Bytes, State1),
{State, Effects0};
empty ->
@@ -1001,12 +1000,14 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
- Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map
- State0#state{messages = Messages#{NextMsgNum => Msg},
- % this is probably only done to record it when low_msg_num
- % is undefined
- low_msg_num = min(LowMsgNum, NextMsgNum),
- next_msg_num = NextMsgNum + 1}.
+ Size = message_size(RawMsg),
+ Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map
+ State = add_bytes_enqueue(Size, State0),
+ State#state{messages = Messages#{NextMsgNum => Msg},
+ % this is probably only done to record it when low_msg_num
+ % is undefined
+ low_msg_num = min(LowMsgNum, NextMsgNum),
+ next_msg_num = NextMsgNum + 1}.
append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
@@ -1088,11 +1089,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0, ConsumerId, Con);
- ({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0, ConsumerId, Con)
- end, {State0, Effects1}, MsgNumMsgs),
+ {State1, Effects2} = lists:foldl(
+ fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0,
+ ConsumerId, Con);
+ ({MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgNum, Msg, S0, E0,
+ ConsumerId, Con)
+ end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
Effects2).
@@ -1152,9 +1156,8 @@ dead_letter_effects(_Reason, _Discarded,
Effects;
dead_letter_effects(Reason, Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
- % MsgId, MsgIdID, RaftId, Header
- Acc) -> [{Reason, Msg} | Acc]
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc]
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
@@ -1200,24 +1203,44 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) ->
- {add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
+return_one(0, {'$prefix_msg', Header0},
+ #state{returns = Returns,
+ delivery_limit = DeliveryLimit} = State0, Effects0,
+ ConsumerId, Con) ->
+ Header = maps:update_with(delivery_count,
+ fun (C) -> C+1 end,
+ 1, Header0),
+ Msg = {'$prefix_msg', Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ Checked = Con#consumer.checked_out,
+ {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
+ Effects0, State0),
+ {add_bytes_settle(Msg, State1), Effects};
+ _ ->
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
#state{returns = Returns,
- delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) ->
+ delivery_limit = DeliveryLimit} = State0,
+ Effects0, ConsumerId, Con) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
+ Msg = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
- Checked = maps:without([MsgNum], Con#consumer.checked_out),
- {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0),
+ DlMsg = {MsgNum, Msg},
+ Effects = dead_letter_effects(rejected, maps:put(none, DlMsg, #{}),
+ State0, Effects0),
+ Checked = Con#consumer.checked_out,
+ {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
+ Effects, State0),
{add_bytes_settle(RawMsg, State1), Effects1};
_ ->
- Msg = {RaftId, {Header, RawMsg}},
%% this should not affect the release cursor in any way
{add_bytes_return(RawMsg,
State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
@@ -1293,9 +1316,9 @@ append_send_msg_effects(Effects0, AccMap) ->
%%
%% When we return it is always done to the current return queue
%% for both prefix messages and current messages
-take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
+take_next_msg(#state{prefix_msgs = {[Header | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {{'$prefix_msg', Bytes},
+ {{'$prefix_msg', Header},
State#state{prefix_msgs = {Rem, P}}};
take_next_msg(#state{returns = Returns,
low_msg_num = Low0,
@@ -1325,9 +1348,9 @@ take_next_msg(#state{returns = Returns,
end
end;
empty ->
- [Bytes | Rem] = P,
+ [Header | Rem] = P,
%% There are prefix msgs
- {{'$prefix_msg', Bytes},
+ {{'$prefix_msg', Header},
State#state{prefix_msgs = {R, Rem}}}
end.
@@ -1486,15 +1509,15 @@ dehydrate_state(#state{messages = Messages,
returns = Returns,
prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
%% TODO: optimise this function as far as possible
- PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
- [Bytes | Acc];
- ({_, {_, {_, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
- [message_size(Raw) | Acc]
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) ->
+ [Header| Acc]
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
@@ -1512,8 +1535,8 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
- (_, {_, {_, {_, Raw}}}) ->
- {'$prefix_msg', message_size(Raw)}
+ (_, {_, {_, {Header, _}}}) ->
+ {'$prefix_msg', Header}
end, Checked0),
Con#consumer{checked_out = Checked}.
@@ -1591,7 +1614,7 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', B}) ->
+message_size({'$prefix_msg', #{size := B}}) ->
B;
message_size(B) when is_binary(B) ->
byte_size(B);
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index b58cc9ced0..dd56659bda 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -5,8 +5,8 @@
-export([
]).
--include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").
+-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
%%%===================================================================
@@ -36,7 +36,9 @@ all_tests() ->
scenario12,
scenario13,
scenario14,
- scenario15
+ scenario15,
+ scenario16,
+ fake_pid
].
groups() ->
@@ -251,20 +253,59 @@ scenario15(_Config) ->
delivery_limit => 1}, Commands),
ok.
+scenario16(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ C2 = {<<>>, c:pid(0,882,1)},
+ E = c:pid(0,176,1),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E, 1, msg1),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ {down, C1Pid, noproc}, %% msg1 allocated to C2
+ make_return(C2, [0]), %% msg1 returned
+ make_enqueue(E, 2, <<>>),
+ make_settle(C2, [0])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ delivery_limit => 1}, Commands),
+ ok.
+
+fake_pid(_Config) ->
+ Pid = fake_external_pid(<<"mynode@banana">>),
+ ?assertNotEqual(node(Pid), node()),
+ ?assert(is_pid(Pid)),
+ ok.
+
+fake_external_pid(Node) when is_binary(Node) ->
+ ThisNodeSize = size(term_to_binary(node())) + 1,
+ Pid = spawn(fun () -> ok end),
+ %% drop the local node data from a local pid
+ <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
+ S = size(Node),
+ %% replace it with the incoming node binary
+ Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>,
+ binary_to_term(Final).
+
snapshots(_Config) ->
run_proper(
fun () ->
?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
frequency([{10, {0, 0, false, 0}},
- {5, {non_neg_integer(), non_neg_integer(),
- boolean(), non_neg_integer()}}]),
- ?FORALL(O, ?LET(Ops, log_gen(200), expand(Ops)),
- collect({Length, Bytes},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ boolean(),
+ oneof([range(1, 3), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
+ collect({log_size, length(O)},
snapshots_prop(
config(?FUNCTION_NAME,
- Length, Bytes,
- SingleActiveConsumer, DeliveryLimit), O))))
- end, [], 2000).
+ Length,
+ Bytes,
+ SingleActiveConsumer,
+ DeliveryLimit), O))))
+ end, [], 2500).
config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
#{name => Name,
@@ -305,7 +346,10 @@ log_gen(Size) ->
]))))).
pid_gen() ->
- ?LET(_, integer(), spawn(fun () -> ok end)).
+ ?LET(Node, oneof([atom_to_binary(node(), utf8),
+ <<"fakenode@fake">>,
+ <<"fakenode@fake2">>
+ ]), fake_external_pid(Node)).
down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
@@ -493,6 +537,7 @@ run_snapshot_test0(Conf, Commands) ->
State = rabbit_fifo:normalize(State0),
[begin
+ % ct:pal("release_cursor: ~b~n", [SnapIdx]),
%% drop all entries below and including the snapshot
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
@@ -506,6 +551,7 @@ run_snapshot_test0(Conf, Commands) ->
ct:pal("Snapshot tests failed run log:~n"
"~p~n from ~n~p~n Entries~n~p~n",
[Filtered, SnapState, Entries]),
+ ct:pal("Expected~n~p~nGot:~n~p", [State, S]),
?assertEqual(State, S)
end
end || {release_cursor, SnapIdx, SnapState} <- Effects],