diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-22 12:28:26 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-22 14:14:31 +0000 |
| commit | 723972b3cbdf63e836c26150540b868833f30df8 (patch) | |
| tree | be06a96378691b700625412fbb462966fced795e /src | |
| parent | e66284166322fce2a22218ecd5941d15f589a330 (diff) | |
| download | rabbitmq-server-git-723972b3cbdf63e836c26150540b868833f30df8.tar.gz | |
Fix rabbit_fifo poison handling
By ensuring the delivery count is retained when "dehydrating" the state
in preparation for snapshotting. Now the entire message header map is
stored which will take additional space w.r.t to keynamne duplication
although this can be optimised.
Also updated the property test to generate fake pids for multiple nodes
so that multi node scenarios are better covered.
[#163513253]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 111 |
1 files changed, 67 insertions, 44 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 4ed2bb743a..39dbd8f3f1 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -81,7 +81,8 @@ %% in enqueue messages. Used to ensure ordering of messages send from the %% same process --type msg_header() :: #{delivery_count => non_neg_integer()}. +-type msg_header() :: #{size := msg_size(), + delivery_count => non_neg_integer()}. %% The message header map: %% delivery_count: the number of unsuccessful delivery attempts. %% A non-zero value indicates a previous attempt. @@ -94,7 +95,7 @@ -type indexed_msg() :: {ra_index(), msg()}. --type prefix_msg() :: {'$prefix_msg', msg_size()}. +-type prefix_msg() :: {'$prefix_msg', msg_header()}. -type delivery_msg() :: {msg_id(), msg()}. %% A tuple consisting of the message id and the headered message. @@ -242,8 +243,8 @@ %% overflow calculations). %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msgs = {[], []} :: {Return :: [msg_size()], - PrefixMsgs :: [msg_size()]}, + prefix_msgs = {[], []} :: {Return :: [msg_header()], + PrefixMsgs :: [msg_header()]}, msg_bytes_enqueue = 0 :: non_neg_integer(), msg_bytes_checkout = 0 :: non_neg_integer(), max_length :: maybe(non_neg_integer()), @@ -970,11 +971,9 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> - Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of {ok, State1, Effects1} -> - State2 = append_to_master_index(RaftIdx, - add_bytes_enqueue(Bytes, State1)), + State2 = append_to_master_index(RaftIdx, State1), {State, ok, Effects} = checkout(Meta, State2, Effects1), {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects}; {duplicate, State, Effects} -> @@ -991,7 +990,7 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> Effects = dead_letter_effects(maxlen, maps:put(none, FullMsg, #{}), State, Effects0), {State, Effects}; - {{'$prefix_msg', Bytes}, State1} -> + {{'$prefix_msg', #{size := Bytes}}, State1} -> State = add_bytes_drop(Bytes, State1), {State, Effects0}; empty -> @@ -1001,12 +1000,14 @@ drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) -> enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> - Msg = {RaftIdx, {#{}, RawMsg}}, % indexed message with header map - State0#state{messages = Messages#{NextMsgNum => Msg}, - % this is probably only done to record it when low_msg_num - % is undefined - low_msg_num = min(LowMsgNum, NextMsgNum), - next_msg_num = NextMsgNum + 1}. + Size = message_size(RawMsg), + Msg = {RaftIdx, {#{size => Size}, RawMsg}}, % indexed message with header map + State = add_bytes_enqueue(Size, State0), + State#state{messages = Messages#{NextMsgNum => Msg}, + % this is probably only done to record it when low_msg_num + % is undefined + low_msg_num = min(LowMsgNum, NextMsgNum), + next_msg_num = NextMsgNum + 1}. append_to_master_index(RaftIdx, #state{ra_indexes = Indexes0} = State0) -> @@ -1088,11 +1089,14 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, ConsumerId, Con); - ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0, ConsumerId, Con) - end, {State0, Effects1}, MsgNumMsgs), + {State1, Effects2} = lists:foldl( + fun({'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0, + ConsumerId, Con); + ({MsgNum, Msg}, {S0, E0}) -> + return_one(MsgNum, Msg, S0, E0, + ConsumerId, Con) + end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, Effects2). @@ -1152,9 +1156,8 @@ dead_letter_effects(_Reason, _Discarded, Effects; dead_letter_effects(Reason, Discarded, #state{dead_letter_handler = {Mod, Fun, Args}}, Effects) -> - DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, - % MsgId, MsgIdID, RaftId, Header - Acc) -> [{Reason, Msg} | Acc] + DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}}, Acc) -> + [{Reason, Msg} | Acc] end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. @@ -1200,24 +1203,44 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0, Effects, _ConsumerId, _Con) -> - {add_bytes_return(Msg, - State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; +return_one(0, {'$prefix_msg', Header0}, + #state{returns = Returns, + delivery_limit = DeliveryLimit} = State0, Effects0, + ConsumerId, Con) -> + Header = maps:update_with(delivery_count, + fun (C) -> C+1 end, + 1, Header0), + Msg = {'$prefix_msg', Header}, + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + Checked = Con#consumer.checked_out, + {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, + Effects0, State0), + {add_bytes_settle(Msg, State1), Effects}; + _ -> + %% this should not affect the release cursor in any way + {add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}), + Effects0} + end; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{returns = Returns, - delivery_limit = DeliveryLimit} = State0, Effects0, ConsumerId, Con) -> + delivery_limit = DeliveryLimit} = State0, + Effects0, ConsumerId, Con) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), + Msg = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), - Checked = maps:without([MsgNum], Con#consumer.checked_out), - {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, Effects, State0), + DlMsg = {MsgNum, Msg}, + Effects = dead_letter_effects(rejected, maps:put(none, DlMsg, #{}), + State0, Effects0), + Checked = Con#consumer.checked_out, + {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, + Effects, State0), {add_bytes_settle(RawMsg, State1), Effects1}; _ -> - Msg = {RaftId, {Header, RawMsg}}, %% this should not affect the release cursor in any way {add_bytes_return(RawMsg, State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} @@ -1293,9 +1316,9 @@ append_send_msg_effects(Effects0, AccMap) -> %% %% When we return it is always done to the current return queue %% for both prefix messages and current messages -take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) -> +take_next_msg(#state{prefix_msgs = {[Header | Rem], P}} = State) -> %% there are prefix returns, these should be served first - {{'$prefix_msg', Bytes}, + {{'$prefix_msg', Header}, State#state{prefix_msgs = {Rem, P}}}; take_next_msg(#state{returns = Returns, low_msg_num = Low0, @@ -1325,9 +1348,9 @@ take_next_msg(#state{returns = Returns, end end; empty -> - [Bytes | Rem] = P, + [Header | Rem] = P, %% There are prefix msgs - {{'$prefix_msg', Bytes}, + {{'$prefix_msg', Header}, State#state{prefix_msgs = {R, Rem}}} end. @@ -1486,15 +1509,15 @@ dehydrate_state(#state{messages = Messages, returns = Returns, prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> %% TODO: optimise this function as far as possible - PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) -> - [Bytes | Acc]; - ({_, {_, {_, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> + [Header | Acc]; + ({_, {_, {Header, _}}}, Acc) -> + [Header | Acc] end, lists:reverse(PrefRet0), lqueue:to_list(Returns)), - PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) -> - [message_size(Raw) | Acc] + PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {Header, _}}}, Acc) -> + [Header| Acc] end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), @@ -1512,8 +1535,8 @@ dehydrate_state(#state{messages = Messages, dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> M; - (_, {_, {_, {_, Raw}}}) -> - {'$prefix_msg', message_size(Raw)} + (_, {_, {_, {Header, _}}}) -> + {'$prefix_msg', Header} end, Checked0), Con#consumer{checked_out = Checked}. @@ -1591,7 +1614,7 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout, message_size(#basic_message{content = Content}) -> #content{payload_fragments_rev = PFR} = Content, iolist_size(PFR); -message_size({'$prefix_msg', B}) -> +message_size({'$prefix_msg', #{size := B}}) -> B; message_size(B) when is_binary(B) -> byte_size(B); |
